You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ad...@apache.org on 2018/08/23 12:50:03 UTC
[2/8] james-project git commit: JAMES-2525 changes ObjectStore#read
to return an InputStream
JAMES-2525 changes ObjectStore#read to return an InputStream
The original method is renamed `readBytes` name and all
existing uses are changed to follow the rename.
The new method is implemented in the CassandraBlobsDAO and delegates to
the existing implementation.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/86979704
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/86979704
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/86979704
Branch: refs/heads/master
Commit: 86979704d7467a9910dd571655e54bd29aaef128
Parents: 23cbb7b
Author: Jean Helou <jh...@codamens.fr>
Authored: Mon Aug 20 17:09:39 2018 +0200
Committer: Jean Helou <jh...@codamens.fr>
Committed: Thu Aug 23 14:18:49 2018 +0200
----------------------------------------------------------------------
.../mail/CassandraAttachmentMapper.java | 2 +-
.../cassandra/mail/CassandraMessageDAO.java | 2 +-
.../migration/AttachmentV2MigrationTest.java | 4 +-
.../org/apache/james/blob/api/ObjectStore.java | 5 +-
.../james/blob/api/ObjectStoreException.java | 31 ++++++++++
.../james/blob/api/ObjectStoreContract.java | 61 +++++++++++++++++---
.../james/blob/cassandra/CassandraBlobsDAO.java | 31 +++++++++-
.../blob/cassandra/CassandraBlobsDAOTest.java | 4 +-
.../cassandra/CassandraMailRepository.java | 4 +-
9 files changed, 125 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index 678b10b..444ec85 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -87,7 +87,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
return CompletableFuture.completedFuture(Optional.empty());
}
DAOAttachment daoAttachment = daoAttachmentOptional.get();
- return objectStore.read(daoAttachment.getBlobId())
+ return objectStore.readBytes(daoAttachment.getBlobId())
.thenApply(bytes -> Optional.of(daoAttachment.toAttachment(bytes)));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 8fe0284..a451adb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -372,7 +372,7 @@ public class CassandraMessageDAO {
}
private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
- return objectStore.read(blobIdFactory.from(row.getString(field)));
+ return objectStore.readBytes(blobIdFactory.from(row.getString(field)));
}
public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
index 26bcf94..8a45267 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -130,9 +130,9 @@ public class AttachmentV2MigrationTest {
.contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).join())
.contains(CassandraAttachmentDAOV2.from(attachment2, BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
- assertThat(blobsDAO.read(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join())
+ assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join())
.isEqualTo(attachment1.getBytes());
- assertThat(blobsDAO.read(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).join())
+ assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).join())
.isEqualTo(attachment2.getBytes());
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java
index c2bd88a..7582a2c 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java
@@ -18,11 +18,14 @@
****************************************************************/
package org.apache.james.blob.api;
+import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
public interface ObjectStore {
CompletableFuture<BlobId> save(byte[] data);
- CompletableFuture<byte[]> read(BlobId blobId);
+ CompletableFuture<byte[]> readBytes(BlobId blobId);
+
+ InputStream read(BlobId blobId);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java
new file mode 100644
index 0000000..624a8e0
--- /dev/null
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java
@@ -0,0 +1,31 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.api;
+
+public class ObjectStoreException extends RuntimeException {
+
+ public ObjectStoreException(String message) {
+ super(message);
+ }
+
+ public ObjectStoreException(String message, Throwable cause) {
+ super(message,cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java
index bd4bd8e..bb51e14 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java
@@ -21,9 +21,12 @@ package org.apache.james.blob.api;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import com.google.common.base.Strings;
@@ -44,7 +47,7 @@ public interface ObjectStoreContract {
default void saveShouldSaveEmptyData() throws Exception {
BlobId blobId = testee().save(new byte[]{}).join();
- byte[] bytes = testee().read(blobId).join();
+ byte[] bytes = testee().readBytes(blobId).join();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
}
@@ -57,39 +60,79 @@ public interface ObjectStoreContract {
}
@Test
- default void readShouldBeEmptyWhenNoExisting() throws IOException {
- byte[] bytes = testee().read(blobIdFactory().from("unknown")).join();
+ default void readBytesShouldBeEmptyWhenNoExisting() throws IOException {
+ byte[] bytes = testee().readBytes(blobIdFactory().from("unknown")).join();
assertThat(bytes).isEmpty();
}
@Test
- default void readShouldReturnSavedData() throws IOException {
+ default void readBytesShouldReturnSavedData() throws IOException {
BlobId blobId = testee().save("toto".getBytes(StandardCharsets.UTF_8)).join();
- byte[] bytes = testee().read(blobId).join();
+ byte[] bytes = testee().readBytes(blobId).join();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("toto");
}
@Test
- default void readShouldReturnLongSavedData() throws IOException {
+ default void readBytesShouldReturnLongSavedData() throws IOException {
String longString = Strings.repeat("0123456789\n", 1000);
BlobId blobId = testee().save(longString.getBytes(StandardCharsets.UTF_8)).join();
- byte[] bytes = testee().read(blobId).join();
+ byte[] bytes = testee().readBytes(blobId).join();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
}
@Test
- default void readShouldReturnBigSavedData() throws IOException {
+ default void readBytesShouldReturnBigSavedData() throws IOException {
// 12 MB of text
String bigString = Strings.repeat("0123456789\r\n", 1024 * 1024);
BlobId blobId = testee().save(bigString.getBytes(StandardCharsets.UTF_8)).join();
- byte[] bytes = testee().read(blobId).join();
+ byte[] bytes = testee().readBytes(blobId).join();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(bigString);
}
+
+ @Test
+ default void readShouldBeEmptyWhenNoExistingStream() throws IOException {
+ InputStream stream = testee().read(blobIdFactory().from("unknown"));
+
+ assertThat(stream.read()).isEqualTo(IOUtils.EOF);
+ }
+
+ @Test
+ default void readShouldReturnSavedData() throws IOException {
+ byte[] bytes = "toto".getBytes(StandardCharsets.UTF_8);
+ BlobId blobId = testee().save(bytes).join();
+
+ InputStream read = testee().read(blobId);
+
+ assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
+ }
+
+ @Test
+ default void readShouldReturnLongSavedData() throws IOException {
+ String longString = Strings.repeat("0123456789\n", 1000);
+ byte[] bytes = longString.getBytes(StandardCharsets.UTF_8);
+ BlobId blobId = testee().save(bytes).join();
+
+ InputStream read = testee().read(blobId);
+
+ assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
+ }
+
+ @Test
+ default void readShouldReturnBigSavedData() throws IOException {
+ // 12 MB of text
+ String bigString = Strings.repeat("0123456789\r\n", 1024 * 1024);
+ byte[] bytes = bigString.getBytes(StandardCharsets.UTF_8);
+ BlobId blobId = testee().save(bytes).join();
+
+ InputStream read = testee().read(blobId);
+
+ assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
index 7c8eda8..3436d73 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
@@ -24,7 +24,11 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.Pipe;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
@@ -37,6 +41,7 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.ObjectStore;
+import org.apache.james.blob.api.ObjectStoreException;
import org.apache.james.blob.cassandra.BlobTable.BlobParts;
import org.apache.james.blob.cassandra.utils.DataChunker;
import org.apache.james.util.FluentFutureStream;
@@ -47,6 +52,8 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.github.fge.lambdas.Throwing;
+import com.github.fge.lambdas.consumers.ConsumerChainer;
import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -149,7 +156,7 @@ public class CassandraBlobsDAO implements ObjectStore {
}
@Override
- public CompletableFuture<byte[]> read(BlobId blobId) {
+ public CompletableFuture<byte[]> readBytes(BlobId blobId) {
return cassandraAsyncExecutor.executeSingleRow(
select.bind()
.setString(BlobTable.ID, blobId.asString()))
@@ -209,4 +216,26 @@ public class CassandraBlobsDAO implements ObjectStore {
this.row = row;
}
}
+
+ @Override
+ public InputStream read(BlobId blobId) {
+ try {
+ Pipe pipe = Pipe.open();
+ ConsumerChainer<ByteBuffer> consumer = Throwing.consumer(
+ bytes -> {
+ try (Pipe.SinkChannel sink = pipe.sink()) {
+ sink.write(bytes);
+ }
+ }
+ );
+ readBytes(blobId)
+ .thenApply(ByteBuffer::wrap)
+ .thenAccept(consumer.sneakyThrow());
+ return Channels.newInputStream(pipe.source());
+ } catch (IOException cause) {
+ throw new ObjectStoreException(
+ "Failed to convert CompletableFuture<byte[]> to InputStream",
+ cause);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
index 07af72f..cb7db4d 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
@@ -84,11 +84,11 @@ public class CassandraBlobsDAOTest implements ObjectStoreContract {
}
@Test
- public void readShouldReturnSplitSavedDataByChunk() throws IOException {
+ public void readBytesShouldReturnSplitSavedDataByChunk() throws IOException {
String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE);
BlobId blobId = testee.save(longString.getBytes(StandardCharsets.UTF_8)).join();
- byte[] bytes = testee.read(blobId).join();
+ byte[] bytes = testee.readBytes(blobId).join();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index c2e449d..9074481 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -158,8 +158,8 @@ public class CassandraMailRepository implements MailRepository {
public CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
return CompletableFutureUtil.combine(
- objectStore.read(mailDTO.getHeaderBlobId()),
- objectStore.read(mailDTO.getBodyBlobId()),
+ objectStore.readBytes(mailDTO.getHeaderBlobId()),
+ objectStore.readBytes(mailDTO.getBodyBlobId()),
Bytes::concat)
.thenApply(this::toMimeMessage)
.thenApply(mimeMessage -> mailDTO.getMailBuilder()
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org