You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/02/11 16:19:18 UTC

[james-project] branch master updated (9d6f88b -> 63363dc)

This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 9d6f88b  JAMES-3028 avoid OOM when running MemoryDumbBlobStoreTest
     new beebeed  JAMES-3028 Implement DataChuncker for InputStream
     new 5dc55b5  JAMES-3028 read that returns InputStream is lazy, we must consume it to trigger the exception
     new 625fec0  JAMES-3028 Dumb implementation for Cassandra BlobStore
     new 1c91118  JAMES-3028 rewrite CassandraBlobStore using CassandraDumpBlobStore
     new 05326ae  JAMES-3028 bind CassandraDumbBlobStore to make the product to work
     new 63363dc  Merge remote-tracking branch 'mine/split-blobstore-cassandra'

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mail/CassandraAttachmentFallbackTest.java      |   2 +-
 .../cassandra/mail/CassandraMessageDAOTest.java    |   2 +-
 .../migration/AttachmentMessageIdCreationTest.java |   2 +-
 .../mail/migration/AttachmentV2MigrationTest.java  |   2 +-
 .../mailbox/cassandra/mail/utils/GuiceUtils.java   |  11 +-
 .../blob/api/BucketDumbBlobStoreContract.java      |   4 +-
 .../blob/api/DeleteDumbBlobStoreContract.java      |   2 +-
 .../blob/api/ReadSaveDumbBlobStoreContract.java    |   2 +-
 .../james/blob/cassandra/CassandraBlobStore.java   | 195 ++++++---------------
 ...aBlobStore.java => CassandraDumbBlobStore.java} | 188 +++++++++-----------
 .../james/blob/cassandra/utils/DataChunker.java    |  49 ++++--
 .../blob/cassandra/CassandraBlobStoreTest.java     |  15 +-
 .../blob/cassandra/CassandraDumbBlobStoreTest.java |  58 +++---
 .../blob/cassandra/utils/DataChunkerTest.java      | 181 +++++++++++++------
 .../modules/mailbox/CassandraBlobStoreModule.java  |   7 +
 .../cassandra/CassandraMailRepositoryTest.java     |   2 +-
 ...aMailRepositoryWithFakeImplementationsTest.java |   4 +-
 .../RabbitMQMailQueueConfigurationChangeTest.java  |   2 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |   2 +-
 19 files changed, 367 insertions(+), 363 deletions(-)
 copy server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/{CassandraBlobStore.java => CassandraDumbBlobStore.java} (58%)
 copy backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java => server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java (56%)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/06: JAMES-3028 Dumb implementation for Cassandra BlobStore

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 625fec0a4af1fcde0e8429f78cd9e1a71a73030f
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Thu Jan 9 17:47:51 2020 +0100

    JAMES-3028 Dumb implementation for Cassandra BlobStore
---
 .../blob/cassandra/CassandraDumbBlobStore.java     | 208 +++++++++++++++++++++
 .../blob/cassandra/CassandraDumbBlobStoreTest.java |  62 ++++++
 2 files changed, 270 insertions(+)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
