Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/01 07:31:00 UTC

[james-project] 07/20: JAMES-2838 Reading/writing from non-default bucket names

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 37899eb8f350e167ee1e22a39dfd15f630c4a8bb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 25 10:27:15 2019 +0200

    JAMES-2838 Reading/writing from non-default bucket names
---
 .../james/blob/cassandra/CassandraBlobStore.java   | 70 +++++++++++++++++-----
 .../blob/cassandra/CassandraBlobStoreTest.java     | 28 ++++++++-
 2 files changed, 83 insertions(+), 15 deletions(-)
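
For context: this patch threads the BucketName parameter through
CassandraBlobStore so reads and writes can target buckets other than the
default one. A minimal usage sketch of the resulting API (not part of the
commit; it assumes a BucketName.of(String) factory and an already
configured Cassandra Session, and blocks the reactive results for
brevity):

    // Uses the @VisibleForTesting constructor changed below, which wires
    // both the default-bucket DAO and the new bucket-aware DAO.
    CassandraBlobStore blobStore = new CassandraBlobStore(session);

    byte[] payload = "blob content".getBytes(StandardCharsets.UTF_8);

    // "backup" is not the default bucket, so this routes to CassandraBucketDAO.
    BlobId blobId = blobStore.save(BucketName.of("backup"), payload).block();

    // Reads consult the same bucket; BucketName.DEFAULT would still hit
    // CassandraDefaultBucketDAO.
    byte[] roundTripped = blobStore.readBytes(BucketName.of("backup"), blobId).block();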

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index e294acf..931ea38 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -54,13 +54,15 @@ public class CassandraBlobStore implements BlobStore {
     private static final int PREFETCH = 16;
     private static final int MAX_CONCURRENCY = 2;
     private final CassandraDefaultBucketDAO defaultBucketDAO;
+    private final CassandraBucketDAO bucketDAO;
     private final DataChunker dataChunker;
     private final CassandraConfiguration configuration;
     private final HashBlobId.Factory blobIdFactory;
 
     @Inject
-    CassandraBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, CassandraConfiguration cassandraConfiguration, HashBlobId.Factory blobIdFactory) {
+    CassandraBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, CassandraBucketDAO bucketDAO, CassandraConfiguration cassandraConfiguration, HashBlobId.Factory blobIdFactory) {
         this.defaultBucketDAO = defaultBucketDAO;
+        this.bucketDAO = bucketDAO;
         this.configuration = cassandraConfiguration;
         this.blobIdFactory = blobIdFactory;
         this.dataChunker = new DataChunker();
@@ -68,28 +70,31 @@ public class CassandraBlobStore implements BlobStore {
 
     @VisibleForTesting
     public CassandraBlobStore(Session session) {
-        this(new CassandraDefaultBucketDAO(session), CassandraConfiguration.DEFAULT_CONFIGURATION, new HashBlobId.Factory());
+        this(new CassandraDefaultBucketDAO(session),
+            new CassandraBucketDAO(session),
+            CassandraConfiguration.DEFAULT_CONFIGURATION,
+            new HashBlobId.Factory());
     }
 
     @Override
     public Mono<BlobId> save(BucketName bucketName, byte[] data) {
         Preconditions.checkNotNull(data);
 
-        return saveAsMono(data);
+        return saveAsMono(bucketName, data);
     }
 
-    private Mono<BlobId> saveAsMono(byte[] data) {
+    private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] data) {
         BlobId blobId = blobIdFactory.forPayload(data);
-        return saveBlobParts(data, blobId)
-            .flatMap(numberOfChunk -> defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk)
+        return saveBlobParts(bucketName, data, blobId)
+            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk)
                 .then(Mono.just(blobId)));
     }
 
-    private Mono<Integer> saveBlobParts(byte[] data, BlobId blobId) {
+    private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] data, BlobId blobId) {
         Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize());
         return Flux.fromStream(chunks)
             .publishOn(Schedulers.elastic(), PREFETCH)
-            .flatMap(pair -> defaultBucketDAO.writePart(pair.getValue(), blobId, getChunkNum(pair))
+            .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
                 .then(Mono.just(getChunkNum(pair))))
             .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
             .flatMap(Mono::justOrEmpty)
@@ -97,6 +102,7 @@ public class CassandraBlobStore implements BlobStore {
             .defaultIfEmpty(0);
     }
 
+
     private int numToCount(int number) {
         return number + 1;
     }
@@ -107,7 +113,7 @@ public class CassandraBlobStore implements BlobStore {
 
     @Override
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return readBlobParts(blobId)
+        return readBlobParts(bucketName, blobId)
             .collectList()
             .map(parts -> Bytes.concat(parts.toArray(new byte[0][])));
     }
@@ -115,7 +121,7 @@ public class CassandraBlobStore implements BlobStore {
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) {
         PipedInputStream pipedInputStream = new PipedInputStream();
-        readBlobParts(blobId)
+        readBlobParts(bucketName, blobId)
             .subscribe(new PipedStreamSubscriber(pipedInputStream));
         return pipedInputStream;
     }
@@ -125,15 +131,15 @@ public class CassandraBlobStore implements BlobStore {
         return BucketName.DEFAULT;
     }
 
-    private Flux<byte[]> readBlobParts(BlobId blobId) {
-        Integer rowCount = defaultBucketDAO.selectRowCount(blobId)
+    private Flux<byte[]> readBlobParts(BucketName bucketName, BlobId blobId) {
+        Integer rowCount = selectRowCount(bucketName, blobId)
             .publishOn(Schedulers.elastic())
             .switchIfEmpty(Mono.error(
                 new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))))
             .block();
         return Flux.range(0, rowCount)
             .publishOn(Schedulers.elastic(), PREFETCH)
-            .flatMapSequential(partIndex -> defaultBucketDAO.readPart(blobId, partIndex)
+            .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex)
                 .switchIfEmpty(Mono.error(new IllegalStateException(
                     String.format("Missing blob part for blobId %s and position %d", blobId, partIndex))))
                 , MAX_CONCURRENCY, PREFETCH);
@@ -143,7 +149,7 @@ public class CassandraBlobStore implements BlobStore {
     public Mono<BlobId> save(BucketName bucketName, InputStream data) {
         Preconditions.checkNotNull(data);
         return Mono.fromCallable(() -> IOUtils.toByteArray(data))
-            .flatMap(this::saveAsMono);
+            .flatMap(bytes -> saveAsMono(bucketName, bytes));
     }
 
     @Override
@@ -155,4 +161,40 @@ public class CassandraBlobStore implements BlobStore {
     public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
         throw new NotImplementedException("not implemented");
     }
+
+    private Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.readPart(blobId, partIndex);
+        } else {
+            return bucketDAO.readPart(bucketName, blobId, partIndex);
+        }
+    }
+
+    private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.selectRowCount(blobId);
+        } else {
+            return bucketDAO.selectRowCount(bucketName, blobId);
+        }
+    }
+
+    private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer numberOfChunk) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk);
+        } else {
+            return bucketDAO.saveBlobPartsReferences(bucketName, blobId, numberOfChunk);
+        }
+    }
+
+    private Mono<Void> writePart(BucketName bucketName, BlobId blobId, int position, ByteBuffer data) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.writePart(data, blobId, position);
+        } else {
+            return bucketDAO.writePart(data, bucketName, blobId, position);
+        }
+    }
+
+    private boolean isDefaultBucket(BucketName bucketName) {
+        return bucketName.equals(getDefaultBucketName());
+    }
 }
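
Note the dispatch pattern above: every read/write helper branches on
isDefaultBucket, so blobs in the default bucket keep using the legacy
CassandraDefaultBucketDAO schema while any other bucket goes through the
new bucket-aware CassandraBucketDAO. A short sketch of the streaming read
path this enables (not part of the commit; BucketName.of is an assumed
factory, and IOUtils is the commons-io helper already used in the class):

    // read() returns a PipedInputStream that is fed blob parts on an
    // elastic scheduler, so the whole blob never has to be materialised
    // before the caller starts consuming it.
    try (InputStream in = blobStore.read(BucketName.of("backup"), blobId)) {
        byte[] content = IOUtils.toByteArray(in);
    }
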
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index 7802bba..4829757 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketBlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.MetricableBlobStore;
 import org.apache.james.blob.api.MetricableBlobStoreContract;
@@ -40,7 +41,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.base.Strings;
 
-public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
+public class CassandraBlobStoreTest implements MetricableBlobStoreContract, BucketBlobStoreContract {
     private static final int CHUNK_SIZE = 10240;
     private static final int MULTIPLE_CHUNK_SIZE = 3;
 
@@ -54,6 +55,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
         testee = new MetricableBlobStore(
             metricsTestExtension.getMetricFactory(),
             new CassandraBlobStore(new CassandraDefaultBucketDAO(cassandra.getConf()),
+                new CassandraBucketDAO(cassandra.getConf()),
                 CassandraConfiguration.builder()
                     .blobPartSize(CHUNK_SIZE)
                     .build(),
@@ -76,6 +78,30 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
 
     }
 
+    @Override
+    @Disabled("Not implemented yet")
+    public void deleteBucketShouldBeIdempotent() {
+
+    }
+
+    @Override
+    @Disabled("Not implemented yet")
+    public void deleteBucketConcurrentlyShouldNotFail() {
+
+    }
+
+    @Override
+    @Disabled("Not implemented yet")
+    public void deleteBucketShouldDeleteExistingBucketWithItsData() {
+
+    }
+
+    @Override
+    @Disabled("Not implemented yet")
+    public void deleteBucketShouldThrowWhenNullBucketName() {
+
+    }
+
     @Test
     void readBytesShouldReturnSplitSavedDataByChunk() {
         String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE);
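
The test setup mirrors the new production wiring: both DAOs are handed to
the store explicitly. A condensed sketch of that construction, following
the test above (metricFactory and session stand in for the objects the
JUnit extensions provide):

    BlobStore testee = new MetricableBlobStore(
        metricFactory,
        new CassandraBlobStore(
            new CassandraDefaultBucketDAO(session),
            new CassandraBucketDAO(session),
            CassandraConfiguration.builder()
                .blobPartSize(CHUNK_SIZE)   // 10240 in this test
                .build(),
            new HashBlobId.Factory()));

The deleteBucket contract tests are overridden as @Disabled because bucket
deletion still throws NotImplementedException in the main class above.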

