You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/05/19 09:52:31 UTC

[james-project] branch master updated (18ff6da -> f088eae)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 18ff6da  JAMES-3107 Fix some zeroed metrics
     new 192d3d9  JAMES-3586 Add optimisticConsistencyLevel parameter to Cassandra configuration
     new ff8f4f8  JAMES-3586 Do read with CL ONE in Cassandra Blobstore when optimistic CL is enabled
     new cdf0fa1  JAMES-3586 Add metrics for hit/miss ratio on Cassandra BlobStore reads with CL ONE
     new f088eae  JAMES-3586 Documentation for optimistic consistency level

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../init/configuration/CassandraConfiguration.java |  36 +++-
 .../pages/distributed/configure/cassandra.adoc     |   4 +
 .../cassandra/mail/CassandraMessageDAOTest.java    |   3 +-
 .../cassandra/mail/CassandraMessageDAOV3Test.java  |   3 +-
 .../mail/migration/MessageV3MigrationTest.java     |   3 +-
 .../mailbox/cassandra/mail/utils/GuiceUtils.java   |   3 +-
 .../blob/cassandra/CassandraBlobStoreDAO.java      |  54 ++++-
 .../blob/cassandra/CassandraBlobStoreFactory.java  |   5 +-
 .../james/blob/cassandra/CassandraBucketDAO.java   |  20 ++
 .../blob/cassandra/CassandraDefaultBucketDAO.java  |  18 ++
 .../cassandra/CassandraBlobStoreClOneTest.java     | 235 +++++++++++++++++++++
 .../blob/cassandra/CassandraBlobStoreDAOTest.java  |  12 +-
 .../blob/cassandra/CassandraBlobStoreTest.java     |   6 +-
 .../CassandraPassTroughBlobStoreTest.java          |  14 +-
 .../blob/cassandra/cache/CachedBlobStoreTest.java  |   2 +-
 .../sample-configuration/cassandra.properties      |   5 +
 .../sample-configuration/cassandra.properties      |   5 +
 .../mailrepository-cassandra/pom.xml               |   5 +
 .../cassandra/CassandraMailRepositoryTest.java     |   5 +-
 ...aMailRepositoryWithFakeImplementationsTest.java |   3 +-
 .../RabbitMQMailQueueConfigurationChangeTest.java  |   2 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |   7 +-
 src/site/xdoc/server/config-cassandra.xml          |   3 +
 23 files changed, 420 insertions(+), 33 deletions(-)
 create mode 100644 server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/04: JAMES-3586 Add metrics for hit/miss ratio on Cassandra BlobStore reads with CL ONE

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cdf0fa1960813ee9a3cfca691d1eab193d720aa7
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Tue May 18 16:30:42 2021 +0700

    JAMES-3586 Add metrics for hit/miss ratio on Cassandra BlobStore reads with CL ONE
---
 .../cassandra/mail/CassandraMessageDAOTest.java    |  3 +-
 .../cassandra/mail/CassandraMessageDAOV3Test.java  |  3 +-
 .../mail/migration/MessageV3MigrationTest.java     |  3 +-
 .../mailbox/cassandra/mail/utils/GuiceUtils.java   |  3 +-
 .../blob/cassandra/CassandraBlobStoreDAO.java      | 24 +++++++-
 .../blob/cassandra/CassandraBlobStoreFactory.java  |  5 +-
 .../cassandra/CassandraBlobStoreClOneTest.java     | 68 +++++++++++++++++++++-
 .../blob/cassandra/CassandraBlobStoreDAOTest.java  | 12 ++--
 .../blob/cassandra/CassandraBlobStoreTest.java     |  6 +-
 .../CassandraPassTroughBlobStoreTest.java          | 14 +++--
 .../blob/cassandra/cache/CachedBlobStoreTest.java  |  2 +-
 .../mailrepository-cassandra/pom.xml               |  5 ++
 .../cassandra/CassandraMailRepositoryTest.java     |  5 +-
 ...aMailRepositoryWithFakeImplementationsTest.java |  3 +-
 .../RabbitMQMailQueueConfigurationChangeTest.java  |  2 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |  7 ++-
 16 files changed, 133 insertions(+), 32 deletions(-)

diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index d343025..7003aae 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -86,7 +87,7 @@ class CassandraMessageDAOTest {
     void setUp(CassandraCluster cassandra) {
         messageIdFactory = new CassandraMessageId.Factory();
         messageId = messageIdFactory.generate();
-        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
             .passthrough();
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         testee = new CassandraMessageDAO(
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
index fe331c6..7ef00ea 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -85,7 +86,7 @@ class CassandraMessageDAOV3Test {
     void setUp(CassandraCluster cassandra) {
         CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
         messageId = messageIdFactory.generate();
-        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
             .passthrough();
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         testee = new CassandraMessageDAOV3(
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java
index 729759f..efd5cda 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java
@@ -48,6 +48,7 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -76,7 +77,7 @@ class MessageV3MigrationTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
             .passthrough();
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         daoV2 = new CassandraMessageDAO(
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
index eb3f20b..3f9c30d 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
@@ -44,6 +44,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.mail.eventsourcing.acl.ACLModule;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.UidProvider;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 
 import com.datastax.driver.core.Session;
 import com.google.common.collect.ImmutableSet;
@@ -80,7 +81,7 @@ public class GuiceUtils {
             binder -> binder.bind(UidProvider.class).to(CassandraUidProvider.class),
             binder -> binder.bind(ACLMapper.class).to(CassandraACLMapper.class),
             binder -> binder.bind(BlobId.Factory.class).toInstance(new HashBlobId.Factory()),
-            binder -> binder.bind(BlobStore.class).toProvider(() -> CassandraBlobStoreFactory.forTesting(session).passthrough()),
+            binder -> binder.bind(BlobStore.class).toProvider(() -> CassandraBlobStoreFactory.forTesting(session, new RecordingMetricFactory()).passthrough()),
             binder -> binder.bind(Session.class).toInstance(session),
             binder -> Multibinder.newSetBinder(binder, new TypeLiteral<EventDTOModule<? extends Event, ? extends EventDTO>>() {})
                 .addBinding().toInstance(ACLModule.ACL_UPDATE),
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
index f22e096..06cfc41 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
@@ -37,6 +37,8 @@ import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.metrics.api.Metric;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.DataChunker;
 import org.apache.james.util.ReactorUtils;
 
@@ -51,21 +53,33 @@ import reactor.core.publisher.Mono;
 public class CassandraBlobStoreDAO implements BlobStoreDAO {
     public static final boolean LAZY = false;
 
+    public static final String CASSANDRA_BLOBSTORE_CL_ONE_MISS_COUNT_METRIC_NAME = "cassandraBlobStoreClOneMisses";
+    public static final String CASSANDRA_BLOBSTORE_CL_ONE_HIT_COUNT_METRIC_NAME = "cassandraBlobStoreClOneHits";
+
     private final CassandraDefaultBucketDAO defaultBucketDAO;
     private final CassandraBucketDAO bucketDAO;
     private final CassandraConfiguration configuration;
     private final BucketName defaultBucket;
 
+    private final MetricFactory metricFactory;
+    private final Metric metricClOneHitCount;
+    private final Metric metricClOneMissCount;
+
     @Inject
     @VisibleForTesting
     public CassandraBlobStoreDAO(CassandraDefaultBucketDAO defaultBucketDAO,
                                  CassandraBucketDAO bucketDAO,
                                  CassandraConfiguration cassandraConfiguration,
-                                 @Named(BlobStore.DEFAULT_BUCKET_NAME_QUALIFIER) BucketName defaultBucket) {
+                                 @Named(BlobStore.DEFAULT_BUCKET_NAME_QUALIFIER) BucketName defaultBucket,
+                                 MetricFactory metricFactory) {
         this.defaultBucketDAO = defaultBucketDAO;
         this.bucketDAO = bucketDAO;
         this.configuration = cassandraConfiguration;
         this.defaultBucket = defaultBucket;
+        this.metricFactory = metricFactory;
+
+        this.metricClOneMissCount = metricFactory.generate(CASSANDRA_BLOBSTORE_CL_ONE_MISS_COUNT_METRIC_NAME);
+        this.metricClOneHitCount = metricFactory.generate(CASSANDRA_BLOBSTORE_CL_ONE_HIT_COUNT_METRIC_NAME);
     }
 
     @Override
@@ -168,7 +182,9 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
     private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
         if (configuration.isOptimisticConsistencyLevel()) {
             return readPartClOne(bucketName, blobId, partIndex)
-                .switchIfEmpty(readPartClDefault(bucketName, blobId, partIndex));
+                .doOnNext(any -> metricClOneHitCount.increment())
+                .switchIfEmpty(Mono.fromRunnable(metricClOneMissCount::increment)
+                    .then(readPartClDefault(bucketName, blobId, partIndex)));
         } else {
             return readPartClDefault(bucketName, blobId, partIndex);
         }
@@ -193,7 +209,9 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
     private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
         if (configuration.isOptimisticConsistencyLevel()) {
             return selectRowCountClOne(bucketName, blobId)
-                .switchIfEmpty(selectRowCountClDefault(bucketName, blobId));
+                .doOnNext(any -> metricClOneHitCount.increment())
+                .switchIfEmpty(Mono.fromRunnable(metricClOneMissCount::increment)
+                    .then(selectRowCountClDefault(bucketName, blobId)));
         } else {
             return selectRowCountClDefault(bucketName, blobId);
         }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreFactory.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreFactory.java
index 9c1ca73..8d0e679 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreFactory.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreFactory.java
@@ -22,16 +22,17 @@ package org.apache.james.blob.cassandra;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.server.blob.deduplication.BlobStoreFactory;
 
 import com.datastax.driver.core.Session;
 
 public class CassandraBlobStoreFactory {
-    public static BlobStoreFactory.RequireStoringStrategy forTesting(Session session) {
+    public static BlobStoreFactory.RequireStoringStrategy forTesting(Session session, MetricFactory metricFactory) {
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, session);
         CassandraDefaultBucketDAO defaultBucketDAO = new CassandraDefaultBucketDAO(session);
-        CassandraBlobStoreDAO blobStoreDAO = new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, CassandraConfiguration.DEFAULT_CONFIGURATION, BucketName.DEFAULT);
+        CassandraBlobStoreDAO blobStoreDAO = new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, CassandraConfiguration.DEFAULT_CONFIGURATION, BucketName.DEFAULT, metricFactory);
         return BlobStoreFactory.builder()
             .blobStoreDAO(blobStoreDAO)
             .blobIdFactory(blobIdFactory)
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
index a9da043..cd8bfad 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
@@ -20,8 +20,12 @@
 package org.apache.james.blob.cassandra;
 
 import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.blob.cassandra.CassandraBlobStoreDAO.CASSANDRA_BLOBSTORE_CL_ONE_HIT_COUNT_METRIC_NAME;
+import static org.apache.james.blob.cassandra.CassandraBlobStoreDAO.CASSANDRA_BLOBSTORE_CL_ONE_MISS_COUNT_METRIC_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Durations.FIVE_SECONDS;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -38,6 +42,7 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.MetricableBlobStore;
 import org.apache.james.blob.api.ObjectStoreException;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.server.blob.deduplication.BlobStoreFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -63,10 +68,11 @@ class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract {
             .blobPartSize(CHUNK_SIZE)
             .optimisticConsistencyLevel(true)
             .build();
+        MetricFactory metricFactory = metricsTestExtension.getMetricFactory();
         testee = new MetricableBlobStore(
-            metricsTestExtension.getMetricFactory(),
+            metricFactory,
             BlobStoreFactory.builder()
-                .blobStoreDAO(new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT))
+                .blobStoreDAO(new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT, metricFactory))
                 .blobIdFactory(blobIdFactory)
                 .defaultBucketName()
                 .deduplication());
@@ -168,4 +174,62 @@ class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract {
 
         assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
     }
+
+    @Test
+    void readShouldPublishHitRatioClOneMetric() {
+        BlobStore store = testee();
+
+        BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
+        store.read(store.getDefaultBucketName(), blobId);
+
+        await().atMost(FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().countFor(CASSANDRA_BLOBSTORE_CL_ONE_HIT_COUNT_METRIC_NAME))
+                .isEqualTo(2));
+    }
+
+    @Test
+    void readBytesShouldPublishHitRatioClOneMetric() {
+        BlobStore store = testee();
+
+        BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
+        Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
+
+        await().atMost(FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().countFor(CASSANDRA_BLOBSTORE_CL_ONE_HIT_COUNT_METRIC_NAME))
+                .isEqualTo(2));
+    }
+
+    @Test
+    void readShouldPublishMissRatioClOneMetric() {
+        BlobStore store = testee();
+
+        BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
+
+        when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+        store.read(store.getDefaultBucketName(), blobId);
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+        store.read(store.getDefaultBucketName(), blobId);
+
+        await().atMost(FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().countFor(CASSANDRA_BLOBSTORE_CL_ONE_MISS_COUNT_METRIC_NAME))
+                .isEqualTo(2));
+    }
+
+    @Test
+    void readBytesShouldPublishMissRatioClOneMetric() {
+        BlobStore store = testee();
+
+        BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
+
+        when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+        Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+        Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
+
+        await().atMost(FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().countFor(CASSANDRA_BLOBSTORE_CL_ONE_MISS_COUNT_METRIC_NAME))
+                .isEqualTo(2));
+    }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java
index a24497a..6b2ad1d 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java
@@ -26,6 +26,7 @@ import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BlobStoreDAOContract;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -46,11 +47,12 @@ public class CassandraBlobStoreDAOTest implements BlobStoreDAOContract {
         defaultBucketDAO = new CassandraDefaultBucketDAO(cassandra.getConf());
         testee = new CassandraBlobStoreDAO(
             defaultBucketDAO,
-                bucketDAO,
-                CassandraConfiguration.builder()
-                    .blobPartSize(CHUNK_SIZE)
-                    .build(),
-            BucketName.DEFAULT);
+            bucketDAO,
+            CassandraConfiguration.builder()
+                .blobPartSize(CHUNK_SIZE)
+                .build(),
+            BucketName.DEFAULT,
+            new RecordingMetricFactory());
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index 35bae4b..a3b7748 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.DeduplicationBlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.MetricableBlobStore;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.server.blob.deduplication.BlobStoreFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -49,10 +50,11 @@ public class CassandraBlobStoreTest implements CassandraBlobStoreContract, Dedup
         CassandraConfiguration cassandraConfiguration = CassandraConfiguration.builder()
             .blobPartSize(CHUNK_SIZE)
             .build();
+        MetricFactory metricFactory = metricsTestExtension.getMetricFactory();
         testee = new MetricableBlobStore(
-            metricsTestExtension.getMetricFactory(),
+            metricFactory,
             BlobStoreFactory.builder()
-                .blobStoreDAO(new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT))
+                .blobStoreDAO(new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT, metricFactory))
                 .blobIdFactory(blobIdFactory)
                 .defaultBucketName()
                 .deduplication());
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraPassTroughBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraPassTroughBlobStoreTest.java
index 2489e62..c1bbbda 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraPassTroughBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraPassTroughBlobStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.DeleteBlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.MetricableBlobStore;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.server.blob.deduplication.BlobStoreFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -49,13 +50,14 @@ public class CassandraPassTroughBlobStoreTest implements DeleteBlobStoreContract
         CassandraConfiguration cassandraConfiguration = CassandraConfiguration.builder()
             .blobPartSize(CHUNK_SIZE)
             .build();
+        MetricFactory metricFactory = metricsTestExtension.getMetricFactory();
         testee = new MetricableBlobStore(
-            metricsTestExtension.getMetricFactory(),
-                BlobStoreFactory.builder()
-                    .blobStoreDAO(new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT))
-                    .blobIdFactory(blobIdFactory)
-                    .defaultBucketName()
-                    .passthrough());
+            metricFactory,
+            BlobStoreFactory.builder()
+                .blobStoreDAO(new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT, metricFactory))
+                .blobIdFactory(blobIdFactory)
+                .defaultBucketName()
+                .passthrough());
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
index b27ca8c..5fdd36f8 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
@@ -83,7 +83,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        backend = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+        backend = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
             .passthrough();
         CassandraCacheConfiguration cacheConfig = new CassandraCacheConfiguration.Builder()
             .sizeThresholdInBytes(EIGHT_KILOBYTES.length + 1)
