You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ad...@apache.org on 2018/08/23 12:50:03 UTC

[2/8] james-project git commit: JAMES-2525 changes ObjectStore#read to return an InputStream

JAMES-2525 changes ObjectStore#read to return an InputStream

The original method is renamed to `readBytes`, and all
existing uses are changed to follow the rename.

The new method is implemented in the CassandraBlobsDAO and delegates to
the existing implementation.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/86979704
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/86979704
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/86979704

Branch: refs/heads/master
Commit: 86979704d7467a9910dd571655e54bd29aaef128
Parents: 23cbb7b
Author: Jean Helou <jh...@codamens.fr>
Authored: Mon Aug 20 17:09:39 2018 +0200
Committer: Jean Helou <jh...@codamens.fr>
Committed: Thu Aug 23 14:18:49 2018 +0200

----------------------------------------------------------------------
 .../mail/CassandraAttachmentMapper.java         |  2 +-
 .../cassandra/mail/CassandraMessageDAO.java     |  2 +-
 .../migration/AttachmentV2MigrationTest.java    |  4 +-
 .../org/apache/james/blob/api/ObjectStore.java  |  5 +-
 .../james/blob/api/ObjectStoreException.java    | 31 ++++++++++
 .../james/blob/api/ObjectStoreContract.java     | 61 +++++++++++++++++---
 .../james/blob/cassandra/CassandraBlobsDAO.java | 31 +++++++++-
 .../blob/cassandra/CassandraBlobsDAOTest.java   |  4 +-
 .../cassandra/CassandraMailRepository.java      |  4 +-
 9 files changed, 125 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index 678b10b..444ec85 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -87,7 +87,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
             return CompletableFuture.completedFuture(Optional.empty());
         }
         DAOAttachment daoAttachment = daoAttachmentOptional.get();
-        return objectStore.read(daoAttachment.getBlobId())
+        return objectStore.readBytes(daoAttachment.getBlobId())
             .thenApply(bytes -> Optional.of(daoAttachment.toAttachment(bytes)));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 8fe0284..a451adb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -372,7 +372,7 @@ public class CassandraMessageDAO {
     }
 
     private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
-        return objectStore.read(blobIdFactory.from(row.getString(field)));
+        return objectStore.readBytes(blobIdFactory.from(row.getString(field)));
     }
 
     public static MessageResult notFound(ComposedMessageIdWithMetaData id) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
index 26bcf94..8a45267 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -130,9 +130,9 @@ public class AttachmentV2MigrationTest {
             .contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
         assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).join())
             .contains(CassandraAttachmentDAOV2.from(attachment2, BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
-        assertThat(blobsDAO.read(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join())
+        assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join())
             .isEqualTo(attachment1.getBytes());
-        assertThat(blobsDAO.read(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).join())
+        assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).join())
             .isEqualTo(attachment2.getBytes());
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java
index c2bd88a..7582a2c 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java
@@ -18,11 +18,14 @@
  ****************************************************************/
 package org.apache.james.blob.api;
 