new file mode 100644
index 0000000..1ac59f6
--- /dev/null
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -0,0 +1,208 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.cassandra;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.DumbBlobStore;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.blob.cassandra.utils.DataChunker;
+import org.apache.james.util.ReactorUtils;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class CassandraDumbBlobStore implements DumbBlobStore {
+
+    public static final String DEFAULT_BUCKET = "cassandraDefault";
+    public static final boolean LAZY = false;
+
+    private final CassandraDefaultBucketDAO defaultBucketDAO;
+    private final CassandraBucketDAO bucketDAO;
+    private final DataChunker dataChunker;
+    private final CassandraConfiguration configuration;
+    private final HashBlobId.Factory blobIdFactory;
+    private final BucketName defaultBucket;
+
+    @Inject
+    CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
+                           CassandraBucketDAO bucketDAO,
+                           CassandraConfiguration cassandraConfiguration,
+                           HashBlobId.Factory blobIdFactory,
+                           BucketName defaultBucket) {
+        this.defaultBucketDAO = defaultBucketDAO;
+        this.bucketDAO = bucketDAO;
+        this.configuration = cassandraConfiguration;
+        this.blobIdFactory = blobIdFactory;
+        this.defaultBucket = defaultBucket;
+        this.dataChunker = new DataChunker();
+    }
+
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
+        return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId));
+    }
+
+    @Override
+    public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+        return readBlobParts(bucketName, blobId)
+            .collectList()
+            .map(this::byteBuffersToBytesArray);
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+        Preconditions.checkNotNull(data);
+
+        return Mono.fromCallable(() -> dataChunker.chunk(data, configuration.getBlobPartSize()))
+            .flatMap(chunks -> save(bucketName, blobId, chunks));
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
+        Preconditions.checkNotNull(bucketName);
+        Preconditions.checkNotNull(inputStream);
+
+        return Mono.fromCallable(() -> dataChunker.chunkStream(inputStream, configuration.getBlobPartSize()))
+            .flatMap(chunks -> save(bucketName, blobId, chunks))
+            .onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving input stream", e));
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
+        return Mono.using(content::openBufferedStream,
+            stream -> save(bucketName, blobId, stream),
+            Throwing.consumer(InputStream::close).sneakyThrow(),
+            LAZY);
+    }
+
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) {
+        return saveBlobParts(bucketName, blobId, chunksAsFlux)
+            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk));
+    }
+
+    private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) {
+        return chunksAsFlux
+            .index()
+            .concatMap(pair -> writePart(bucketName, blobId, pair.getT1().intValue(), pair.getT2()))
+            .count()
+            .map(Long::intValue);
+    }
+
+    private Mono<?> writePart(BucketName bucketName, BlobId blobId, int position, ByteBuffer data) {
+        Mono<?> write;
+        if (isDefaultBucket(bucketName)) {
+            write = defaultBucketDAO.writePart(data, blobId, position);
+        } else {
+            write = bucketDAO.writePart(data, bucketName, blobId, position);
+        }
+        int anyNonEmptyValue = 1;
+        return write.thenReturn(anyNonEmptyValue);
+    }
+
+    private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer numberOfChunk) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk);
+        } else {
+            return bucketDAO.saveBlobPartsReferences(bucketName, blobId, numberOfChunk);
+        }
+    }
+
+    private boolean isDefaultBucket(BucketName bucketName) {
+        return bucketName.equals(defaultBucket);
+    }
+
+    @Override
+    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.deletePosition(blobId)
+                .then(defaultBucketDAO.deleteParts(blobId));
+        } else {
+            return bucketDAO.deletePosition(bucketName, blobId)
+                .then(bucketDAO.deleteParts(bucketName, blobId));
+        }
+    }
+
+    @Override
+    public Mono<Void> deleteBucket(BucketName bucketName) {
+        Preconditions.checkNotNull(bucketName);
+        Preconditions.checkArgument(!isDefaultBucket(bucketName), "Deleting the default bucket is forbidden");
+
+        return bucketDAO.listAll()
+            .filter(bucketNameBlobIdPair -> bucketNameBlobIdPair.getKey().equals(bucketName))
+            .map(Pair::getValue)
+            .flatMap(blobId -> delete(bucketName, blobId))
+            .then();
+    }
+
+    private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.readPart(blobId, partIndex);
+        } else {
+            return bucketDAO.readPart(bucketName, blobId, partIndex);
+        }
+    }
+
+    private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.selectRowCount(blobId);
+        } else {
+            return bucketDAO.selectRowCount(bucketName, blobId);
+        }
+    }
+
+    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
+        return selectRowCount(bucketName, blobId)
+            .single()
+            .onErrorMap(NoSuchElementException.class, e ->
+                new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)))
+            .flatMapMany(rowCount -> Flux.range(0, rowCount)
+                .concatMap(partIndex -> readPart(bucketName, blobId, partIndex)
+                    .single()
+                    .onErrorMap(NoSuchElementException.class, e ->
+                        new ObjectNotFoundException(String.format("Missing blob part for blobId %s and position %d", blobId.asString(), partIndex)))));
+    }
+
+    private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) {
+        int targetSize = byteBuffers
+            .stream()
+            .mapToInt(ByteBuffer::remaining)
+            .sum();
+
+        return byteBuffers
+            .stream()
+            .reduce(ByteBuffer.allocate(targetSize), ByteBuffer::put)
+            .array();
+    }
+}
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
new file mode 100644
index 0000000..0e81087
--- /dev/null
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
@@ -0,0 +1,62 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.cassandra;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.DumbBlobStore;
+import org.apache.james.blob.api.DumbBlobStoreContract;
+import org.apache.james.blob.api.HashBlobId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class CassandraDumbBlobStoreTest implements DumbBlobStoreContract {
+    private static final int CHUNK_SIZE = 10240;
+    private static final int MULTIPLE_CHUNK_SIZE = 3;
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraBlobModule.MODULE);
+
+    private DumbBlobStore testee;
+    private CassandraDefaultBucketDAO defaultBucketDAO;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf());
+        defaultBucketDAO = new CassandraDefaultBucketDAO(cassandra.getConf());
+        testee = new CassandraDumbBlobStore(
+            defaultBucketDAO,
+                bucketDAO,
+                CassandraConfiguration.builder()
+                    .blobPartSize(CHUNK_SIZE)
+                    .build(),
+                blobIdFactory,
+            BucketName.DEFAULT);
+    }
+
+    @Override
+    public DumbBlobStore testee() {
+        return testee;
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/06: JAMES-3028 bind CassandraDumbBlobStore to make the product to work

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 05326ae6d651d7a89386689cdab4fa3341000608
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Jan 13 14:25:11 2020 +0100

    JAMES-3028 bind CassandraDumbBlobStore to make the product to work
---
 .../apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java | 11 ++++++++++-
 .../org/apache/james/blob/cassandra/CassandraBlobStore.java   |  6 +++++-
 .../apache/james/blob/cassandra/CassandraDumbBlobStore.java   |  3 ++-
 .../james/modules/mailbox/CassandraBlobStoreModule.java       |  7 +++++++
 4 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
index 889b032..e3218c3 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
@@ -19,19 +19,24 @@
 
 package org.apache.james.mailbox.cassandra.mail.utils;
 
+import static com.google.inject.Scopes.SINGLETON;
+
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.cassandra.CassandraBlobStore;
+import org.apache.james.blob.cassandra.CassandraDumbBlobStore;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.model.MessageId;
 
 import com.datastax.driver.core.Session;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.name.Names;
 import com.google.inject.util.Modules;
 
 public class GuiceUtils {
@@ -49,7 +54,11 @@ public class GuiceUtils {
             Modules.combine(
                 binder -> binder.bind(MessageId.Factory.class).toInstance(messageIdFactory),
                 binder -> binder.bind(BlobId.Factory.class).toInstance(new HashBlobId.Factory()),
-                binder -> binder.bind(BlobStore.class).to(CassandraBlobStore.class),
+                binder -> binder.bind(BlobStore.class).to(CassandraBlobStore.class).in(SINGLETON),
+                binder -> binder.bind(CassandraDumbBlobStore.class).in(SINGLETON),
+                binder -> binder.bind(BucketName.class)
+                    .annotatedWith(Names.named(CassandraDumbBlobStore.DEFAULT_BUCKET))
+                    .toInstance(BucketName.DEFAULT),
                 binder -> binder.bind(Session.class).toInstance(session),
                 binder -> binder.bind(CassandraTypesProvider.class).toInstance(typesProvider),
                 binder -> binder.bind(CassandraConfiguration.class).toInstance(configuration)));
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 4a7d876..cccee34 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.cassandra;
 import java.io.InputStream;
 
 import javax.inject.Inject;
+import javax.inject.Named;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
@@ -50,7 +51,10 @@ public class CassandraBlobStore implements BlobStore {
     private final CassandraDumbBlobStore dumbBlobStore;
 
     @Inject
-    CassandraBlobStore(HashBlobId.Factory blobIdFactory, BucketName defaultBucketName, CassandraDumbBlobStore dumbBlobStore) {
+    CassandraBlobStore(HashBlobId.Factory blobIdFactory,
+                       @Named(CassandraDumbBlobStore.DEFAULT_BUCKET) BucketName defaultBucketName,
+                       CassandraDumbBlobStore dumbBlobStore) {
+
         this.blobIdFactory = blobIdFactory;
         this.defaultBucketName = defaultBucketName;
         this.dumbBlobStore = dumbBlobStore;
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
index 581c4f1..94e2bde 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 import javax.inject.Inject;
+import javax.inject.Named;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
@@ -58,7 +59,7 @@ public class CassandraDumbBlobStore implements DumbBlobStore {
     CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
                            CassandraBucketDAO bucketDAO,
                            CassandraConfiguration cassandraConfiguration,
-                           BucketName defaultBucket) {
+                           @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
         this.defaultBucketDAO = defaultBucketDAO;
         this.bucketDAO = bucketDAO;
         this.configuration = cassandraConfiguration;
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraBlobStoreModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraBlobStoreModule.java
index 8c1d3c5..97f5eb6 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraBlobStoreModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraBlobStoreModule.java
@@ -21,10 +21,12 @@ package org.apache.james.modules.mailbox;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.MetricableBlobStore;
 import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobStore;
 import org.apache.james.blob.cassandra.CassandraDefaultBucketDAO;
+import org.apache.james.blob.cassandra.CassandraDumbBlobStore;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
@@ -36,6 +38,11 @@ public class CassandraBlobStoreModule extends AbstractModule {
     protected void configure() {
         bind(CassandraDefaultBucketDAO.class).in(Scopes.SINGLETON);
         bind(CassandraBlobStore.class).in(Scopes.SINGLETON);
+        bind(CassandraDumbBlobStore.class).in(Scopes.SINGLETON);
+
+        bind(BucketName.class)
+            .annotatedWith(Names.named(CassandraDumbBlobStore.DEFAULT_BUCKET))
+            .toInstance(BucketName.DEFAULT);
 
         bind(BlobStore.class)
             .annotatedWith(Names.named(MetricableBlobStore.BLOB_STORE_IMPLEMENTATION))


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/06: JAMES-3028 read that returns InputStream is lazy, we must consume it to trigger the exception

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5dc55b54bc6b3f1e088785689b7c0040edf6539d
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jan 16 10:56:13 2020 +0100

    JAMES-3028 read that returns InputStream is lazy, we must consume it to trigger the exception
---
 .../java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java   | 4 ++--
 .../java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java   | 2 +-
 .../java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
index cbf8543..2a7fbae 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
@@ -54,7 +54,7 @@ public interface BucketDumbBlobStoreContract {
         store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
         store.deleteBucket(TEST_BUCKET_NAME).block();
 
-        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID))
+        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read())
             .isInstanceOf(ObjectNotFoundException.class);
     }
 
@@ -116,7 +116,7 @@ public interface BucketDumbBlobStoreContract {
         DumbBlobStore store = testee();
 
         store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
-        assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID))
+        assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).read())
             .isInstanceOf(ObjectNotFoundException.class);
     }
 
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
index a391b90..5f1b0fa 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
@@ -70,7 +70,7 @@ public interface DeleteDumbBlobStoreContract  {
         store.save(TEST_BUCKET_NAME, TEST_BLOB_ID,  SHORT_BYTEARRAY).block();
         store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
 
-        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID))
+        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read())
             .isInstanceOf(ObjectStoreException.class);
     }
 
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
index 820bd64..833dafd 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
@@ -172,7 +172,7 @@ public interface ReadSaveDumbBlobStoreContract {
     default void readStreamShouldThrowWhenNotExisting() {
         DumbBlobStore store = testee();
 
-        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown")))
+        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown")).read())
             .isInstanceOf(ObjectNotFoundException.class);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/06: JAMES-3028 Implement DataChuncker for InputStream

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit beebeedf70d12343de673ac804ed3fb8499d7d99
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Jan 10 15:11:47 2020 +0100

    JAMES-3028 Implement DataChuncker for InputStream
---
 .../james/blob/cassandra/CassandraBlobStore.java   |  23 ++-
 .../james/blob/cassandra/utils/DataChunker.java    |  49 ++++--
 .../blob/cassandra/utils/DataChunkerTest.java      | 181 ++++++++++++++-------
 3 files changed, 173 insertions(+), 80 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 854bf44..5976354 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -25,7 +25,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -43,10 +42,10 @@ import org.apache.james.util.ReactorUtils;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
 
 public class CassandraBlobStore implements BlobStore {
 
@@ -78,22 +77,22 @@ public class CassandraBlobStore implements BlobStore {
     @Override
     public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) {
         Preconditions.checkNotNull(data);
-
         return saveAsMono(bucketName, data);
     }
 
     private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] data) {
         BlobId blobId = blobIdFactory.forPayload(data);
-        return saveBlobParts(bucketName, data, blobId)
-            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk)
-                .then(Mono.just(blobId)));
+        return Mono.fromCallable(() -> dataChunker.chunk(data, configuration.getBlobPartSize()))
+            .flatMap(chunks -> saveBlobParts(bucketName, blobId, chunks))
+            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk))
+            .thenReturn(blobId);
     }
 
