You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/24 10:34:38 UTC

[james-project] 07/07: JAMES-3224 Use CassandraConsistenciesConfiguration for default consistency levels

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 26268b1817e4af7f9cc788b85e80f08ae977f6b1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 22 15:20:26 2020 +0700

    JAMES-3224 Use CassandraConsistenciesConfiguration for default consistency levels
---
 .../backends/cassandra/init/ClusterFactory.java      | 11 ++++++-----
 .../cassandra/init/ResilientClusterProvider.java     |  9 +++++----
 .../CassandraConsistenciesConfiguration.java         |  2 ++
 .../james/backends/cassandra/CassandraCluster.java   |  3 ++-
 .../james/backends/cassandra/DockerCassandra.java    | 20 +++-----------------
 .../backends/cassandra/init/ClusterFactoryTest.java  |  7 ++++---
 .../cassandra/init/ResilientClusterProviderTest.java |  8 +++++---
 .../SessionWithInitializedTablesFactoryTest.java     |  3 ++-
 .../integration/rabbitmq/FixingGhostMailboxTest.java |  3 ++-
 9 files changed, 31 insertions(+), 35 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ClusterFactory.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ClusterFactory.java
index dc696df..30bfa56 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ClusterFactory.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ClusterFactory.java
@@ -21,11 +21,11 @@ package org.apache.james.backends.cassandra.init;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
+import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
 
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SocketOptions;
@@ -33,7 +33,7 @@ import com.google.common.base.Preconditions;
 
 public class ClusterFactory {
 
-    public static Cluster create(ClusterConfiguration configuration) {
+    public static Cluster create(ClusterConfiguration configuration, CassandraConsistenciesConfiguration consistenciesConfiguration) {
         Preconditions.checkState(configuration.getUsername().isPresent() == configuration.getPassword().isPresent(), "If you specify username, you must specify password");
 
         Cluster.Builder clusterBuilder = Cluster.builder()
@@ -46,7 +46,7 @@ public class ClusterFactory {
             configuration.getPassword().ifPresent(password ->
                 clusterBuilder.withCredentials(username, password)));
 
-        clusterBuilder.withQueryOptions(queryOptions());
+        clusterBuilder.withQueryOptions(queryOptions(consistenciesConfiguration));
 
         SocketOptions socketOptions = new SocketOptions();
         socketOptions.setReadTimeoutMillis(configuration.getReadTimeoutMillis());
@@ -71,9 +71,10 @@ public class ClusterFactory {
         }
     }
 
-    private static QueryOptions queryOptions() {
+    private static QueryOptions queryOptions(CassandraConsistenciesConfiguration consistenciesConfiguration) {
         return new QueryOptions()
-                .setConsistencyLevel(ConsistencyLevel.QUORUM);
+                .setConsistencyLevel(consistenciesConfiguration.getRegular())
+                .setSerialConsistencyLevel(consistenciesConfiguration.getLightweightTransaction());
     }
 
     private static void ensureContactable(Cluster cluster) {
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
index 4eeb502..999e24d 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
@@ -30,6 +30,7 @@ import javax.inject.Inject;
 import javax.inject.Provider;
 import javax.inject.Singleton;
 
+import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,21 +51,21 @@ public class ResilientClusterProvider implements Provider<Cluster> {
 
     @VisibleForTesting
     @Inject
-    ResilientClusterProvider(ClusterConfiguration configuration) {
+    ResilientClusterProvider(ClusterConfiguration configuration, CassandraConsistenciesConfiguration consistenciesConfiguration) {
         Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
-        cluster = Mono.fromCallable(getClusterRetryCallable(configuration))
+        cluster = Mono.fromCallable(getClusterRetryCallable(configuration, consistenciesConfiguration))
             .doOnError(e -> LOGGER.warn("Error establishing Cassandra connection. Next retry scheduled in {} ms", waitDelay, e))
             .retryWhen(Retry.backoff(configuration.getMaxRetry(), waitDelay).scheduler(Schedulers.elastic()))
             .publishOn(Schedulers.elastic())
             .block();
     }
 
-    private Callable<Cluster> getClusterRetryCallable(ClusterConfiguration configuration) {
+    private Callable<Cluster> getClusterRetryCallable(ClusterConfiguration configuration, CassandraConsistenciesConfiguration consistenciesConfiguration) {
         LOGGER.info("Trying to connect to Cassandra service at {} (list {})", LocalDateTime.now(),
             ImmutableList.copyOf(configuration.getHosts()).toString());
 
         return () -> {
-            Cluster cluster = ClusterFactory.create(configuration);
+            Cluster cluster = ClusterFactory.create(configuration, consistenciesConfiguration);
             try {
                 keyspaceExist(cluster, "any"); // plays a sample query to ensure we can contact the cluster
                 return cluster;
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConsistenciesConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConsistenciesConfiguration.java
index e1f71e6..8a70b45 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConsistenciesConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConsistenciesConfiguration.java
@@ -25,6 +25,8 @@ import com.datastax.driver.core.ConsistencyLevel;
 import com.google.common.base.MoreObjects;
 
 public class CassandraConsistenciesConfiguration {
+    public static final CassandraConsistenciesConfiguration DEFAULT = new CassandraConsistenciesConfiguration(ConsistencyLevel.QUORUM, ConsistencyLevel.SERIAL);
+
     public static ConsistencyLevel fromString(String value) {
         switch (value) {
             case "QUORUM":
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
index 2090236..48c787f 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java
@@ -25,6 +25,7 @@ import org.apache.james.backends.cassandra.init.CassandraTableManager;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.init.ClusterFactory;
 import org.apache.james.backends.cassandra.init.SessionWithInitializedTablesFactory;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.KeyspaceConfiguration;
 import org.apache.james.util.Host;
@@ -59,7 +60,7 @@ public final class CassandraCluster implements AutoCloseable {
         this.module = module;
 
         this.clusterConfiguration = DockerCassandra.configurationBuilder(host).build();
-        this.nonPrivilegedCluster = ClusterFactory.create(clusterConfiguration);
+        this.nonPrivilegedCluster = ClusterFactory.create(clusterConfiguration, CassandraConsistenciesConfiguration.DEFAULT);
         KeyspaceConfiguration keyspaceConfiguration = KeyspaceConfiguration.builder()
             .keyspace(KEYSPACE)
             .replicationFactor(1)
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandra.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandra.java
index e11113d..2ef8b92 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandra.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandra.java
@@ -21,6 +21,7 @@ package org.apache.james.backends.cassandra;
 
 import org.apache.james.backends.cassandra.init.ClusterFactory;
 import org.apache.james.backends.cassandra.init.KeyspaceFactory;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.KeyspaceConfiguration;
 import org.apache.james.util.Host;
@@ -34,7 +35,6 @@ import org.testcontainers.images.builder.dockerfile.DockerfileBuilder;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 import com.github.dockerjava.api.DockerClient;
 import com.google.common.collect.ImmutableMap;
 
@@ -59,28 +59,14 @@ public class DockerCassandra {
         }
 
         public void initializeKeyspace(KeyspaceConfiguration configuration) {
-            try (Cluster privilegedCluster = ClusterFactory.create(cassandra.superUserConfigurationBuilder().build())) {
+            try (Cluster privilegedCluster = ClusterFactory.create(cassandra.superUserConfigurationBuilder().build(),
+                    CassandraConsistenciesConfiguration.DEFAULT)) {
                 provisionNonPrivilegedUser(privilegedCluster);
                 KeyspaceFactory.createKeyspace(configuration, privilegedCluster);
                 grantPermissionToTestingUser(privilegedCluster, configuration.getKeyspace());
             }
         }
 
-        public void dropKeyspace(String keyspace) {
-            try (Cluster cluster = ClusterFactory.create(cassandra.superUserConfigurationBuilder().build())) {
-                try (Session cassandraSession = cluster.newSession()) {
-                    boolean applied = cassandraSession.execute(
-                        SchemaBuilder.dropKeyspace(keyspace)
-                            .ifExists())
-                        .wasApplied();
-
-                    if (!applied) {
-                        throw new IllegalStateException("cannot drop keyspace '" + keyspace + "'");
-                    }
-                }
-            }
-        }
-
         private void provisionNonPrivilegedUser(Cluster privilegedCluster) {
             try (Session session = privilegedCluster.newSession()) {
                 session.execute("CREATE ROLE IF NOT EXISTS " + CASSANDRA_TESTING_USER + " WITH PASSWORD = '" + CASSANDRA_TESTING_PASSWORD + "' AND LOGIN = true");
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/ClusterFactoryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/ClusterFactoryTest.java
index 7799644..11e30cd 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/ClusterFactoryTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/ClusterFactoryTest.java
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.DockerCassandra;
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -47,7 +48,7 @@ class ClusterFactoryTest {
     @Test
     void consistencyLevelShouldBeEqualToQuorum(DockerCassandra dockerCassandra) {
         Cluster cluster = ClusterFactory.create(dockerCassandra.configurationBuilder()
-            .build());
+            .build(), CassandraConsistenciesConfiguration.DEFAULT);
 
         ConsistencyLevel consistencyLevel = cluster.getConfiguration()
             .getQueryOptions()
@@ -62,14 +63,14 @@ class ClusterFactoryTest {
 
         assertThatThrownBy(() -> ClusterFactory.create(
             dockerCassandra.configurationBuilder()
-                .build()))
+                .build(), CassandraConsistenciesConfiguration.DEFAULT))
             .isInstanceOf(NoHostAvailableException.class);
     }
 
     @Test
     void createShouldReturnAContactableCluster(DockerCassandra dockerCassandra) {
         Cluster cluster = ClusterFactory.create(dockerCassandra.configurationBuilder()
-            .build());
+            .build(), CassandraConsistenciesConfiguration.DEFAULT);
 
         assertThatClusterIsContactable(cluster);
     }
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/ResilientClusterProviderTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/ResilientClusterProviderTest.java
index 29142d0..13dd5f7 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/ResilientClusterProviderTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/ResilientClusterProviderTest.java
@@ -26,6 +26,7 @@ import java.time.Duration;
 
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -38,7 +39,7 @@ class ResilientClusterProviderTest {
 
     @Test
     void getShouldNotThrowWhenHealthyCassandra() {
-        assertThatCode(() -> new ResilientClusterProvider(cassandraExtension.clusterConfiguration().build())
+        assertThatCode(() -> new ResilientClusterProvider(cassandraExtension.clusterConfiguration().build(), CassandraConsistenciesConfiguration.DEFAULT)
                 .get())
             .doesNotThrowAnyException();
     }
@@ -50,7 +51,7 @@ class ResilientClusterProviderTest {
             assertThatThrownBy(() -> new ResilientClusterProvider(cassandraExtension.clusterConfiguration()
                     .maxRetry(1)
                     .minDelay(1)
-                    .build())
+                    .build(), CassandraConsistenciesConfiguration.DEFAULT)
                 .get())
                 .isInstanceOf(Exception.class);
         } finally {
@@ -68,7 +69,8 @@ class ResilientClusterProviderTest {
                 .subscribeOn(Schedulers.elastic())
                 .subscribe();
 
-            assertThatCode(() -> new ResilientClusterProvider(cassandraExtension.clusterConfiguration().build())
+            assertThatCode(() -> new ResilientClusterProvider(cassandraExtension.clusterConfiguration().build(),
+                    CassandraConsistenciesConfiguration.DEFAULT)
                 .get())
                 .doesNotThrowAnyException();
         } finally {
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
index 84a1a71..8f2ae89 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
@@ -29,6 +29,7 @@ import java.util.function.Supplier;
 import org.apache.james.backends.cassandra.DockerCassandra;
 import org.apache.james.backends.cassandra.DockerCassandraExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.KeyspaceConfiguration;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
@@ -123,7 +124,7 @@ class SessionWithInitializedTablesFactoryTest {
     private static Supplier<Session> createSession(DockerCassandraExtension.DockerCassandra cassandraServer) {
         ClusterConfiguration clusterConfiguration = DockerCassandra.configurationBuilder(cassandraServer.getHost())
             .build();
-        Cluster cluster = ClusterFactory.create(clusterConfiguration);
+        Cluster cluster = ClusterFactory.create(clusterConfiguration, CassandraConsistenciesConfiguration.DEFAULT);
         KeyspaceConfiguration keyspaceConfiguration = DockerCassandra.mainKeyspaceConfiguration();
         KeyspaceFactory.createKeyspace(keyspaceConfiguration, cluster);
         return () -> new SessionWithInitializedTablesFactory(
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/FixingGhostMailboxTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/FixingGhostMailboxTest.java
index bf73116..cd59cd5 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/FixingGhostMailboxTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/FixingGhostMailboxTest.java
@@ -51,6 +51,7 @@ import org.apache.james.JamesServerBuilder;
 import org.apache.james.JamesServerExtension;
 import org.apache.james.SearchConfiguration;
 import org.apache.james.backends.cassandra.init.ClusterFactory;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
 import org.apache.james.core.Username;
 import org.apache.james.jmap.AccessToken;
@@ -161,7 +162,7 @@ class FixingGhostMailboxTest {
 
         CassandraProbe probe = server.getProbe(CassandraProbe.class);
         ClusterConfiguration cassandraConfiguration = probe.getConfiguration();
-        try (Cluster cluster = ClusterFactory.create(cassandraConfiguration)) {
+        try (Cluster cluster = ClusterFactory.create(cassandraConfiguration, CassandraConsistenciesConfiguration.DEFAULT)) {
             try (Session session = cluster.connect(probe.getMainKeyspaceConfiguration().getKeyspace())) {
                 simulateGhostMailboxBug(session);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org