You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/04/29 00:27:09 UTC

[james-project] branch master updated: JAMES-3573 Allow specifying DC in Cassandra configuration

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c850d4  JAMES-3573 Allow specifying DC in Cassandra configuration
8c850d4 is described below

commit 8c850d47a2e607f8ce88cf4e05d6934849e964df
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Apr 27 22:28:10 2021 +0700

    JAMES-3573 Allow specifying DC in Cassandra configuration
    
    This makes sense for Multi-DC Cassandra setups,
    especially given the use of LOCAL_QUORUM
    consistency level.
---
 .../backends/cassandra/init/ClusterFactory.java    |  2 ++
 .../init/configuration/ClusterConfiguration.java   | 36 ++++++++++++++++---
 .../configuration/ClusterConfigurationTest.java    | 42 ++++++++++++++++++++++
 .../pages/distributed/configure/cassandra.adoc     |  6 ++++
 src/site/xdoc/server/config-cassandra.xml          |  5 +++
 5 files changed, 86 insertions(+), 5 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ClusterFactory.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ClusterFactory.java
index 30bfa56..3f0cb20 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ClusterFactory.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ClusterFactory.java
@@ -42,6 +42,8 @@ public class ClusterFactory {
             .addContactPoint(server.getHostName())
             .withPort(server.getPort()));
 
+        configuration.getLoadBalancingPolicy().ifPresent(clusterBuilder::withLoadBalancingPolicy);
+
         configuration.getUsername().ifPresent(username ->
             configuration.getPassword().ifPresent(password ->
                 clusterBuilder.withCredentials(username, password)));
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/ClusterConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/ClusterConfiguration.java
index eb9eeee..d8207e5 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/ClusterConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/ClusterConfiguration.java
@@ -30,6 +30,9 @@ import org.apache.james.util.Host;
 
 import com.datastax.driver.core.HostDistance;
 import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