-    private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] data, BlobId blobId) {
-        Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize());
-        return Flux.fromStream(chunks)
+    private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) {
+        return chunksAsFlux
             .publishOn(Schedulers.elastic(), PREFETCH)
-            .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
+            .index()
+            .flatMap(pair -> writePart(bucketName, blobId, pair.getT1().intValue(), pair.getT2())
                 .then(Mono.just(getChunkNum(pair))))
             .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
             .<Integer>handle((t, sink) -> t.ifPresent(sink::next))
@@ -106,8 +105,8 @@ public class CassandraBlobStore implements BlobStore {
         return number + 1;
     }
 
-    private Integer getChunkNum(Pair<Integer, ByteBuffer> pair) {
-        return pair.getKey();
+    private Integer getChunkNum(Tuple2<Long, ByteBuffer> pair) {
+        return pair.getT1().intValue();
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
index aad71dc..1c2c395 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/DataChunker.java
@@ -19,34 +19,59 @@
 
 package org.apache.james.blob.cassandra.utils;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class DataChunker {
 
-    public Stream<Pair<Integer, ByteBuffer>> chunk(byte[] data, int chunkSize) {
+    private static final String CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE = "ChunkSize must be strictly positive";
+
+    public Flux<ByteBuffer> chunk(byte[] data, int chunkSize) {
         Preconditions.checkNotNull(data);
-        Preconditions.checkArgument(chunkSize > 0, "ChunkSize can not be negative");
+        Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
 
         int size = data.length;
         int fullChunkCount = size / chunkSize;
 
-        return Stream.concat(
-            IntStream.range(0, fullChunkCount)
-                .mapToObj(i -> Pair.of(i, ByteBuffer.wrap(data, i * chunkSize, chunkSize))),
+        return Flux.concat(
+            Flux.range(0, fullChunkCount)
+                .map(i -> ByteBuffer.wrap(data, i * chunkSize, chunkSize)),
             lastChunk(data, chunkSize * fullChunkCount, fullChunkCount));
     }
 
-    private Stream<Pair<Integer, ByteBuffer>> lastChunk(byte[] data, int offset, int index) {
+    private Mono<ByteBuffer> lastChunk(byte[] data, int offset, int index) {
         if (offset == data.length && index > 0) {
-            return Stream.empty();
+            return Mono.empty();
         }
-        return Stream.of(Pair.of(index, ByteBuffer.wrap(data, offset, data.length - offset)));
+        return Mono.just(ByteBuffer.wrap(data, offset, data.length - offset));
     }
 
+    public Flux<ByteBuffer> chunkStream(InputStream data, int chunkSize) {
+        Preconditions.checkNotNull(data);
+        Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
+        BufferedInputStream bufferedInputStream = new BufferedInputStream(data);
+        return Flux
+            .<ByteBuffer>generate(sink -> {
+                try {
+                    byte[] buffer = new byte[chunkSize];
+
+                    int size = bufferedInputStream.read(buffer);
+                    if (size <= 0) {
+                        sink.complete();
+                    } else {
+                        sink.next(ByteBuffer.wrap(buffer, 0, size));
+                    }
+                } catch (IOException e) {
+                    sink.error(e);
+                }
+            })
+            .defaultIfEmpty(ByteBuffer.wrap(new byte[0]));
+    }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
index 0f97a0d..a645b0b 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/utils/DataChunkerTest.java
@@ -22,16 +22,20 @@ package org.apache.james.blob.cassandra.utils;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.ByteArrayInputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.Assumptions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
+import reactor.core.publisher.Flux;
 
 public class DataChunkerTest {
 
@@ -44,67 +48,132 @@ public class DataChunkerTest {
         testee = new DataChunker();
     }
 
-    @Test
-    public void chunkShouldThrowOnNullData() {
-        assertThatThrownBy(() -> testee.chunk(null, CHUNK_SIZE))
-            .isInstanceOf(NullPointerException.class);
-    }
-
-    @Test
-    public void chunkShouldThrowOnNegativeChunkSize() {
-        int chunkSize = -1;
-        assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
-            .isInstanceOf(IllegalArgumentException.class);
-    }
+    @Nested
+    public class ByteArray {
+
+        @Test
+        public void chunkShouldThrowOnNullData() {
+            assertThatThrownBy(() -> testee.chunk(null, CHUNK_SIZE))
+                .isInstanceOf(NullPointerException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnNegativeChunkSize() {
+            int chunkSize = -1;
+            assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnZeroChunkSize() {
+            int chunkSize = 0;
+            assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+            Flux<ByteBuffer> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
+            ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+            assertThat(chunks.toStream()).containsExactly(emptyBuffer);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+            byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+            byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
+
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
+        }
+
+        @Test
+        public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+            byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
+            byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
+            byte[] data = Bytes.concat(part1, part2);
+
+            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
+
+            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(part1), ByteBuffer.wrap(part2));
+        }
 
-    @Test
-    public void chunkShouldThrowOnZeroChunkSize() {
-        int chunkSize = 0;
-        assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
-            .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test
-    public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
-        ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, emptyBuffer)));
-    }
-
-    @Test
-    public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
-        byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, ByteBuffer.wrap(data))));
+    @Nested
+    public class InputStream {
+
+        @Test
+        public void chunkShouldThrowOnNullData() {
+            assertThatThrownBy(() -> testee.chunkStream(null, CHUNK_SIZE))
+                .isInstanceOf(NullPointerException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnNegativeChunkSize() {
+            int chunkSize = -1;
+            assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldThrowOnZeroChunkSize() {
+            int chunkSize = 0;
+            assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(new byte[0]), CHUNK_SIZE);
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(new byte[0]);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+            byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
+        }
+
+        @Test
+        public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+            byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
+
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
+        }
+
+        @Test
+        public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+            byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
+            byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
+            Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
+            byte[] data = Bytes.concat(part1, part2);
+
+            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
+
+            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(part1, part2);
+        }
     }
 
-    @Test
-    public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
-        byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
-        assertThat(data.length).isEqualTo(CHUNK_SIZE);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-            .containsOnlyElementsOf(ImmutableList.of(Pair.of(0, ByteBuffer.wrap(data))));
-    }
-
-    @Test
-    public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
-        byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
-        byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
-        byte[] data = Bytes.concat(part1, part2);
-
-        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
-
-        assertThat(chunks)
-        .containsOnlyElementsOf(ImmutableList.of(
-                Pair.of(0, ByteBuffer.wrap(part1)),
-                Pair.of(1, ByteBuffer.wrap(part2))));
+    static byte[] read(ByteBuffer buffer) {
+        byte[] arr = new byte[buffer.remaining()];
+        buffer.get(arr);
+        return arr;
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/06: Merge remote-tracking branch 'mine/split-blobstore-cassandra'

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 63363dc9c994690a978bfaa689965cb52b455539
Merge: 9d6f88b 05326ae
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Feb 11 17:18:39 2020 +0100

    Merge remote-tracking branch 'mine/split-blobstore-cassandra'

 .../mail/CassandraAttachmentFallbackTest.java      |   2 +-
 .../cassandra/mail/CassandraMessageDAOTest.java    |   2 +-
 .../migration/AttachmentMessageIdCreationTest.java |   2 +-
 .../mail/migration/AttachmentV2MigrationTest.java  |   2 +-
 .../mailbox/cassandra/mail/utils/GuiceUtils.java   |  11 +-
 .../blob/api/BucketDumbBlobStoreContract.java      |   4 +-
 .../blob/api/DeleteDumbBlobStoreContract.java      |   2 +-
 .../blob/api/ReadSaveDumbBlobStoreContract.java    |   2 +-
 .../james/blob/cassandra/CassandraBlobStore.java   | 195 ++++++---------------
 ...aBlobStore.java => CassandraDumbBlobStore.java} | 188 +++++++++-----------
 .../james/blob/cassandra/utils/DataChunker.java    |  49 ++++--
 .../blob/cassandra/CassandraBlobStoreTest.java     |  15 +-
 .../blob/cassandra/CassandraDumbBlobStoreTest.java |  61 +++++++
 .../blob/cassandra/utils/DataChunkerTest.java      | 181 +++++++++++++------
 .../modules/mailbox/CassandraBlobStoreModule.java  |   7 +
 .../cassandra/CassandraMailRepositoryTest.java     |   2 +-
 ...aMailRepositoryWithFakeImplementationsTest.java |   4 +-
 .../RabbitMQMailQueueConfigurationChangeTest.java  |   2 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |   2 +-
 19 files changed, 402 insertions(+), 331 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/06: JAMES-3028 rewrite CassandraBlobStore using CassandraDumbBlobStore

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1c91118d92d8ae677aa301ec90af28f777146e3b
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Jan 10 17:44:55 2020 +0100

    JAMES-3028 rewrite CassandraBlobStore using CassandraDumbBlobStore
---
 .../mail/CassandraAttachmentFallbackTest.java      |   2 +-
 .../cassandra/mail/CassandraMessageDAOTest.java    |   2 +-
 .../migration/AttachmentMessageIdCreationTest.java |   2 +-
 .../mail/migration/AttachmentV2MigrationTest.java  |   2 +-
 .../james/blob/cassandra/CassandraBlobStore.java   | 192 ++++++---------------
 .../blob/cassandra/CassandraDumbBlobStore.java     |   3 -
 .../blob/cassandra/CassandraBlobStoreTest.java     |  15 +-
 .../blob/cassandra/CassandraDumbBlobStoreTest.java |   1 -
 .../cassandra/CassandraMailRepositoryTest.java     |   2 +-
 ...aMailRepositoryWithFakeImplementationsTest.java |   4 +-
 .../RabbitMQMailQueueConfigurationChangeTest.java  |   2 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |   2 +-
 12 files changed, 69 insertions(+), 160 deletions(-)

diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
index 4cf6df1..98aacf0 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
@@ -68,7 +68,7 @@ class CassandraAttachmentFallbackTest {
         attachmentDAOV2 = new CassandraAttachmentDAOV2(BLOB_ID_FACTORY, cassandra.getConf());
         attachmentDAO = new CassandraAttachmentDAO(cassandra.getConf(),
             CassandraConfiguration.DEFAULT_CONFIGURATION);
-        blobStore = new CassandraBlobStore(cassandra.getConf());
+        blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
         attachmentMessageIdDAO = new CassandraAttachmentMessageIdDAO(cassandra.getConf(), new CassandraMessageId.Factory());
         CassandraAttachmentOwnerDAO ownerDAO = new CassandraAttachmentOwnerDAO(cassandra.getConf());
         attachmentMapper = new CassandraAttachmentMapper(attachmentDAO, attachmentDAOV2, blobStore, attachmentMessageIdDAO, ownerDAO);
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index 9581a15..bec5100 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -93,7 +93,7 @@ class CassandraMessageDAOTest {
     void setUp(CassandraCluster cassandra) {
         messageIdFactory = new CassandraMessageId.Factory();
         messageId = messageIdFactory.generate();
-        CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+        CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         testee = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobStore, blobIdFactory,
             new CassandraMessageId.Factory());
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
index 7317a97..531c719 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
@@ -86,7 +86,7 @@ class AttachmentMessageIdCreationTest {
     void setUp(CassandraCluster cassandra) {
         CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
 
-        blobStore = new CassandraBlobStore(cassandra.getConf());
+        blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
         cassandraMessageDAO = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(),
             blobStore, new HashBlobId.Factory(), messageIdFactory);
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
index 1ddaf1b..4e40a47 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -72,7 +72,7 @@ class AttachmentV2MigrationTest {
         attachmentDAO = new CassandraAttachmentDAO(cassandra.getConf(),
             CassandraConfiguration.DEFAULT_CONFIGURATION);
         attachmentDAOV2 = new CassandraAttachmentDAOV2(BLOB_ID_FACTORY, cassandra.getConf());
-        blobsStore = new CassandraBlobStore(cassandra.getConf());
+        blobsStore = CassandraBlobStore.forTesting(cassandra.getConf());
         migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, blobsStore);
 
         attachment1 = Attachment.builder()
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 5976354..4a7d876 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -20,202 +20,114 @@
 package org.apache.james.blob.cassandra;
 
 import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
-import org.apache.james.blob.api.ObjectNotFoundException;
-import org.apache.james.blob.cassandra.utils.DataChunker;
-import org.apache.james.util.ReactorUtils;
 
 import com.datastax.driver.core.Session;
+import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import reactor.core.publisher.Flux;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.HashingInputStream;
+import com.google.common.io.FileBackedOutputStream;
+
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 public class CassandraBlobStore implements BlobStore {
 
-    private static final int PREFETCH = 16;
-    private static final int MAX_CONCURRENCY = 1;
-    private final CassandraDefaultBucketDAO defaultBucketDAO;
-    private final CassandraBucketDAO bucketDAO;
-    private final DataChunker dataChunker;
-    private final CassandraConfiguration configuration;
+    public static final boolean LAZY_RESSOURCE_CLEANUP = false;
+    public static final int FILE_THRESHOLD = 10000;
     private final HashBlobId.Factory blobIdFactory;
+    private final BucketName defaultBucketName;
+    private final CassandraDumbBlobStore dumbBlobStore;
 
     @Inject
-    CassandraBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, CassandraBucketDAO bucketDAO, CassandraConfiguration cassandraConfiguration, HashBlobId.Factory blobIdFactory) {
-        this.defaultBucketDAO = defaultBucketDAO;
-        this.bucketDAO = bucketDAO;
-        this.configuration = cassandraConfiguration;
+    CassandraBlobStore(HashBlobId.Factory blobIdFactory, BucketName defaultBucketName, CassandraDumbBlobStore dumbBlobStore) {
         this.blobIdFactory = blobIdFactory;
-        this.dataChunker = new DataChunker();
+        this.defaultBucketName = defaultBucketName;
+        this.dumbBlobStore = dumbBlobStore;
     }
 
     @VisibleForTesting
-    public CassandraBlobStore(Session session) {
-        this(new CassandraDefaultBucketDAO(session),
-            new CassandraBucketDAO(new HashBlobId.Factory(), session),
-            CassandraConfiguration.DEFAULT_CONFIGURATION,
-            new HashBlobId.Factory());
+    public static CassandraBlobStore forTesting(Session session) {
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, session);
+        CassandraDefaultBucketDAO defaultBucketDAO = new CassandraDefaultBucketDAO(session);
+        return new CassandraBlobStore(
+            blobIdFactory,
+            BucketName.DEFAULT,
+            new CassandraDumbBlobStore(defaultBucketDAO, bucketDAO, CassandraConfiguration.DEFAULT_CONFIGURATION, BucketName.DEFAULT));
     }
 
     @Override
     public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) {
+        Preconditions.checkNotNull(bucketName);
         Preconditions.checkNotNull(data);
-        return saveAsMono(bucketName, data);
-    }
 
-    private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] data) {
         BlobId blobId = blobIdFactory.forPayload(data);
-        return Mono.fromCallable(() -> dataChunker.chunk(data, configuration.getBlobPartSize()))
-            .flatMap(chunks -> saveBlobParts(bucketName, blobId, chunks))
-            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk))
-            .thenReturn(blobId);
-    }
 
-    private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) {
-        return chunksAsFlux
-            .publishOn(Schedulers.elastic(), PREFETCH)
-            .index()
-            .flatMap(pair -> writePart(bucketName, blobId, pair.getT1().intValue(), pair.getT2())
-                .then(Mono.just(getChunkNum(pair))))
-            .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
-            .<Integer>handle((t, sink) -> t.ifPresent(sink::next))
-            .map(this::numToCount)
-            .defaultIfEmpty(0);
+        return dumbBlobStore.save(bucketName, blobId, data)
+            .then(Mono.just(blobId));
     }
 
-
-    private int numToCount(int number) {
-        return number + 1;
+    @Override
+    public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) {
+        Preconditions.checkNotNull(bucketName);
+        Preconditions.checkNotNull(data);
+        HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
+        return Mono.using(
+            () -> new FileBackedOutputStream(FILE_THRESHOLD),
+            fileBackedOutputStream -> saveAndGenerateBlobId(bucketName, hashingInputStream, fileBackedOutputStream),
+            Throwing.consumer(FileBackedOutputStream::reset).sneakyThrow(),
+            LAZY_RESSOURCE_CLEANUP);
     }
 
-    private Integer getChunkNum(Tuple2<Long, ByteBuffer> pair) {
-        return pair.getT1().intValue();
+    private Mono<BlobId> saveAndGenerateBlobId(BucketName bucketName, HashingInputStream hashingInputStream, FileBackedOutputStream fileBackedOutputStream) {
+        return Mono.fromCallable(() -> {
+            IOUtils.copy(hashingInputStream, fileBackedOutputStream);
+            return Tuples.of(blobIdFactory.from(hashingInputStream.hash().toString()), fileBackedOutputStream.asByteSource());
+        })
+            .flatMap(tuple -> dumbBlobStore.save(bucketName, tuple.getT1(), tuple.getT2()).thenReturn(tuple.getT1()));
     }
 
     @Override
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return readBlobParts(bucketName, blobId)
-            .collectList()
-            .map(this::byteBuffersToBytesArray);
+        Preconditions.checkNotNull(bucketName);
+        return dumbBlobStore.readBytes(bucketName, blobId);
     }
 
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) {
-        return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId));
-    }
-
-    @Override
-    public BucketName getDefaultBucketName() {
-        return BucketName.DEFAULT;
-    }
-
-    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
-        return selectRowCount(bucketName, blobId)
-            .publishOn(Schedulers.elastic())
-            .single()
-            .onErrorResume(NoSuchElementException.class, e -> Mono.error(
-                new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))))
-            .flatMapMany(rowCount -> Flux.range(0, rowCount))
-            .publishOn(Schedulers.elastic(), PREFETCH)
-            .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex)
-                .single()
-                .onErrorResume(NoSuchElementException.class, e -> Mono.error(
-                    new ObjectNotFoundException(String.format("Missing blob part for blobId %s and position %d", blobId, partIndex)))),
-                MAX_CONCURRENCY, PREFETCH);
-    }
-
-    @Override
-    public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) {
-        Preconditions.checkNotNull(data);
-        return Mono.fromCallable(() -> IOUtils.toByteArray(data))
-            .flatMap(bytes -> saveAsMono(bucketName, bytes));
+        Preconditions.checkNotNull(bucketName);
+        return dumbBlobStore.read(bucketName, blobId);
     }
 
     @Override
     public Mono<Void> deleteBucket(BucketName bucketName) {
         Preconditions.checkNotNull(bucketName);
-        Preconditions.checkArgument(!isDefaultBucket(bucketName), "Deleting the default bucket is forbidden");
 
-        return bucketDAO.listAll()
-            .filter(bucketNameBlobIdPair -> bucketNameBlobIdPair.getKey().equals(bucketName))
-            .map(Pair::getValue)
-            .flatMap(blobId -> delete(bucketName, blobId))
-            .then();
+        return dumbBlobStore.deleteBucket(bucketName);
     }
 
     @Override