diff --git a/server/mailrepository/mailrepository-cassandra/pom.xml b/server/mailrepository/mailrepository-cassandra/pom.xml
index 42f4ffc..a22c963 100644
--- a/server/mailrepository/mailrepository-cassandra/pom.xml
+++ b/server/mailrepository/mailrepository-cassandra/pom.xml
@@ -79,6 +79,11 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-tests</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>testing-base</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
index 3545bdb..3f1d9b0 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
@@ -38,6 +38,7 @@ import org.apache.james.mailrepository.MailRepositoryContract;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
@@ -66,7 +67,7 @@ class CassandraMailRepositoryTest {
             CassandraMailRepositoryMailDaoAPI mailDAO = new MergingCassandraMailRepositoryMailDao(v1, v2);
             CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
             CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
-            BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+            BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
                 .passthrough();
 
             cassandraMailRepository = new CassandraMailRepository(URL,
@@ -106,7 +107,7 @@ class CassandraMailRepositoryTest {
             CassandraMailRepositoryMailDaoAPI mailDAO = new MergingCassandraMailRepositoryMailDao(v1, v2);
             CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
             CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
-            BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+            BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
                 .deduplication();
 
             cassandraMailRepository = new CassandraMailRepository(URL,
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
index a420ac5..73b24e0 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
@@ -37,6 +37,7 @@ import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.core.builder.MimeMessageBuilder;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.server.core.MailImpl;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
@@ -66,7 +67,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
         CassandraMailRepositoryMailDaoAPI mailDAO = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider());
         keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
         countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
-        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
             .passthrough();
 
         cassandraMailRepository = new CassandraMailRepository(URL,
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index b4041f6..2c79f17 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -102,7 +102,7 @@ class RabbitMQMailQueueConfigurationChangeTest {
 
     @BeforeEach
     void setup(CassandraCluster cassandra) throws Exception {
-        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+        BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
             .passthrough();
         mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
         clock = new UpdatableTickingClock(IN_SLICE_1);
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 85e44c1..32e0d8b 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -61,6 +61,7 @@ import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.core.builder.MimeMessageBuilder;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
 import org.apache.james.metrics.api.Gauge;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueMetricContract;
 import org.apache.james.queue.api.MailQueueMetricExtension;
@@ -136,7 +137,7 @@ class RabbitMQMailQueueTest {
                 RabbitMQMailQueueConfiguration.builder()
                     .sizeMetricsEnabled(true)
                     .build(),
-                CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+                CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
                     .passthrough());
         }
 
@@ -802,7 +803,7 @@ class RabbitMQMailQueueTest {
                 RabbitMQMailQueueConfiguration.builder()
                     .sizeMetricsEnabled(false)
                     .build(),
-                CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+                CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
                     .passthrough());
         }
 
@@ -825,7 +826,7 @@ class RabbitMQMailQueueTest {
                 RabbitMQMailQueueConfiguration.builder()
                     .sizeMetricsEnabled(true)
                     .build(),
-                CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+                CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
                     .deduplication());
         }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/04: JAMES-3586 Add optimisticConsistencyLevel parameter to Cassandra configuration

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 192d3d9fa2d5ee36766fa408f98e3407acacda13
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Mon May 17 17:48:24 2021 +0700

    JAMES-3586 Add optimisticConsistencyLevel parameter to Cassandra configuration
---
 .../init/configuration/CassandraConfiguration.java | 36 +++++++++++++++++++---
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
index 0519b13..ce6b87a 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
@@ -58,6 +58,7 @@ public class CassandraConfiguration {
     public static final List<String> VALID_CONSISTENCY_LEVEL_REGULAR = ImmutableList.of("QUORUM", "LOCAL_QUORUM", "EACH_QUORUM");
     public static final List<String> VALID_CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = ImmutableList.of("SERIAL", "LOCAL_SERIAL");
     public static final boolean DEFAULT_STRONG_CONSISTENCY = true;
+    public static final boolean DEFAULT_OPTIMISTIC_CONSISTENCY_LEVEL = false;
 
     private static final String MAILBOX_READ_REPAIR = "mailbox.read.repair.chance";
     private static final String MAILBOX_MAX_COUNTERS_READ_REPAIR = "mailbox.counters.read.repair.chance.max";
@@ -78,6 +79,7 @@ public class CassandraConfiguration {
     private static final String MESSAGE_WRITE_STRONG_CONSISTENCY = "message.write.strong.consistency.unsafe";
     private static final String CONSISTENCY_LEVEL_REGULAR = "cassandra.consistency_level.regular";
     private static final String CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = "cassandra.consistency_level.lightweight_transaction";
+    private static final String OPTIMISTIC_CONSISTENCY_LEVEL = "optimistic.consistency.level.enabled";
 
     public static final CassandraConfiguration DEFAULT_CONFIGURATION = builder().build();
 
@@ -101,6 +103,7 @@ public class CassandraConfiguration {
         private Optional<Boolean> mailboxReadStrongConsistency = Optional.empty();
         private Optional<Boolean> messageReadStrongConsistency = Optional.empty();
         private Optional<Boolean> messageWriteStrongConsistency = Optional.empty();
+        private Optional<Boolean> optimisticConsistencyLevel = Optional.empty();
 
         public Builder mailboxReadStrongConsistency(boolean value) {
             this.mailboxReadStrongConsistency = Optional.of(value);
@@ -313,6 +316,16 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder optimisticConsistencyLevel(boolean value) {
+            this.optimisticConsistencyLevel = Optional.of(value);
+            return this;
+        }
+
+        public Builder optimisticConsistencyLevel(Optional<Boolean> value) {
+            this.optimisticConsistencyLevel = value;
+            return this;
+        }
+
         public CassandraConfiguration build() {
             String consistencyLevelRegular = this.consistencyLevelRegular.orElse(DEFAULT_CONSISTENCY_LEVEL_REGULAR);
             String consistencyLevelLightweightTransaction = this.consistencyLevelLightweightTransaction.orElse(DEFAULT_CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION);
@@ -340,7 +353,8 @@ public class CassandraConfiguration {
                 mailboxCountersReadRepairChanceOneHundred.orElse(DEFAULT_ONE_HUNDRED_MAILBOX_COUNTERS_READ_REPAIR_CHANCE),
                 mailboxReadStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY),
                 messageReadStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY),
-                messageWriteStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY));
+                messageWriteStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY),
+                optimisticConsistencyLevel.orElse(DEFAULT_OPTIMISTIC_CONSISTENCY_LEVEL));
         }
     }
 
@@ -388,6 +402,8 @@ public class CassandraConfiguration {
                 propertiesConfiguration.getBoolean(MESSAGE_READ_STRONG_CONSISTENCY, null)))
             .messageWriteStrongConsistency(Optional.ofNullable(
                 propertiesConfiguration.getBoolean(MESSAGE_WRITE_STRONG_CONSISTENCY, null)))
