You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/01 07:31:00 UTC
[james-project] 07/20: JAMES-2838 Reading/writing from non default
bucket names
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 37899eb8f350e167ee1e22a39dfd15f630c4a8bb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 25 10:27:15 2019 +0200
JAMES-2838 Reading/writing from non default bucket names
---
.../james/blob/cassandra/CassandraBlobStore.java | 70 +++++++++++++++++-----
.../blob/cassandra/CassandraBlobStoreTest.java | 28 ++++++++-
2 files changed, 83 insertions(+), 15 deletions(-)
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index e294acf..931ea38 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -54,13 +54,15 @@ public class CassandraBlobStore implements BlobStore {
private static final int PREFETCH = 16;
private static final int MAX_CONCURRENCY = 2;
private final CassandraDefaultBucketDAO defaultBucketDAO;
+ private final CassandraBucketDAO bucketDAO;
private final DataChunker dataChunker;
private final CassandraConfiguration configuration;
private final HashBlobId.Factory blobIdFactory;
@Inject
- CassandraBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, CassandraConfiguration cassandraConfiguration, HashBlobId.Factory blobIdFactory) {
+ CassandraBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, CassandraBucketDAO bucketDAO, CassandraConfiguration cassandraConfiguration, HashBlobId.Factory blobIdFactory) {
this.defaultBucketDAO = defaultBucketDAO;
+ this.bucketDAO = bucketDAO;
this.configuration = cassandraConfiguration;
this.blobIdFactory = blobIdFactory;
this.dataChunker = new DataChunker();
@@ -68,28 +70,31 @@ public class CassandraBlobStore implements BlobStore {
@VisibleForTesting
public CassandraBlobStore(Session session) {
- this(new CassandraDefaultBucketDAO(session), CassandraConfiguration.DEFAULT_CONFIGURATION, new HashBlobId.Factory());
+ this(new CassandraDefaultBucketDAO(session),
+ new CassandraBucketDAO(session),
+ CassandraConfiguration.DEFAULT_CONFIGURATION,
+ new HashBlobId.Factory());
}
@Override
public Mono<BlobId> save(BucketName bucketName, byte[] data) {
Preconditions.checkNotNull(data);
- return saveAsMono(data);
+ return saveAsMono(bucketName, data);
}
- private Mono<BlobId> saveAsMono(byte[] data) {
+ private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] data) {
BlobId blobId = blobIdFactory.forPayload(data);
- return saveBlobParts(data, blobId)
- .flatMap(numberOfChunk -> defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk)
+ return saveBlobParts(bucketName, data, blobId)
+ .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk)
.then(Mono.just(blobId)));
}
- private Mono<Integer> saveBlobParts(byte[] data, BlobId blobId) {
+ private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] data, BlobId blobId) {
Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize());
return Flux.fromStream(chunks)
.publishOn(Schedulers.elastic(), PREFETCH)
- .flatMap(pair -> defaultBucketDAO.writePart(pair.getValue(), blobId, getChunkNum(pair))
+ .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
.then(Mono.just(getChunkNum(pair))))
.collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
.flatMap(Mono::justOrEmpty)
@@ -97,6 +102,7 @@ public class CassandraBlobStore implements BlobStore {
.defaultIfEmpty(0);
}
+
private int numToCount(int number) {
return number + 1;
}
@@ -107,7 +113,7 @@ public class CassandraBlobStore implements BlobStore {
@Override
public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
- return readBlobParts(blobId)
+ return readBlobParts(bucketName, blobId)
.collectList()
.map(parts -> Bytes.concat(parts.toArray(new byte[0][])));
}
@@ -115,7 +121,7 @@ public class CassandraBlobStore implements BlobStore {
@Override
public InputStream read(BucketName bucketName, BlobId blobId) {
PipedInputStream pipedInputStream = new PipedInputStream();
- readBlobParts(blobId)
+ readBlobParts(bucketName, blobId)
.subscribe(new PipedStreamSubscriber(pipedInputStream));
return pipedInputStream;
}
@@ -125,15 +131,15 @@ public class CassandraBlobStore implements BlobStore {
return BucketName.DEFAULT;
}
- private Flux<byte[]> readBlobParts(BlobId blobId) {
- Integer rowCount = defaultBucketDAO.selectRowCount(blobId)
+ private Flux<byte[]> readBlobParts(BucketName bucketName, BlobId blobId) {
+ Integer rowCount = selectRowCount(bucketName, blobId)
.publishOn(Schedulers.elastic())
.switchIfEmpty(Mono.error(
new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))))
.block();
return Flux.range(0, rowCount)
.publishOn(Schedulers.elastic(), PREFETCH)
- .flatMapSequential(partIndex -> defaultBucketDAO.readPart(blobId, partIndex)
+ .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex)
.switchIfEmpty(Mono.error(new IllegalStateException(
String.format("Missing blob part for blobId %s and position %d", blobId, partIndex))))
, MAX_CONCURRENCY, PREFETCH);
@@ -143,7 +149,7 @@ public class CassandraBlobStore implements BlobStore {
public Mono<BlobId> save(BucketName bucketName, InputStream data) {
Preconditions.checkNotNull(data);
return Mono.fromCallable(() -> IOUtils.toByteArray(data))
- .flatMap(this::saveAsMono);
+ .flatMap(bytes -> saveAsMono(bucketName, bytes));
}
@Override
@@ -155,4 +161,40 @@ public class CassandraBlobStore implements BlobStore {
public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
throw new NotImplementedException("not implemented");
}
+
+ private Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
+ if (isDefaultBucket(bucketName)) {
+ return defaultBucketDAO.readPart(blobId, partIndex);
+ } else {
+ return bucketDAO.readPart(bucketName, blobId, partIndex);
+ }
+ }
+
+ private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
+ if (isDefaultBucket(bucketName)) {
+ return defaultBucketDAO.selectRowCount(blobId);
+ } else {
+ return bucketDAO.selectRowCount(bucketName, blobId);
+ }
+ }
+
+ private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer numberOfChunk) {
+ if (isDefaultBucket(bucketName)) {
+ return defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk);
+ } else {
+ return bucketDAO.saveBlobPartsReferences(bucketName, blobId, numberOfChunk);
+ }
+ }
+
+ private Mono<Void> writePart(BucketName bucketName, BlobId blobId, int position, ByteBuffer data) {
+ if (isDefaultBucket(bucketName)) {
+ return defaultBucketDAO.writePart(data, blobId, position);
+ } else {
+ return bucketDAO.writePart(data, bucketName, blobId, position);
+ }
+ }
+
+ private boolean isDefaultBucket(BucketName bucketName) {
+ return bucketName.equals(getDefaultBucketName());
+ }
}
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index 7802bba..4829757 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketBlobStoreContract;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.MetricableBlobStore;
import org.apache.james.blob.api.MetricableBlobStoreContract;
@@ -40,7 +41,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import com.google.common.base.Strings;
-public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
+public class CassandraBlobStoreTest implements MetricableBlobStoreContract, BucketBlobStoreContract {
private static final int CHUNK_SIZE = 10240;
private static final int MULTIPLE_CHUNK_SIZE = 3;
@@ -54,6 +55,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
testee = new MetricableBlobStore(
metricsTestExtension.getMetricFactory(),
new CassandraBlobStore(new CassandraDefaultBucketDAO(cassandra.getConf()),
+ new CassandraBucketDAO(cassandra.getConf()),
CassandraConfiguration.builder()
.blobPartSize(CHUNK_SIZE)
.build(),
@@ -76,6 +78,30 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
}
+ @Override
+ @Disabled("Not implemented yet")
+ public void deleteBucketShouldBeIdempotent() {
+
+ }
+
+ @Override
+ @Disabled("Not implemented yet")
+ public void deleteBucketConcurrentlyShouldNotFail() {
+
+ }
+
+ @Override
+ @Disabled("Not implemented yet")
+ public void deleteBucketShouldDeleteExistingBucketWithItsData() {
+
+ }
+
+ @Override
+ @Disabled("Not implemented yet")
+ public void deleteBucketShouldThrowWhenNullBucketName() {
+
+ }
+
@Test
void readBytesShouldReturnSplitSavedDataByChunk() {
String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org