You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/23 04:21:54 UTC
[james-project] 10/10: JAMES-2851 CassandraBlobStore reads ByteBuffer instead of byte[]
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4cb2104ea88b1c5468d673be360be333f38e1a49
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Aug 20 14:20:18 2019 +0200
JAMES-2851 CassandraBlobStore reads ByteBuffer instead of byte[]
---
.../james/blob/cassandra/CassandraBlobStore.java | 20 ++++++++++++++++----
.../james/blob/cassandra/CassandraBucketDAO.java | 9 +++------
.../blob/cassandra/CassandraDefaultBucketDAO.java | 9 +++------
.../james/blob/cassandra/CassandraBucketDAOTest.java | 20 ++++++++++----------
.../cassandra/CassandraDefaultBucketDAOTest.java | 18 +++++++++---------
.../java/org/apache/james/util/ReactorUtils.java | 19 ++++++++++---------
.../java/org/apache/james/util/ReactorUtilsTest.java | 15 ++++++++++-----
7 files changed, 61 insertions(+), 49 deletions(-)
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 171f215..7f7efe3 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.cassandra;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Comparator;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -42,7 +43,6 @@ import org.apache.james.util.ReactorUtils;
import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.primitives.Bytes;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -114,7 +114,7 @@ public class CassandraBlobStore implements BlobStore {
public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
return readBlobParts(bucketName, blobId)
.collectList()
- .map(parts -> Bytes.concat(parts.toArray(new byte[0][])));
+ .map(this::byteBuffersToBytesArray);
}
@Override
@@ -127,7 +127,7 @@ public class CassandraBlobStore implements BlobStore {
return BucketName.DEFAULT;
}
- private Flux<byte[]> readBlobParts(BucketName bucketName, BlobId blobId) {
+ private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
Integer rowCount = selectRowCount(bucketName, blobId)
.publishOn(Schedulers.elastic())
.single()
@@ -173,7 +173,7 @@ public class CassandraBlobStore implements BlobStore {
}
}
- private Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
+ private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
if (isDefaultBucket(bucketName)) {
return defaultBucketDAO.readPart(blobId, partIndex);
} else {
@@ -208,4 +208,16 @@ public class CassandraBlobStore implements BlobStore {
private boolean isDefaultBucket(BucketName bucketName) {
return bucketName.equals(getDefaultBucketName());
}
+
+    /** Concatenates the remaining bytes of every buffer, in list order, into one new array. */
+    private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) {
+        int targetSize = byteBuffers.stream().mapToInt(ByteBuffer::remaining).sum();
+        // Plain loop rather than Stream.reduce: reduce expects a true identity value,
+        // not a mutable buffer that the accumulator fills in place.
+        ByteBuffer target = ByteBuffer.allocate(targetSize);
+        for (ByteBuffer part : byteBuffers) {
+            target.put(part.duplicate()); // duplicate() leaves the source position untouched
+        }
+        return target.array();
+    }
}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index 8d355bf..f6d124f 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -145,7 +145,7 @@ class CassandraBucketDAO {
.map(row -> row.getInt(NUMBER_OF_CHUNK));
}
- Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, int position) {
+ Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, int position) {
return cassandraAsyncExecutor.executeSingleRow(
selectPart.bind()
.setString(BucketBlobParts.BUCKET, bucketName.asString())
@@ -173,10 +173,7 @@ class CassandraBucketDAO {
.map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), blobIdFactory.from(row.getString(ID))));
}
- private byte[] rowToData(Row row) {
- ByteBuffer byteBuffer = row.getBytes(BucketBlobParts.DATA);
- byte[] data = new byte[byteBuffer.remaining()];
- byteBuffer.get(data);
- return data;
+ private ByteBuffer rowToData(Row row) {
+ return row.getBytes(BucketBlobParts.DATA);
}
}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
index 3bcc99e..d564066 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
@@ -123,7 +123,7 @@ public class CassandraDefaultBucketDAO {
.map(row -> row.getInt(NUMBER_OF_CHUNK));
}
- Mono<byte[]> readPart(BlobId blobId, int position) {
+ Mono<ByteBuffer> readPart(BlobId blobId, int position) {
return cassandraAsyncExecutor.executeSingleRow(
selectPart.bind()
.setString(DefaultBucketBlobParts.ID, blobId.asString())
@@ -143,10 +143,7 @@ public class CassandraDefaultBucketDAO {
.setString(DefaultBucketBlobParts.ID, blobId.asString()));
}
- private byte[] rowToData(Row row) {
- ByteBuffer byteBuffer = row.getBytes(DefaultBucketBlobParts.DATA);
- byte[] data = new byte[byteBuffer.remaining()];
- byteBuffer.get(data);
- return data;
+ private ByteBuffer rowToData(Row row) {
+ return row.getBytes(DefaultBucketBlobParts.DATA);
}
}
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
index 66a7abc..2142200 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
@@ -55,7 +55,7 @@ class CassandraBucketDAOTest {
@Test
void readPartShouldReturnEmptyWhenNone() {
- Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
assertThat(maybeBytes).isEmpty();
}
@@ -88,7 +88,7 @@ class CassandraBucketDAOTest {
testee.deleteParts(BUCKET_NAME, BLOB_ID).block();
- Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
assertThat(maybeBytes).isEmpty();
}
@@ -99,8 +99,8 @@ class CassandraBucketDAOTest {
testee.deleteParts(BUCKET_NAME, BLOB_ID).block();
- Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
- Optional<byte[]> maybeBytes2 = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes2 = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
assertThat(maybeBytes).isEmpty();
assertThat(maybeBytes2).isEmpty();
}
@@ -109,16 +109,16 @@ class CassandraBucketDAOTest {
void readPartShouldReturnPreviouslySavedData() {
testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
- Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
- assertThat(maybeBytes).contains(DATA);
+ assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA));
}
@Test
void readPartShouldNotReturnContentOfOtherParts() {
testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
- Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
assertThat(maybeBytes).isEmpty();
}
@@ -127,7 +127,7 @@ class CassandraBucketDAOTest {
void readPartShouldNotReturnContentOfOtherBuckets() {
testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
- Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME_2, BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME_2, BLOB_ID, POSITION).blockOptional();
assertThat(maybeBytes).isEmpty();
}
@@ -137,9 +137,9 @@ class CassandraBucketDAOTest {
testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
testee.writePart(ByteBuffer.wrap(DATA_2), BUCKET_NAME, BLOB_ID, POSITION).block();
- Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
- assertThat(maybeBytes).contains(DATA_2);
+ assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA_2));
}
@Test
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
index 01ffaab..78d9359 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
@@ -52,7 +52,7 @@ class CassandraDefaultBucketDAOTest {
@Test
void readPartShouldReturnEmptyWhenNone() {
- Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
assertThat(maybeBytes).isEmpty();
}
@@ -61,16 +61,16 @@ class CassandraDefaultBucketDAOTest {
void readPartShouldReturnPreviouslySavedData() {
testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
- Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
- assertThat(maybeBytes).contains(DATA);
+ assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA));
}
@Test
void readPartShouldNotReturnContentOfOtherParts() {
testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
- Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
assertThat(maybeBytes).isEmpty();
}
@@ -80,9 +80,9 @@ class CassandraDefaultBucketDAOTest {
testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
testee.writePart(ByteBuffer.wrap(DATA_2), BLOB_ID, POSITION).block();
- Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
- assertThat(maybeBytes).contains(DATA_2);
+ assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA_2));
}
@Test
@@ -138,7 +138,7 @@ class CassandraDefaultBucketDAOTest {
testee.deleteParts(BLOB_ID).block();
- Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
assertThat(maybeBytes).isEmpty();
}
@@ -149,8 +149,8 @@ class CassandraDefaultBucketDAOTest {
testee.deleteParts(BLOB_ID).block();
- Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
- Optional<byte[]> maybeBytes2 = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
+ Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+ Optional<ByteBuffer> maybeBytes2 = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
assertThat(maybeBytes).isEmpty();
assertThat(maybeBytes2).isEmpty();
}
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 1ed7963..df51e07 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -18,9 +18,9 @@
****************************************************************/
package org.apache.james.util;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Stream;
@@ -33,18 +33,18 @@ public class ReactorUtils {
return Mono.fromRunnable(runnable).then(Mono.empty());
}
- public static InputStream toInputStream(Flux<byte[]> byteArrays) {
+ public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
return new StreamInputStream(byteArrays.toStream(1));
}
private static class StreamInputStream extends InputStream {
private static final int NO_MORE_DATA = -1;
- private final Stream<byte[]> source;
- private final Spliterator<byte[]> spliterator;
- private Optional<ByteArrayInputStream> currentItemByteStream;
+ private final Stream<ByteBuffer> source;
+ private final Spliterator<ByteBuffer> spliterator;
+ private Optional<ByteBuffer> currentItemByteStream;
- StreamInputStream(Stream<byte[]> source) {
+ StreamInputStream(Stream<ByteBuffer> source) {
this.source = source;
this.spliterator = source.spliterator();
this.currentItemByteStream = Optional.empty();
@@ -62,8 +62,9 @@ public class ReactorUtils {
return NO_MORE_DATA;
}
- return currentItemByteStream.map(ByteArrayInputStream::read)
- .filter(readResult -> readResult != NO_MORE_DATA)
+ return currentItemByteStream
+ .filter(ByteBuffer::hasRemaining)
+ .map(buffer -> buffer.get() & 0xFF)
.orElseGet(this::readNextChunk);
} catch (Throwable t) {
source.close();
@@ -77,7 +78,7 @@ public class ReactorUtils {
private void switchToNextChunk() {
spliterator.tryAdvance(bytes ->
- currentItemByteStream = Optional.of(new ByteArrayInputStream(bytes)));
+ currentItemByteStream = Optional.of(bytes));
}
private Integer readNextChunk() {
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 98bb9e1..7bdc678 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
@@ -83,11 +84,12 @@ class ReactorUtilsTest {
@Test
void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
AtomicInteger generateElements = new AtomicInteger(0);
- Flux<byte[]> source = Flux.range(0, 10)
+ Flux<ByteBuffer> source = Flux.range(0, 10)
.subscribeOn(Schedulers.elastic())
.limitRate(2)
.doOnRequest(request -> generateElements.getAndAdd((int) request))
- .map(index -> new byte[] {(byte) (int) index});
+ .map(index -> new byte[] {(byte) (int) index})
+ .map(ByteBuffer::wrap);
InputStream inputStream = ReactorUtils.toInputStream(source);
byte[] readBytes = new byte[5];
@@ -101,8 +103,9 @@ class ReactorUtilsTest {
@Test
void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
AtomicInteger generateElements = new AtomicInteger(0);
- Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
+ Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
.subscribeOn(Schedulers.elastic())
+ .map(ByteBuffer::wrap)
.limitRate(2)
.doOnRequest(request -> generateElements.getAndAdd((int) request));
@@ -118,8 +121,9 @@ class ReactorUtilsTest {
@Test
void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
AtomicInteger generateElements = new AtomicInteger(0);
- Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
+ Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
.subscribeOn(Schedulers.elastic())
+ .map(ByteBuffer::wrap)
.limitRate(2)
.doOnRequest(request -> generateElements.getAndAdd((int) request));
@@ -135,8 +139,9 @@ class ReactorUtilsTest {
@Test
void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException {
AtomicInteger generateElements = new AtomicInteger(0);
- Flux<byte[]> source = Flux.<byte[]>empty()
+ Flux<ByteBuffer> source = Flux.<byte[]>empty()
.subscribeOn(Schedulers.elastic())
+ .map(ByteBuffer::wrap)
.limitRate(2)
.doOnRequest(request -> generateElements.getAndAdd((int) request));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org