+            .optimisticConsistencyLevel(Optional.ofNullable(
+                propertiesConfiguration.getBoolean(OPTIMISTIC_CONSISTENCY_LEVEL, null)))
             .build();
     }
 
@@ -410,6 +426,7 @@ public class CassandraConfiguration {
     private final boolean mailboxReadStrongConsistency;
     private final boolean messageReadStrongConsistency;
     private final boolean messageWriteStrongConsistency;
+    private final boolean optimisticConsistencyLevel;
 
     @VisibleForTesting
     CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize,
@@ -418,8 +435,9 @@ public class CassandraConfiguration {
                            int blobPartSize, final int attachmentV2MigrationReadTimeout, int messageAttachmentIdsReadTimeout,
                            String consistencyLevelRegular, String consistencyLevelLightweightTransaction,
                            float mailboxReadRepair, float mailboxCountersReadRepairChanceMax,
-                           float mailboxCountersReadRepairChanceOneHundred, boolean mailboxReadStrongConsistency, boolean messageReadStrongConsistency,
-                           boolean messageWriteStrongConsistency) {
+                           float mailboxCountersReadRepairChanceOneHundred, boolean mailboxReadStrongConsistency,
+                           boolean messageReadStrongConsistency, boolean messageWriteStrongConsistency,
+                           boolean optimisticConsistencyLevel) {
         this.aclMaxRetry = aclMaxRetry;
         this.messageReadChunkSize = messageReadChunkSize;
         this.expungeChunkSize = expungeChunkSize;
@@ -439,6 +457,7 @@ public class CassandraConfiguration {
         this.mailboxReadStrongConsistency = mailboxReadStrongConsistency;
         this.messageReadStrongConsistency = messageReadStrongConsistency;
         this.messageWriteStrongConsistency = messageWriteStrongConsistency;
+        this.optimisticConsistencyLevel = optimisticConsistencyLevel;
     }
 
     public boolean isMailboxReadStrongConsistency() {
@@ -517,6 +536,10 @@ public class CassandraConfiguration {
         return mailboxCountersReadRepairChanceOneHundred;
     }
 
+    public boolean isOptimisticConsistencyLevel() {
+        return optimisticConsistencyLevel;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof CassandraConfiguration) {
@@ -540,7 +563,8 @@ public class CassandraConfiguration {
                 && Objects.equals(this.messageReadStrongConsistency, that.messageReadStrongConsistency)
                 && Objects.equals(this.messageWriteStrongConsistency, that.messageWriteStrongConsistency)
                 && Objects.equals(this.consistencyLevelRegular, that.consistencyLevelRegular)
-                && Objects.equals(this.consistencyLevelLightweightTransaction, that.consistencyLevelLightweightTransaction);
+                && Objects.equals(this.consistencyLevelLightweightTransaction, that.consistencyLevelLightweightTransaction)
+                && Objects.equals(this.optimisticConsistencyLevel, that.optimisticConsistencyLevel);
         }
         return false;
     }
@@ -552,7 +576,8 @@ public class CassandraConfiguration {
             mailboxCountersReadRepairChanceOneHundred, mailboxCountersReadRepairChanceMax,
             blobPartSize, attachmentV2MigrationReadTimeout, messageAttachmentIdsReadTimeout,
             consistencyLevelRegular, consistencyLevelLightweightTransaction, mailboxReadRepair,
-            messageReadStrongConsistency, mailboxReadStrongConsistency, messageWriteStrongConsistency);
+            messageReadStrongConsistency, mailboxReadStrongConsistency, messageWriteStrongConsistency,
+            optimisticConsistencyLevel);
     }
 
     @Override
@@ -577,6 +602,7 @@ public class CassandraConfiguration {
             .add("mailboxReadStrongConsistency", mailboxReadStrongConsistency)
             .add("consistencyLevelRegular", consistencyLevelRegular)
             .add("consistencyLevelLightweightTransaction", consistencyLevelLightweightTransaction)
+            .add("optimisticConsistencyLevel", optimisticConsistencyLevel)
             .toString();
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/04: JAMES-3586 Do read with CL ONE in Cassandra Blobstore when optimistic CL is enabled

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ff8f4f8d620b45678d29343f704fd165fde1647f
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Tue May 18 14:52:56 2021 +0700

    JAMES-3586 Do read with CL ONE in Cassandra Blobstore when optimistic CL is enabled
    
    If the data is not found, it could be an inconsistency and we fall back to the normal CL read.
---
 .../blob/cassandra/CassandraBlobStoreDAO.java      |  34 ++++
 .../james/blob/cassandra/CassandraBucketDAO.java   |  20 +++
 .../blob/cassandra/CassandraDefaultBucketDAO.java  |  18 +++
 .../cassandra/CassandraBlobStoreClOneTest.java     | 171 +++++++++++++++++++++
 4 files changed, 243 insertions(+)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
index 5c0b069..f22e096 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
@@ -166,6 +166,23 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
     }
 
     private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
+        if (configuration.isOptimisticConsistencyLevel()) {
+            return readPartClOne(bucketName, blobId, partIndex)
+                .switchIfEmpty(readPartClDefault(bucketName, blobId, partIndex));
+        } else {
+            return readPartClDefault(bucketName, blobId, partIndex);
+        }
+    }
+
+    private Mono<ByteBuffer> readPartClOne(BucketName bucketName, BlobId blobId, Integer partIndex) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.readPartClOne(blobId, partIndex);
+        } else {
+            return bucketDAO.readPartClOne(bucketName, blobId, partIndex);
+        }
+    }
+
+    private Mono<ByteBuffer> readPartClDefault(BucketName bucketName, BlobId blobId, Integer partIndex) {
         if (isDefaultBucket(bucketName)) {
             return defaultBucketDAO.readPart(blobId, partIndex);
         } else {
@@ -174,6 +191,23 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
     }
 
     private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
+        if (configuration.isOptimisticConsistencyLevel()) {
+            return selectRowCountClOne(bucketName, blobId)
+                .switchIfEmpty(selectRowCountClDefault(bucketName, blobId));
+        } else {
+            return selectRowCountClDefault(bucketName, blobId);
+        }
+    }
+
+    private Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId blobId) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.selectRowCountClOne(blobId);
+        } else {
+            return bucketDAO.selectRowCountClOne(bucketName, blobId);
+        }
+    }
+
+    private Mono<Integer> selectRowCountClDefault(BucketName bucketName, BlobId blobId) {
         if (isDefaultBucket(bucketName)) {
             return defaultBucketDAO.selectRowCount(blobId);
         } else {
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index 0287f37..809d4c3 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.cassandra;
 
+import static com.datastax.driver.core.ConsistencyLevel.ONE;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -145,6 +146,15 @@ public class CassandraBucketDAO {
             .map(row -> row.getInt(NUMBER_OF_CHUNK));
     }
 
+    Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId blobId) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            select.bind()
+                .setString(BUCKET, bucketName.asString())
+                .setString(ID, blobId.asString())
+                .setConsistencyLevel(ONE))
+            .map(row -> row.getInt(NUMBER_OF_CHUNK));
+    }
+
     Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