-    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.deletePosition(blobId)
-                .then(defaultBucketDAO.deleteParts(blobId));
-        } else {
-            return bucketDAO.deletePosition(bucketName, blobId)
-                .then(bucketDAO.deleteParts(bucketName, blobId));
-        }
-    }
-
-    private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.readPart(blobId, partIndex);
-        } else {
-            return bucketDAO.readPart(bucketName, blobId, partIndex);
-        }
-    }
-
-    private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.selectRowCount(blobId);
-        } else {
-            return bucketDAO.selectRowCount(bucketName, blobId);
-        }
-    }
-
-    private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer numberOfChunk) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk);
-        } else {
-            return bucketDAO.saveBlobPartsReferences(bucketName, blobId, numberOfChunk);
-        }
+    public BucketName getDefaultBucketName() {
+        return defaultBucketName;
     }
 
-    private Mono<Void> writePart(BucketName bucketName, BlobId blobId, int position, ByteBuffer data) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.writePart(data, blobId, position);
-        } else {
-            return bucketDAO.writePart(data, bucketName, blobId, position);
-        }
-    }
+    @Override
+    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+        Preconditions.checkNotNull(bucketName);
+        Preconditions.checkNotNull(blobId);
 
-    private boolean isDefaultBucket(BucketName bucketName) {
-        return bucketName.equals(getDefaultBucketName());
+        return dumbBlobStore.delete(bucketName, blobId);
     }
 
