You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/08/05 17:56:16 UTC

camel git commit: CAMEL-9050 Camel-cassandraql: Add loadbalancingpolicy as uri option

Repository: camel
Updated Branches:
  refs/heads/master 90121e537 -> aa0bb9277


CAMEL-9050 Camel-cassandraql: Add loadbalancingpolicy as uri option


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aa0bb927
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aa0bb927
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aa0bb927

Branch: refs/heads/master
Commit: aa0bb927747d2215273482775f1aa348eb0f2e5b
Parents: 90121e5
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Aug 5 17:50:06 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Aug 5 17:55:53 2015 +0200

----------------------------------------------------------------------
 .../component/cassandra/CassandraEndpoint.java  | 22 +++++++++
 .../CassandraLoadBalancingPolicies.java         | 48 ++++++++++++++++++++
 .../CassandraComponentProducerTest.java         | 21 +++++++++
 3 files changed, 91 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
index 0e28d9f..95ab660 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
@@ -21,6 +21,8 @@ import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Message;
@@ -31,6 +33,7 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.utils.cassandra.CassandraLoadBalancingPolicies;
 import org.apache.camel.utils.cassandra.CassandraSessionHolder;
 
 /**
@@ -65,6 +68,8 @@ public class CassandraEndpoint extends DefaultEndpoint {
     private Session session;
     @UriParam
     private ConsistencyLevel consistencyLevel;
+    @UriParam
+    private String loadBalancingPolicy;
 
     /**
      * How many rows should be retrieved in message body
@@ -140,6 +145,7 @@ public class CassandraEndpoint extends DefaultEndpoint {
     }
 
     protected Cluster.Builder createClusterBuilder() throws Exception {
+        CassandraLoadBalancingPolicies cassLoadBalancingPolicies = new CassandraLoadBalancingPolicies();
         Cluster.Builder clusterBuilder = Cluster.builder();
         for (String host : hosts.split(",")) {
             clusterBuilder = clusterBuilder.addContactPoint(host);
@@ -153,6 +159,9 @@ public class CassandraEndpoint extends DefaultEndpoint {
         if (username != null && !username.isEmpty() && password != null) {
             clusterBuilder.withCredentials(username, password);
         }
+        if (loadBalancingPolicy != null && !loadBalancingPolicy.isEmpty()) {
+            clusterBuilder.withLoadBalancingPolicy(cassLoadBalancingPolicies.getLoadBalancingPolicy(loadBalancingPolicy));
+        }
         return clusterBuilder;
     }
 
@@ -337,4 +346,17 @@ public class CassandraEndpoint extends DefaultEndpoint {
     public void setPrepareStatements(boolean prepareStatements) {
         this.prepareStatements = prepareStatements;
     }
+
+    /**
+     * To use a specific LoadBalancingPolicy. Supported names: RoundRobinPolicy, TokenAwarePolicy, DcAwareRoundRobinPolicy
+     */
+    public String getLoadBalancingPolicy() {
+        return loadBalancingPolicy;
+    }
+
+    public void setLoadBalancingPolicy(String loadBalancingPolicy) {
+        this.loadBalancingPolicy = loadBalancingPolicy; // kept as the raw name; resolved to a driver policy when the cluster builder is created
+    }
+    
+    
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java
new file mode 100644
index 0000000..30b735c
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraLoadBalancingPolicies.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.utils.cassandra;
+
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+
+/** Resolves the name given in the {@code loadBalancingPolicy} endpoint option into a driver {@link LoadBalancingPolicy}. */
+public class CassandraLoadBalancingPolicies {
+    // Supported policy names; left as public instance fields so code that already reads them keeps compiling.
+    public final String roundRobinPolicy = "RoundRobinPolicy";
+    public final String tokenAwarePolicy = "TokenAwarePolicy";
+    public final String dcAwareRoundRobinPolicy = "DcAwareRoundRobinPolicy";
+
+    /**
+     * Creates a new policy instance for the given name (exact, case-sensitive match against the names above).
+     * @param policy the policy name, normally taken from the endpoint URI
+     * @return a new policy; for TokenAwarePolicy the child policy is a RoundRobinPolicy
+     * @throws IllegalArgumentException if the name is null or is not one of the supported names
+     */
+    public LoadBalancingPolicy getLoadBalancingPolicy(String policy) {
+        // constant.equals(policy) keeps a null name on the IllegalArgumentException path instead of a bare NPE
+        if (roundRobinPolicy.equals(policy)) {
+            return new RoundRobinPolicy();
+        } else if (tokenAwarePolicy.equals(policy)) {
+            return new TokenAwarePolicy(new RoundRobinPolicy());
+        } else if (dcAwareRoundRobinPolicy.equals(policy)) {
+            return new DCAwareRoundRobinPolicy();
+        }
+        throw new IllegalArgumentException("Cassandra load balancing policy '" + policy + "' is not supported, it can be "
+                + roundRobinPolicy + ", " + tokenAwarePolicy + ", " + dcAwareRoundRobinPolicy);
+    }

http://git-wip-us.apache.org/repos/asf/camel/blob/aa0bb927/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
index f5825ab..71f02e5 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
@@ -58,6 +58,9 @@ public class CassandraComponentProducerTest extends CamelTestSupport {
 
     @Produce(uri = "direct:inputNotConsistent")
     ProducerTemplate notConsistentProducerTemplate;
+    
+    @Produce(uri = "direct:loadBalancingPolicy")
+    ProducerTemplate loadBalancingPolicyTemplate;
 
     @BeforeClass
     public static void setUpClass() throws Exception {
@@ -78,6 +81,8 @@ public class CassandraComponentProducerTest extends CamelTestSupport {
                         .to("cql://localhost/camel_ks?cql=" + CQL);
                 from("direct:inputNoParameter")
                         .to("cql://localhost/camel_ks?cql=" + NO_PARAMETER_CQL);
+                from("direct:loadBalancingPolicy")
+                        .to("cql://localhost/camel_ks?cql=" + NO_PARAMETER_CQL + "&loadBalancingPolicy=RoundRobinPolicy");
                 from("direct:inputNotConsistent")
                         .to(NOT_CONSISTENT_URI);
             }
@@ -132,6 +137,22 @@ public class CassandraComponentProducerTest extends CamelTestSupport {
         session.close();
         cluster.close();
     }
+    
+    @Test
+    public void testLoadBalancing() throws Exception {
+        loadBalancingPolicyTemplate.requestBodyAndHeader(new Object[]{"Claus 2", "Ibsen 2", "c_ibsen"},
+                CassandraConstants.CQL_QUERY, "update camel_user set first_name=?, last_name=? where login=?");
+        Cluster cluster = CassandraUnitUtils.cassandraCluster(); // separate connection used only to read the row back
+        try {
+            Session session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+            Row row = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen").one();
+            assertNotNull(row);
+            assertEquals("Claus 2", row.getString("first_name"));
+            assertEquals("Ibsen 2", row.getString("last_name"));
+        } finally {
+            cluster.close(); // runs even when an assertion fails; closing the cluster also closes the session opened from it
+        }
+    }
 
     /**
      * Test with incoming message containing a header with RegularStatement.