@@ -154,6 +164,16 @@ public class CassandraBucketDAO {
             .map(this::rowToData);
     }
 
+    Mono<ByteBuffer> readPartClOne(BucketName bucketName, BlobId blobId, int position) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            selectPart.bind()
+                .setString(BucketBlobParts.BUCKET, bucketName.asString())
+                .setString(BucketBlobParts.ID, blobId.asString())
+                .setInt(BucketBlobParts.CHUNK_NUMBER, position)
+                .setConsistencyLevel(ONE))
+            .map(this::rowToData);
+    }
+
     Mono<Void> deletePosition(BucketName bucketName, BlobId blobId) {
         return cassandraAsyncExecutor.executeVoid(
             delete.bind()
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
index af99a4b..0866491 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.cassandra;
 
+import static com.datastax.driver.core.ConsistencyLevel.ONE;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -123,6 +124,14 @@ public class CassandraDefaultBucketDAO {
             .map(row -> row.getInt(NUMBER_OF_CHUNK));
     }
 
+    Mono<Integer> selectRowCountClOne(BlobId blobId) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            select.bind()
+                .setString(ID, blobId.asString())
+                .setConsistencyLevel(ONE))
+            .map(row -> row.getInt(NUMBER_OF_CHUNK));
+    }
+
     Mono<ByteBuffer> readPart(BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
@@ -131,6 +140,15 @@ public class CassandraDefaultBucketDAO {
             .map(this::rowToData);
     }
 
+    Mono<ByteBuffer> readPartClOne(BlobId blobId, int position) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            selectPart.bind()
+                .setString(DefaultBucketBlobParts.ID, blobId.asString())
+                .setInt(DefaultBucketBlobParts.CHUNK_NUMBER, position)
+                .setConsistencyLevel(ONE))
+            .map(this::rowToData);
+    }
+
     Mono<Void> deletePosition(BlobId blobId) {
         return cassandraAsyncExecutor.executeVoid(
             delete.bind()
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
new file mode 100644
index 0000000..a9da043
--- /dev/null
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
@@ -0,0 +1,171 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.cassandra;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.MetricableBlobStore;
+import org.apache.james.blob.api.ObjectStoreException;
+import org.apache.james.server.blob.deduplication.BlobStoreFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.base.Strings;
+
+import reactor.core.publisher.Mono;
+
+class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract {
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraBlobModule.MODULE);
+
+    private BlobStore testee;
+    private CassandraDefaultBucketDAO defaultBucketDAO;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf());
+        defaultBucketDAO = spy(new CassandraDefaultBucketDAO(cassandra.getConf()));
+        CassandraConfiguration cassandraConfiguration = CassandraConfiguration.builder()
+            .blobPartSize(CHUNK_SIZE)
+            .optimisticConsistencyLevel(true)
+            .build();
+        testee = new MetricableBlobStore(
+            metricsTestExtension.getMetricFactory(),
+            BlobStoreFactory.builder()
+                .blobStoreDAO(new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT))
+                .blobIdFactory(blobIdFactory)
+                .defaultBucketName()
+                .deduplication());
+    }
+
+    @Override
+    public BlobStore testee() {
+        return testee;
+    }
+
+    @Override
+    public BlobId.Factory blobIdFactory() {
+        return new HashBlobId.Factory();
+    }
+
+    @Override
+    public CassandraDefaultBucketDAO defaultBucketDAO() {
+        return defaultBucketDAO;
+    }
+
+    @Override
+    @Test
+    public void readShouldNotReturnInvalidResultsWhenPartialDataPresent() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+        when(defaultBucketDAO().readPart(blobId, 1)).thenReturn(Mono.empty());
+
+        assertThatThrownBy(() -> IOUtils.toString(testee().read(testee().getDefaultBucketName(), blobId), StandardCharsets.UTF_8))
+            .isInstanceOf(ObjectStoreException.class)
+            .hasMessageContaining("Missing blob part for blobId");
+    }
+
+    @Override
+    @Test
+    public void readBytesShouldNotReturnInvalidResultsWhenPartialDataPresent() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+        when(defaultBucketDAO().readPart(blobId, 1)).thenReturn(Mono.empty());
+
+        assertThatThrownBy(() -> Mono.from(testee().readBytes(testee().getDefaultBucketName(), blobId)).block())
+            .isInstanceOf(ObjectStoreException.class)
+            .hasMessageContaining("Missing blob part for blobId");
+    }
+
+    @Test
+    void readShouldReturnValidResultWhenDataMissingInOneNodeButPresentInOthers() throws IOException {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+
+        String data = IOUtils.toString(testee().read(testee().getDefaultBucketName(), blobId), StandardCharsets.UTF_8);
+
+        assertThat(data).isEqualTo(longString);
+    }
+
+    @Test
+    void readBytesShouldReturnValidResultWhenDataMissingInOneNodeButPresentInOthers() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+
+        byte[] bytes = Mono.from(testee().readBytes(testee().getDefaultBucketName(), blobId)).block();
+
+        assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
+    }
+
+    @Test
+    void readShouldReturnValidResultWhenPartialDataMissingInOneNodeButPresentInOthers() throws IOException {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+
+        String data = IOUtils.toString(testee().read(testee().getDefaultBucketName(), blobId), StandardCharsets.UTF_8);
+
+        assertThat(data).isEqualTo(longString);
+    }
+
+    @Test
+    void readBytesShouldReturnValidResultWhenPartialDataMissingInOneNodeButPresentInOthers() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+
+        byte[] bytes = Mono.from(testee().readBytes(testee().getDefaultBucketName(), blobId)).block();
+
+        assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/04: JAMES-3586 Documentation for optimistic consistency level

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f088eae59af13e02946ee25c856a5b4b4408a32f
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Tue May 18 17:09:44 2021 +0700

    JAMES-3586 Documentation for optimistic consistency level