-    private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) {
-        int targetSize = byteBuffers
-            .stream()
-            .mapToInt(ByteBuffer::remaining)
-            .sum();
-
-        return byteBuffers
-            .stream()
-            .reduce(ByteBuffer.allocate(targetSize), (accumulator, element) -> accumulator.put(element))
-            .array();
-    }
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
index 1ac59f6..581c4f1 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -52,19 +52,16 @@ public class CassandraDumbBlobStore implements DumbBlobStore {
     private final CassandraBucketDAO bucketDAO;
     private final DataChunker dataChunker;
     private final CassandraConfiguration configuration;
-    private final HashBlobId.Factory blobIdFactory;
     private final BucketName defaultBucket;
 
     @Inject
     CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
                            CassandraBucketDAO bucketDAO,
                            CassandraConfiguration cassandraConfiguration,
-                           HashBlobId.Factory blobIdFactory,
                            BucketName defaultBucket) {
         this.defaultBucketDAO = defaultBucketDAO;
         this.bucketDAO = bucketDAO;
         this.configuration = cassandraConfiguration;
-        this.blobIdFactory = blobIdFactory;
         this.defaultBucket = defaultBucket;
         this.dataChunker = new DataChunker();
     }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index 63c774e..51f6da8 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -35,6 +35,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.MetricableBlobStore;
 import org.apache.james.blob.api.MetricableBlobStoreContract;