@@ -47,6 +50,7 @@ public class ClusterConfiguration {
         private Optional<Boolean> useSsl;
         private Optional<String> username;
         private Optional<String> password;
+        private Optional<String> localDc;
 
         public Builder() {
             hosts = ImmutableList.builder();
@@ -60,6 +64,7 @@ public class ClusterConfiguration {
             username = Optional.empty();
             password = Optional.empty();
             useSsl = Optional.empty();
+            localDc = Optional.empty();
         }
 
         public Builder host(Host host) {
@@ -163,6 +168,11 @@ public class ClusterConfiguration {
             return connectTimeoutMillis(Optional.of(connectTimeoutMillis));
         }
 
+        public Builder localDc(Optional<String> localDc) {
+            this.localDc = localDc;
+            return this;
+        }
+
         public ClusterConfiguration build() {
             return new ClusterConfiguration(
                 hosts.build(),
@@ -175,7 +185,8 @@ public class ClusterConfiguration {
                 connectTimeoutMillis.orElse(DEFAULT_CONNECT_TIMEOUT_MILLIS),
                 useSsl.orElse(false),
                 username,
-                password);
+                password,
+                localDc);
         }
     }
 
@@ -183,6 +194,7 @@ public class ClusterConfiguration {
     public static final String CASSANDRA_CREATE_KEYSPACE = "cassandra.keyspace.create";
     public static final String CASSANDRA_USER = "cassandra.user";
     public static final String CASSANDRA_PASSWORD = "cassandra.password";
+    public static final String CASSANDRA_LOCAL_DC = "cassandra.local.dc";
     public static final String CASSANDRA_SSL = "cassandra.ssl";
     public static final String READ_TIMEOUT_MILLIS = "cassandra.readTimeoutMillis";
     public static final String CONNECT_TIMEOUT_MILLIS = "cassandra.connectTimeoutMillis";
@@ -216,7 +228,8 @@ public class ClusterConfiguration {
             .connectTimeoutMillis(Optional.ofNullable(configuration.getInteger(CONNECT_TIMEOUT_MILLIS, null)))
             .useSsl(Optional.ofNullable(configuration.getBoolean(CASSANDRA_SSL, null)))
             .username(Optional.ofNullable(configuration.getString(CASSANDRA_USER, null)))
-            .password(Optional.ofNullable(configuration.getString(CASSANDRA_PASSWORD, null)));
+            .password(Optional.ofNullable(configuration.getString(CASSANDRA_PASSWORD, null)))
+            .localDc(Optional.ofNullable(configuration.getString(CASSANDRA_LOCAL_DC, null)));
         if (createKeySpace) {
             builder = builder.createKeyspace();
         }
@@ -273,11 +286,12 @@ public class ClusterConfiguration {
     private final boolean useSsl;
     private final Optional<String> username;
     private final Optional<String> password;
+    private final Optional<String> localDc;
 
     public ClusterConfiguration(List<Host> hosts, boolean createKeyspace, int minDelay, int maxRetry,
                                 Optional<QueryLoggerConfiguration> queryLoggerConfiguration, Optional<PoolingOptions> poolingOptions,
                                 int readTimeoutMillis, int connectTimeoutMillis, boolean useSsl, Optional<String> username,
-                                Optional<String> password) {
+                                Optional<String> password, Optional<String> localDc) {
         this.hosts = hosts;
         this.createKeyspace = createKeyspace;
         this.minDelay = minDelay;
@@ -289,6 +303,7 @@ public class ClusterConfiguration {
         this.useSsl = useSsl;
         this.username = username;
         this.password = password;
+        this.localDc = localDc;
     }
 
     public List<Host> getHosts() {
@@ -335,6 +350,16 @@ public class ClusterConfiguration {
         return password;
     }
 
+    public Optional<LoadBalancingPolicy> getLoadBalancingPolicy() {
+        return localDc.map(value -> new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder()
+            .withLocalDc(value)
+            .build()));
+    }
+
+    Optional<String> getLocalDc() {
+        return localDc;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof ClusterConfiguration) {
@@ -350,7 +375,8 @@ public class ClusterConfiguration {
                 && Objects.equals(this.connectTimeoutMillis, that.connectTimeoutMillis)
                 && Objects.equals(this.useSsl, that.useSsl)
                 && Objects.equals(this.username, that.username)
-                && Objects.equals(this.password, that.password);
+                && Objects.equals(this.password, that.password)
+                && Objects.equals(this.localDc, that.localDc);
         }
         return false;
     }
@@ -358,6 +384,6 @@ public class ClusterConfiguration {
     @Override
     public final int hashCode() {
         return Objects.hash(hosts, createKeyspace, minDelay, maxRetry, queryLoggerConfiguration, poolingOptions,
-            readTimeoutMillis, connectTimeoutMillis, username, useSsl, password);
+            readTimeoutMillis, connectTimeoutMillis, username, useSsl, password, localDc);
     }
 }
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/configuration/ClusterConfigurationTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/configuration/ClusterConfigurationTest.java
new file mode 100644
index 0000000..4be3dd2
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/configuration/ClusterConfigurationTest.java
@@ -0,0 +1,42 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.init.configuration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.junit.jupiter.api.Test;
+
+class ClusterConfigurationTest {
+    @Test
+    void fromShouldParseLocalDC() {
+        PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration();
+        propertiesConfiguration.addProperty("cassandra.local.dc", "DC2");
+
+        assertThat(ClusterConfiguration.from(propertiesConfiguration).getLocalDc()).contains("DC2");
+    }
+
+    @Test
+    void localDCShouldBeEmptyByDefault() {
+        PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration();
+
+        assertThat(ClusterConfiguration.from(propertiesConfiguration).getLocalDc()).isEmpty();
+    }
+}
\ No newline at end of file
diff --git a/docs/modules/servers/pages/distributed/configure/cassandra.adoc b/docs/modules/servers/pages/distributed/configure/cassandra.adoc
index 25c4789..b5d33a3 100644
--- a/docs/modules/servers/pages/distributed/configure/cassandra.adoc
+++ b/docs/modules/servers/pages/distributed/configure/cassandra.adoc
@@ -73,6 +73,12 @@ Allowed values: https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/dml/dml
 |Optional. Allows specifying the driver serial consistency level. Defaults to SERIAL.
 Allowed values: https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/dml/dmlConfigConsistency.html[SERIAL or LOCAL_SERIAL]
 
+|cassandra.local.dc
+|Optional. Allows specifying the local DC as part of the load balancing policy. Specifying it
+would result in the use of `new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(value).build())` as a LoadBalancingPolicy.
+This value is useful in a multi-DC Cassandra setup. Be aware of xref:../architecture/consistency-model.html#_about_multi_data_center_setups[limitations of multi-DC setups for James].
+Not specifying this value results in the driver's default load balancing policy being used.
+
 |===
 
 == Pooling options
diff --git a/src/site/xdoc/server/config-cassandra.xml b/src/site/xdoc/server/config-cassandra.xml
index f91ed83..e188b4f 100644
--- a/src/site/xdoc/server/config-cassandra.xml
+++ b/src/site/xdoc/server/config-cassandra.xml
@@ -175,6 +175,11 @@
         <dd>Optional. Defaults to QUORUM.<br/> <a href="https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/dml/dmlConfigConsistency.html">QUORUM, LOCAL_QUORUM, or EACH_QUORUM</a>.</dd>
         <dt><strong>cassandra.consistency_level.lightweight_transaction</strong></dt>
         <dd>Optional. Defaults to SERIAL.<br/> <a href="https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/dml/dmlConfigConsistency.html">SERIAL or LOCAL_SERIAL</a>.</dd>
+        <dt><strong>cassandra.local.dc</strong></dt>
+        <dd>Optional. Allows specifying the local DC as part of the load balancing policy. Specifying it
+            would result in the use of <code>new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(value).build())</code> as a LoadBalancingPolicy.
+            This value is useful in a multi-DC Cassandra setup. Be aware of limitations of multi-DC setups for James.
+            Not specifying this value results in the driver's default load balancing policy being used.</dd>
       </dl>
 
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org