You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/08/05 17:56:16 UTC
camel git commit: CAMEL-9050 Camel-cassandraql: Add
loadbalancingpolicy as uri option
Repository: camel
Updated Branches:
refs/heads/master 90121e537 -> aa0bb9277
CAMEL-9050 Camel-cassandraql: Add loadbalancingpolicy as uri option
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aa0bb927
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aa0bb927
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aa0bb927
Branch: refs/heads/master
Commit: aa0bb927747d2215273482775f1aa348eb0f2e5b
Parents: 90121e5
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Aug 5 17:50:06 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Aug 5 17:55:53 2015 +0200
----------------------------------------------------------------------
.../component/cassandra/CassandraEndpoint.java | 22 +++++++++
.../CassandraLoadBalancingPolicies.java | 48 ++++++++++++++++++++
.../CassandraComponentProducerTest.java | 21 +++++++++
3 files changed, 91 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
index 0e28d9f..95ab660 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
@@ -21,6 +21,8 @@ import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Message;
@@ -31,6 +33,7 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.utils.cassandra.CassandraLoadBalancingPolicies;
import org.apache.camel.utils.cassandra.CassandraSessionHolder;
/**
@@ -65,6 +68,8 @@ public class CassandraEndpoint extends DefaultEndpoint {
private Session session;
@UriParam
private ConsistencyLevel consistencyLevel;
+ @UriParam
+ private String loadBalancingPolicy;
/**
* How many rows should be retrieved in message body
@@ -140,6 +145,7 @@ public class CassandraEndpoint extends DefaultEndpoint {
}
protected Cluster.Builder createClusterBuilder() throws Exception {
+ CassandraLoadBalancingPolicies cassLoadBalancingPolicies = new CassandraLoadBalancingPolicies();
Cluster.Builder clusterBuilder = Cluster.builder();
for (String host : hosts.split(",")) {
clusterBuilder = clusterBuilder.addContactPoint(host);
@@ -153,6 +159,9 @@ public class CassandraEndpoint extends DefaultEndpoint {
if (username != null && !username.isEmpty() && password != null) {
clusterBuilder.withCredentials(username, password);
}
+ if (loadBalancingPolicy != null && !loadBalancingPolicy.isEmpty()) {
+ clusterBuilder.withLoadBalancingPolicy(cassLoadBalancingPolicies.getLoadBalancingPolicy(loadBalancingPolicy));
+ }
return clusterBuilder;
}
@@ -337,4 +346,17 @@ public class CassandraEndpoint extends DefaultEndpoint {
public void setPrepareStatements(boolean prepareStatements) {
this.prepareStatements = prepareStatements;
}
+
+ /**
+ * To use a specific LoadBalancingPolicy
+ */
+ public String getLoadBalancingPolicy() {
+ return loadBalancingPolicy;
+ }
+
+ public void setLoadBalancingPolicy(String loadBalancingPolicy) {
+ this.loadBalancingPolicy = loadBalancingPolicy;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java
new file mode 100644
index 0000000..30b735c
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.utils.cassandra;
+
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+
+public class CassandraLoadBalancingPolicies {
+
+ public final String roundRobinPolicy = "RoundRobinPolicy";
+ public final String tokenAwarePolicy = "TokenAwarePolicy";
+ public final String dcAwareRoundRobinPolicy = "DcAwareRoundRobinPolicy";
+
+ public LoadBalancingPolicy getLoadBalancingPolicy(String policy) {
+ LoadBalancingPolicy loadBalancingPolicy = new RoundRobinPolicy();
+ switch (policy) {
+ case roundRobinPolicy:
+ loadBalancingPolicy = new RoundRobinPolicy();
+ break;
+ case tokenAwarePolicy:
+ loadBalancingPolicy = new TokenAwarePolicy(new RoundRobinPolicy());
+ break;
+ case dcAwareRoundRobinPolicy:
+ loadBalancingPolicy = new DCAwareRoundRobinPolicy();
+ break;
+ default:
+ throw new IllegalArgumentException("Cassandra load balancing policy can be " + roundRobinPolicy + " ," + tokenAwarePolicy
+ + " ," + dcAwareRoundRobinPolicy);
+ }
+ return loadBalancingPolicy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
index f5825ab..71f02e5 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
@@ -58,6 +58,9 @@ public class CassandraComponentProducerTest extends CamelTestSupport {
@Produce(uri = "direct:inputNotConsistent")
ProducerTemplate notConsistentProducerTemplate;
+
+ @Produce(uri = "direct:loadBalancingPolicy")
+ ProducerTemplate loadBalancingPolicyTemplate;
@BeforeClass
public static void setUpClass() throws Exception {
@@ -78,6 +81,8 @@ public class CassandraComponentProducerTest extends CamelTestSupport {
.to("cql://localhost/camel_ks?cql=" + CQL);
from("direct:inputNoParameter")
.to("cql://localhost/camel_ks?cql=" + NO_PARAMETER_CQL);
+ from("direct:loadBalancingPolicy")
+ .to("cql://localhost/camel_ks?cql=" + NO_PARAMETER_CQL + "&loadBalancingPolicy=RoundRobinPolicy");
from("direct:inputNotConsistent")
.to(NOT_CONSISTENT_URI);
}
@@ -132,6 +137,22 @@ public class CassandraComponentProducerTest extends CamelTestSupport {
session.close();
cluster.close();
}
+
+ @Test
+ public void testLoadBalancing() throws Exception {
+ Object response = loadBalancingPolicyTemplate.requestBodyAndHeader(new Object[]{"Claus 2", "Ibsen 2", "c_ibsen"},
+ CassandraConstants.CQL_QUERY, "update camel_user set first_name=?, last_name=? where login=?");
+
+ Cluster cluster = CassandraUnitUtils.cassandraCluster();
+ Session session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+ ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen");
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Claus 2", row.getString("first_name"));
+ assertEquals("Ibsen 2", row.getString("last_name"));
+ session.close();
+ cluster.close();
+ }
/**
* Test with incoming message containing a header with RegularStatement.