@@ -47,7 +48,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.google.common.base.Strings;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.HashingInputStream;
-
 import reactor.core.publisher.Mono;
 
 public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
@@ -65,14 +65,15 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf());
         defaultBucketDAO = spy(new CassandraDefaultBucketDAO(cassandra.getConf()));
+        CassandraConfiguration cassandraConfiguration = CassandraConfiguration.builder()
+            .blobPartSize(CHUNK_SIZE)
+            .build();
         testee = new MetricableBlobStore(
             metricsTestExtension.getMetricFactory(),
-            new CassandraBlobStore(defaultBucketDAO,
-                bucketDAO,
-                CassandraConfiguration.builder()
-                    .blobPartSize(CHUNK_SIZE)
-                    .build(),
-                blobIdFactory));
+            new CassandraBlobStore(
+                blobIdFactory,
+                BucketName.DEFAULT,
+                new CassandraDumbBlobStore(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT)));
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
index 0e81087..42ce749 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
@@ -50,7 +50,6 @@ public class CassandraDumbBlobStoreTest implements DumbBlobStoreContract {
                 CassandraConfiguration.builder()
                     .blobPartSize(CHUNK_SIZE)
                     .build(),
-                blobIdFactory,
             BucketName.DEFAULT);
     }
 
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
index fa39f8c..f329125 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
@@ -57,7 +57,7 @@ class CassandraMailRepositoryTest implements MailRepositoryContract {
         CassandraMailRepositoryMailDaoAPI mailDAO = new MergingCassandraMailRepositoryMailDao(v1, v2);
         CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
         CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
-        CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+        CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
 
         cassandraMailRepository = new CassandraMailRepository(URL,
             keysDAO, countDAO, mailDAO, MimeMessageStore.factory(blobStore).mimeMessageStore());
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
index b59cabe..203b594 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
@@ -127,7 +127,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
             FailingMailDAO mailDAO = new FailingMailDAO();
             keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
             CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
-            CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+            CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
 
             cassandraMailRepository = new CassandraMailRepository(URL,
                     keysDAO, countDAO, mailDAO, MimeMessageStore.factory(blobStore).mimeMessageStore());
@@ -212,7 +212,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
             CassandraMailRepositoryMailDaoAPI mailDAO = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider());
             FailingKeysDAO keysDAO = new FailingKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
             countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
-            CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+            CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
 
             cassandraMailRepository = new CassandraMailRepository(URL,
                     keysDAO, countDAO, mailDAO, MimeMessageStore.factory(blobStore).mimeMessageStore());
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index 0e599f3..f363509 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -92,7 +92,7 @@ class RabbitMQMailQueueConfigurationChangeTest {
 
     @BeforeEach
     void setup(CassandraCluster cassandra) throws Exception {
-        CassandraBlobStore blobsDAO = new CassandraBlobStore(cassandra.getConf());
+        CassandraBlobStore blobsDAO = CassandraBlobStore.forTesting(cassandra.getConf());
         mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO);
         clock = new UpdatableTickingClock(IN_SLICE_1);
         mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index f8f7bd3..a745662 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -291,7 +291,7 @@ class RabbitMQMailQueueTest {
     }
 
     private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration) throws Exception {
-        CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+        CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
         MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
         clock = new UpdatableTickingClock(IN_SLICE_1);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org