+import java.io.InputStream;
 import java.util.concurrent.CompletableFuture;
 
 public interface ObjectStore {
 
     CompletableFuture<BlobId> save(byte[] data);
 
-    CompletableFuture<byte[]> read(BlobId blobId);
+    CompletableFuture<byte[]> readBytes(BlobId blobId);
+
+    InputStream read(BlobId blobId);
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java
new file mode 100644
index 0000000..624a8e0
--- /dev/null
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java
@@ -0,0 +1,31 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.api;
+
+public class ObjectStoreException extends RuntimeException {
+
+    public ObjectStoreException(String message) {
+        super(message);
+    }
+
+    public ObjectStoreException(String message, Throwable cause) {
+        super(message,cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java
index bd4bd8e..bb51e14 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java
@@ -21,9 +21,12 @@ package org.apache.james.blob.api;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.base.Strings;
@@ -44,7 +47,7 @@ public interface ObjectStoreContract {
     default void saveShouldSaveEmptyData() throws Exception {
         BlobId blobId = testee().save(new byte[]{}).join();
 
-        byte[] bytes = testee().read(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).join();
 
         assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
     }
@@ -57,39 +60,79 @@ public interface ObjectStoreContract {
     }
 
     @Test
-    default void readShouldBeEmptyWhenNoExisting() throws IOException {
-        byte[] bytes = testee().read(blobIdFactory().from("unknown")).join();
+    default void readBytesShouldBeEmptyWhenNoExisting() throws IOException {
+        byte[] bytes = testee().readBytes(blobIdFactory().from("unknown")).join();
 
         assertThat(bytes).isEmpty();
     }
 
     @Test
-    default void readShouldReturnSavedData() throws IOException {
+    default void readBytesShouldReturnSavedData() throws IOException {
         BlobId blobId = testee().save("toto".getBytes(StandardCharsets.UTF_8)).join();
 
-        byte[] bytes = testee().read(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).join();
 
         assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("toto");
     }
 
     @Test
-    default void readShouldReturnLongSavedData() throws IOException {
+    default void readBytesShouldReturnLongSavedData() throws IOException {
         String longString = Strings.repeat("0123456789\n", 1000);
         BlobId blobId = testee().save(longString.getBytes(StandardCharsets.UTF_8)).join();
 
-        byte[] bytes = testee().read(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).join();
 
         assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
     }
 
     @Test
-    default void readShouldReturnBigSavedData() throws IOException {
+    default void readBytesShouldReturnBigSavedData() throws IOException {
         // 12 MB of text
         String bigString = Strings.repeat("0123456789\r\n", 1024 * 1024);
         BlobId blobId = testee().save(bigString.getBytes(StandardCharsets.UTF_8)).join();
 
-        byte[] bytes = testee().read(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).join();
 
         assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(bigString);
     }
+
+    @Test
+    default void readShouldBeEmptyWhenNoExistingStream() throws IOException {
+        InputStream stream = testee().read(blobIdFactory().from("unknown"));
+
+        assertThat(stream.read()).isEqualTo(IOUtils.EOF);
+    }
+
+    @Test
+    default void readShouldReturnSavedData() throws IOException {
+        byte[] bytes = "toto".getBytes(StandardCharsets.UTF_8);
+        BlobId blobId = testee().save(bytes).join();
+
+        InputStream read = testee().read(blobId);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
+    }
+
+    @Test
+    default void readShouldReturnLongSavedData() throws IOException {
+        String longString = Strings.repeat("0123456789\n", 1000);
+        byte[] bytes = longString.getBytes(StandardCharsets.UTF_8);
+        BlobId blobId = testee().save(bytes).join();
+
+        InputStream read = testee().read(blobId);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
+    }
+
+    @Test
+    default void readShouldReturnBigSavedData() throws IOException {
+        // 12 MB of text
+        String bigString = Strings.repeat("0123456789\r\n", 1024 * 1024);
+        byte[] bytes = bigString.getBytes(StandardCharsets.UTF_8);
+        BlobId blobId = testee().save(bytes).join();
+
+        InputStream read = testee().read(blobId);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
index 7c8eda8..3436d73 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
@@ -24,7 +24,11 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.Pipe;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.IntStream;
@@ -37,6 +41,7 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.ObjectStore;
+import org.apache.james.blob.api.ObjectStoreException;
 import org.apache.james.blob.cassandra.BlobTable.BlobParts;
 import org.apache.james.blob.cassandra.utils.DataChunker;
 import org.apache.james.util.FluentFutureStream;
@@ -47,6 +52,8 @@ import org.slf4j.LoggerFactory;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.github.fge.lambdas.Throwing;
+import com.github.fge.lambdas.consumers.ConsumerChainer;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -149,7 +156,7 @@ public class CassandraBlobsDAO implements ObjectStore {
     }
 
     @Override
-    public CompletableFuture<byte[]> read(BlobId blobId) {
+    public CompletableFuture<byte[]> readBytes(BlobId blobId) {
         return cassandraAsyncExecutor.executeSingleRow(
             select.bind()
                 .setString(BlobTable.ID, blobId.asString()))
@@ -209,4 +216,26 @@ public class CassandraBlobsDAO implements ObjectStore {
             this.row = row;
         }
     }
+
+    @Override
+    public InputStream read(BlobId blobId) {
+        try {
+            Pipe pipe = Pipe.open();
+            ConsumerChainer<ByteBuffer> consumer = Throwing.consumer(
+                bytes -> {
+                    try (Pipe.SinkChannel sink = pipe.sink()) {
+                        sink.write(bytes);
+                    }
+                }
+            );
+            readBytes(blobId)
+                .thenApply(ByteBuffer::wrap)
+                .thenAccept(consumer.sneakyThrow());
+            return Channels.newInputStream(pipe.source());
+        } catch (IOException cause) {
+            throw new ObjectStoreException(
+                "Failed to convert CompletableFuture<byte[]> to InputStream",
+                cause);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
index 07af72f..cb7db4d 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
@@ -84,11 +84,11 @@ public class CassandraBlobsDAOTest implements ObjectStoreContract {
     }
 
     @Test
-    public void readShouldReturnSplitSavedDataByChunk() throws IOException {
+    public void readBytesShouldReturnSplitSavedDataByChunk() throws IOException {
         String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE);
         BlobId blobId = testee.save(longString.getBytes(StandardCharsets.UTF_8)).join();
 
-        byte[] bytes = testee.read(blobId).join();
+        byte[] bytes = testee.readBytes(blobId).join();
 
         assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index c2e449d..9074481 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -158,8 +158,8 @@ public class CassandraMailRepository implements MailRepository {
 
     public CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
         return CompletableFutureUtil.combine(
-            objectStore.read(mailDTO.getHeaderBlobId()),
-            objectStore.read(mailDTO.getBodyBlobId()),
+            objectStore.readBytes(mailDTO.getHeaderBlobId()),
+            objectStore.readBytes(mailDTO.getBodyBlobId()),
             Bytes::concat)
             .thenApply(this::toMimeMessage)
             .thenApply(mimeMessage -> mailDTO.getMailBuilder()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org