Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/23 04:21:54 UTC

[james-project] 10/10: JAMES-2851 CassandraBlobStore reads ByteBuffer instead of byte[]

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4cb2104ea88b1c5468d673be360be333f38e1a49
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Aug 20 14:20:18 2019 +0200

    JAMES-2851 CassandraBlobStore reads ByteBuffer instead of byte[]
---
 .../james/blob/cassandra/CassandraBlobStore.java     | 20 ++++++++++++++++----
 .../james/blob/cassandra/CassandraBucketDAO.java     |  9 +++------
 .../blob/cassandra/CassandraDefaultBucketDAO.java    |  9 +++------
 .../james/blob/cassandra/CassandraBucketDAOTest.java | 20 ++++++++++----------
 .../cassandra/CassandraDefaultBucketDAOTest.java     | 18 +++++++++---------
 .../java/org/apache/james/util/ReactorUtils.java     | 19 ++++++++++---------
 .../java/org/apache/james/util/ReactorUtilsTest.java | 15 ++++++++++-----
 7 files changed, 61 insertions(+), 49 deletions(-)

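For context, the core of this change is that blob parts now travel through the read pipeline as java.nio.ByteBuffer (as returned by the Cassandra driver's Row.getBytes) instead of being copied into byte[] inside each DAO's rowToData, and CassandraBlobStore concatenates the parts itself when a full byte[] is requested. A minimal standalone sketch of that concatenation (illustrative class and method names, not part of the patch):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    public class ByteBufferConcatSketch {

        // Mirrors the new byteBuffersToBytesArray helper: size the target
        // buffer from the remaining bytes of each part, then copy the parts
        // into it in order. The mutable reduce is only safe for a sequential
        // stream, which is how the helper is used.
        static byte[] concat(List<ByteBuffer> parts) {
            int targetSize = parts.stream()
                .mapToInt(ByteBuffer::remaining)
                .sum();

            return parts.stream()
                .reduce(ByteBuffer.allocate(targetSize),
                    (accumulator, element) -> accumulator.put(element))
                .array();
        }

        public static void main(String[] args) {
            List<ByteBuffer> parts = Arrays.asList(
                ByteBuffer.wrap(new byte[] {1, 2, 3}),
                ByteBuffer.wrap(new byte[] {4, 5}));

            System.out.println(Arrays.toString(concat(parts))); // [1, 2, 3, 4, 5]
        }
    }
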
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 171f215..7f7efe3 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.cassandra;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -42,7 +43,6 @@ import org.apache.james.util.ReactorUtils;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -114,7 +114,7 @@ public class CassandraBlobStore implements BlobStore {
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         return readBlobParts(bucketName, blobId)
             .collectList()
-            .map(parts -> Bytes.concat(parts.toArray(new byte[0][])));
+            .map(this::byteBuffersToBytesArray);
     }
 
     @Override
@@ -127,7 +127,7 @@ public class CassandraBlobStore implements BlobStore {
         return BucketName.DEFAULT;
     }
 
-    private Flux<byte[]> readBlobParts(BucketName bucketName, BlobId blobId) {
+    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
         Integer rowCount = selectRowCount(bucketName, blobId)
             .publishOn(Schedulers.elastic())
             .single()
@@ -173,7 +173,7 @@ public class CassandraBlobStore implements BlobStore {
         }
     }
 
-    private Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
+    private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
         if (isDefaultBucket(bucketName)) {
             return defaultBucketDAO.readPart(blobId, partIndex);
         } else {
@@ -208,4 +208,16 @@ public class CassandraBlobStore implements BlobStore {
     private boolean isDefaultBucket(BucketName bucketName) {
         return bucketName.equals(getDefaultBucketName());
     }
+
+    private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) {
+        int targetSize = byteBuffers
+            .stream()
+            .mapToInt(ByteBuffer::remaining)
+            .sum();
+
+        return byteBuffers
+            .stream()
+            .reduce(ByteBuffer.allocate(targetSize), (accumulator, element) -> accumulator.put(element))
+            .array();
+    }
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index 8d355bf..f6d124f 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -145,7 +145,7 @@ class CassandraBucketDAO {
             .map(row -> row.getInt(NUMBER_OF_CHUNK));
     }
 
-    Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, int position) {
+    Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
                 .setString(BucketBlobParts.BUCKET, bucketName.asString())
@@ -173,10 +173,7 @@ class CassandraBucketDAO {
             .map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), blobIdFactory.from(row.getString(ID))));
     }
 
-    private byte[] rowToData(Row row) {
-        ByteBuffer byteBuffer = row.getBytes(BucketBlobParts.DATA);
-        byte[] data = new byte[byteBuffer.remaining()];
-        byteBuffer.get(data);
-        return data;
+    private ByteBuffer rowToData(Row row) {
+        return row.getBytes(BucketBlobParts.DATA);
     }
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
index 3bcc99e..d564066 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
@@ -123,7 +123,7 @@ public class CassandraDefaultBucketDAO {
             .map(row -> row.getInt(NUMBER_OF_CHUNK));
     }
 
-    Mono<byte[]> readPart(BlobId blobId, int position) {
+    Mono<ByteBuffer> readPart(BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
                 .setString(DefaultBucketBlobParts.ID, blobId.asString())
@@ -143,10 +143,7 @@ public class CassandraDefaultBucketDAO {
                 .setString(DefaultBucketBlobParts.ID, blobId.asString()));
     }
 
-    private byte[] rowToData(Row row) {
-        ByteBuffer byteBuffer = row.getBytes(DefaultBucketBlobParts.DATA);
-        byte[] data = new byte[byteBuffer.remaining()];
-        byteBuffer.get(data);
-        return data;
+    private ByteBuffer rowToData(Row row) {
+        return row.getBytes(DefaultBucketBlobParts.DATA);
     }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
index 66a7abc..2142200 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
@@ -55,7 +55,7 @@ class CassandraBucketDAOTest {
 
     @Test
     void readPartShouldReturnEmptyWhenNone() {
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -88,7 +88,7 @@ class CassandraBucketDAOTest {
 
         testee.deleteParts(BUCKET_NAME, BLOB_ID).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
         assertThat(maybeBytes).isEmpty();
     }
 
@@ -99,8 +99,8 @@ class CassandraBucketDAOTest {
 
         testee.deleteParts(BUCKET_NAME, BLOB_ID).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
-        Optional<byte[]> maybeBytes2 = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes2 = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
         assertThat(maybeBytes).isEmpty();
         assertThat(maybeBytes2).isEmpty();
     }
@@ -109,16 +109,16 @@ class CassandraBucketDAOTest {
     void readPartShouldReturnPreviouslySavedData() {
         testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
 
-        assertThat(maybeBytes).contains(DATA);
+        assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA));
     }
 
     @Test
     void readPartShouldNotReturnContentOfOtherParts() {
         testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -127,7 +127,7 @@ class CassandraBucketDAOTest {
     void readPartShouldNotReturnContentOfOtherBuckets() {
         testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME_2, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME_2, BLOB_ID, POSITION).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -137,9 +137,9 @@ class CassandraBucketDAOTest {
         testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
         testee.writePart(ByteBuffer.wrap(DATA_2), BUCKET_NAME, BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
 
-        assertThat(maybeBytes).contains(DATA_2);
+        assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA_2));
     }
 
     @Test
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
index 01ffaab..78d9359 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
@@ -52,7 +52,7 @@ class CassandraDefaultBucketDAOTest {
 
     @Test
     void readPartShouldReturnEmptyWhenNone() {
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -61,16 +61,16 @@ class CassandraDefaultBucketDAOTest {
     void readPartShouldReturnPreviouslySavedData() {
         testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
 
-        assertThat(maybeBytes).contains(DATA);
+        assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA));
     }
 
     @Test
     void readPartShouldNotReturnContentOfOtherParts() {
         testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -80,9 +80,9 @@ class CassandraDefaultBucketDAOTest {
         testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
         testee.writePart(ByteBuffer.wrap(DATA_2), BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
 
-        assertThat(maybeBytes).contains(DATA_2);
+        assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA_2));
     }
 
     @Test
@@ -138,7 +138,7 @@ class CassandraDefaultBucketDAOTest {
 
         testee.deleteParts(BLOB_ID).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
         assertThat(maybeBytes).isEmpty();
     }
 
@@ -149,8 +149,8 @@ class CassandraDefaultBucketDAOTest {
 
         testee.deleteParts(BLOB_ID).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
-        Optional<byte[]> maybeBytes2 = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes2 = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
         assertThat(maybeBytes).isEmpty();
         assertThat(maybeBytes2).isEmpty();
     }
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 1ed7963..df51e07 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -18,9 +18,9 @@
  ****************************************************************/
 package org.apache.james.util;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.Spliterator;
 import java.util.stream.Stream;
@@ -33,18 +33,18 @@ public class ReactorUtils {
         return Mono.fromRunnable(runnable).then(Mono.empty());
     }
 
-    public static InputStream toInputStream(Flux<byte[]> byteArrays) {
+    public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
         return new StreamInputStream(byteArrays.toStream(1));
     }
 
     private static  class StreamInputStream extends InputStream {
         private static final int NO_MORE_DATA = -1;
 
-        private final Stream<byte[]> source;
-        private final Spliterator<byte[]> spliterator;
-        private Optional<ByteArrayInputStream> currentItemByteStream;
+        private final Stream<ByteBuffer> source;
+        private final Spliterator<ByteBuffer> spliterator;
+        private Optional<ByteBuffer> currentItemByteStream;
 
-        StreamInputStream(Stream<byte[]> source) {
+        StreamInputStream(Stream<ByteBuffer> source) {
             this.source = source;
             this.spliterator = source.spliterator();
             this.currentItemByteStream = Optional.empty();
@@ -62,8 +62,9 @@ public class ReactorUtils {
                     return NO_MORE_DATA;
                 }
 
-                return currentItemByteStream.map(ByteArrayInputStream::read)
-                    .filter(readResult -> readResult != NO_MORE_DATA)
+                return currentItemByteStream
+                    .filter(ByteBuffer::hasRemaining)
+                    .map(buffer -> buffer.get() & 0xFF)
                     .orElseGet(this::readNextChunk);
             } catch (Throwable t) {
                 source.close();
@@ -77,7 +78,7 @@ public class ReactorUtils {
 
         private void switchToNextChunk() {
             spliterator.tryAdvance(bytes ->
-                currentItemByteStream = Optional.of(new ByteArrayInputStream(bytes)));
+                currentItemByteStream = Optional.of(bytes));
         }
 
         private Integer readNextChunk() {
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 98bb9e1..7bdc678 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -83,11 +84,12 @@ class ReactorUtilsTest {
         @Test
         void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<byte[]> source = Flux.range(0, 10)
+            Flux<ByteBuffer> source = Flux.range(0, 10)
                 .subscribeOn(Schedulers.elastic())
                 .limitRate(2)
                 .doOnRequest(request -> generateElements.getAndAdd((int) request))
-                .map(index -> new byte[] {(byte) (int) index});
+                .map(index -> new byte[] {(byte) (int) index})
+                .map(ByteBuffer::wrap);
 
             InputStream inputStream = ReactorUtils.toInputStream(source);
             byte[] readBytes = new byte[5];
@@ -101,8 +103,9 @@ class ReactorUtilsTest {
         @Test
         void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
+            Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
                     .subscribeOn(Schedulers.elastic())
+                    .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
 
@@ -118,8 +121,9 @@ class ReactorUtilsTest {
         @Test
         void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
+            Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
                     .subscribeOn(Schedulers.elastic())
+                    .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
 
@@ -135,8 +139,9 @@ class ReactorUtilsTest {
         @Test
         void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<byte[]> source = Flux.<byte[]>empty()
+            Flux<ByteBuffer> source = Flux.<byte[]>empty()
                     .subscribeOn(Schedulers.elastic())
+                    .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
 
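The ReactorUtils part of the patch makes the streaming InputStream consume the ByteBuffer chunks directly instead of wrapping each one in a ByteArrayInputStream. A small standalone sketch of the per-byte read it now relies on (illustrative names, not part of the patch): hasRemaining() guards the end of the current chunk, and masking with 0xFF keeps the returned int in the 0..255 range that InputStream.read() expects, rather than a sign-extended negative byte that could be mistaken for end-of-stream.

    import java.nio.ByteBuffer;
    import java.util.Optional;

    public class ByteBufferReadSketch {

        // Same per-byte logic as the patched StreamInputStream.read();
        // in the real class the orElse branch advances to the next chunk
        // via readNextChunk() instead of returning -1 immediately.
        static int readOne(Optional<ByteBuffer> current) {
            return current
                .filter(ByteBuffer::hasRemaining)
                .map(buffer -> buffer.get() & 0xFF)
                .orElse(-1);
        }

        public static void main(String[] args) {
            ByteBuffer chunk = ByteBuffer.wrap(new byte[] {(byte) 0xFF, 0x01});
            System.out.println(readOne(Optional.of(chunk))); // 255, not -1
            System.out.println(readOne(Optional.of(chunk))); // 1
            System.out.println(readOne(Optional.of(chunk))); // -1 (chunk exhausted)
        }
    }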


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org