---
 docs/modules/servers/pages/distributed/configure/cassandra.adoc      | 4 ++++
 .../guice/cassandra-guice/sample-configuration/cassandra.properties  | 5 +++++
 .../sample-configuration/cassandra.properties                        | 5 +++++
 src/site/xdoc/server/config-cassandra.xml                            | 3 +++
 4 files changed, 17 insertions(+)

diff --git a/docs/modules/servers/pages/distributed/configure/cassandra.adoc b/docs/modules/servers/pages/distributed/configure/cassandra.adoc
index b5d33a3..813d838 100644
--- a/docs/modules/servers/pages/distributed/configure/cassandra.adoc
+++ b/docs/modules/servers/pages/distributed/configure/cassandra.adoc
@@ -79,6 +79,10 @@ would result in the use of `new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder
 This value is useful in a multi-DC Cassandra setup. Be aware of xref:../architecture/consistency-model.html#_about_multi_data_center_setups[limitation of multi-DC setups for James]
 Not specifying this value results in the driver's default load balancing policy to be used.
 
+|optimistic.consistency.level.enabled
+|Optional. Allows specifying consistency level ONE for reads in Cassandra BlobStore.
+Falls back to default read consistency level if the blob is missing. Defaults to false.
+
 |===
 
 == Pooling options
diff --git a/server/container/guice/cassandra-guice/sample-configuration/cassandra.properties b/server/container/guice/cassandra-guice/sample-configuration/cassandra.properties
index 797d4a7..a8ece8e 100644
--- a/server/container/guice/cassandra-guice/sample-configuration/cassandra.properties
+++ b/server/container/guice/cassandra-guice/sample-configuration/cassandra.properties
@@ -47,3 +47,8 @@ cassandra.retryConnection.minDelay=5000
 # cassandra.consistency_level.regular=QUORUM
 # SERIAL or LOCAL_SERIAL
 # cassandra.consistency_level.lightweight_transaction=SERIAL
+
+# Optional. Allows specifying consistency level ONE for reads in Cassandra BlobStore.
+# Falls back to default read consistency level if the blob is missing.
+# Defaults to false.
+# optimistic.consistency.level.enabled=false
diff --git a/server/container/guice/cassandra-rabbitmq-guice/sample-configuration/cassandra.properties b/server/container/guice/cassandra-rabbitmq-guice/sample-configuration/cassandra.properties
index 6b17425..c915e21 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/sample-configuration/cassandra.properties
+++ b/server/container/guice/cassandra-rabbitmq-guice/sample-configuration/cassandra.properties
@@ -47,3 +47,8 @@ cassandra.retryConnection.minDelay=5000
 # cassandra.consistency_level.regular=QUORUM
 # SERIAL or LOCAL_SERIAL
 # cassandra.consistency_level.lightweight_transaction=SERIAL
+
+# Optional. Allows specifying consistency level ONE for reads in Cassandra BlobStore.
+# Falls back to default read consistency level if the blob is missing.
+# Defaults to false.
+# optimistic.consistency.level.enabled=false
diff --git a/src/site/xdoc/server/config-cassandra.xml b/src/site/xdoc/server/config-cassandra.xml
index 64cb8cd..e05d687 100644
--- a/src/site/xdoc/server/config-cassandra.xml
+++ b/src/site/xdoc/server/config-cassandra.xml
@@ -180,6 +180,9 @@
             would result in the use of <code>new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(value).build())</code> as a LoadBalancingPolicy.
             This value is useful in a multi-DC Cassandra setup. Be aware of limitations of multi-DC setups for James.
             Not specifying this value results in the driver's default load balancing policy to be used.</dd>
+        <dt><strong>optimistic.consistency.level.enabled</strong></dt>
+        <dd>Optional. Defaults to false. Allows specifying consistency level ONE for reads in Cassandra BlobStore.
+            Falls back to default read consistency level if the blob is missing.</dd>
       </dl>
 
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org