You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/03/28 10:12:00 UTC
[1/2] incubator-apex-malhar git commit: APEXMALHAR-1948:
CassandraStore Should Allow You To Specify Protocol Version.
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master d9abee962 -> c829adaf1
APEXMALHAR-1948: CassandraStore Should Allow You To Specify Protocol Version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/84131850
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/84131850
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/84131850
Branch: refs/heads/master
Commit: 84131850c4bea86d5d08ddbaa1254027ccba2492
Parents: 5373a3c
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed Feb 10 15:14:08 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Fri Mar 18 15:32:20 2016 +0530
----------------------------------------------------------------------
contrib/pom.xml | 9 +++-
.../contrib/cassandra/CassandraStore.java | 49 ++++++++++++++++----
.../cassandra/CassandraOperatorTest.java | 29 +++++++++++-
3 files changed, 74 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 687e9c9..4bbd8f5 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -486,7 +486,14 @@
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
- <version>2.0.2</version>
+ <version>2.1.8</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ <scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
index 49ed20c..5d9178c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
@@ -24,9 +24,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
-
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.db.Connectable;
@@ -47,6 +47,7 @@ public class CassandraStore implements Connectable
private String node;
protected transient Cluster cluster = null;
protected transient Session session = null;
+ private String protocolVersion;
@NotNull
protected String keyspace;
@@ -90,6 +91,20 @@ public class CassandraStore implements Connectable
this.password = password;
}
+ public String getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
+ /**
+ * Sets the protocolVersion of Cassandra
+ * @param protocolVersion as V1, V2, V3 etc
+ */
+ public void setProtocolVersion(String protocolVersion)
+ {
+ this.protocolVersion = protocolVersion;
+ }
+
@NotNull
public String getNode() {
return node;
@@ -115,21 +130,35 @@ public class CassandraStore implements Connectable
/**
* Creates a cluster object.
*/
- public void buildCluster(){
-
+ public void buildCluster()
+ {
try {
-
- cluster = Cluster.builder()
- .addContactPoint(node).withCredentials(userName, password).build();
- }
- catch (DriverException ex) {
+ if (protocolVersion != null && protocolVersion.length() != 0) {
+ ProtocolVersion version = getCassandraProtocolVersion();
+ cluster = Cluster.builder().addContactPoint(node).withCredentials(userName, password).withProtocolVersion(version).build();
+ } else {
+ cluster = Cluster.builder().addContactPoint(node).withCredentials(userName, password).build();
+ }
+ } catch (DriverException ex) {
throw new RuntimeException("closing database resource", ex);
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
DTThrowable.rethrow(t);
}
}
+ private ProtocolVersion getCassandraProtocolVersion()
+ {
+ switch (protocolVersion.toUpperCase()) {
+ case "V1":
+ return ProtocolVersion.V1;
+ case "V2":
+ return ProtocolVersion.V2;
+ case "V3":
+ return ProtocolVersion.V3;
+ default:
+ throw new RuntimeException("Unsupported Cassandra Protocol Version.");
+ }
+ }
/**
* Create connection with database.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 68a1e5c..56d9857 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -24,19 +24,18 @@ import com.datatorrent.api.Attribute;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
-
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Lists;
+
import java.util.*;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -219,6 +218,32 @@ public class CassandraOperatorTest
}
@Test
+ public void testCassandraProtocolVersion()
+ {
+ CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
+ transactionalStore.setNode(NODE);
+ transactionalStore.setKeyspace(KEYSPACE);
+ transactionalStore.setProtocolVersion("v2");
+
+ AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+
+ TestOutputOperator outputOperator = new TestOutputOperator();
+
+ outputOperator.setTablename(TABLE_NAME);
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("id", "id", null));
+
+ outputOperator.setStore(transactionalStore);
+ outputOperator.setFieldInfos(fieldInfos);
+ outputOperator.setup(context);
+
+ Configuration config = outputOperator.getStore().getCluster().getConfiguration();
+ Assert.assertEquals("Procotol version was not set to V2.", ProtocolVersion.V2, config.getProtocolOptions().getProtocolVersionEnum());
+ }
+
+ @Test
public void testCassandraOutputOperator()
{
CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
[2/2] incubator-apex-malhar git commit: Merge branch
'APEXMALHAR-1948-cassandra-protocolVersion' of
https://github.com/DT-Priyanka/incubator-apex-malhar
Posted by bh...@apache.org.
Merge branch 'APEXMALHAR-1948-cassandra-protocolVersion' of https://github.com/DT-Priyanka/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c829adaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c829adaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c829adaf
Branch: refs/heads/master
Commit: c829adaf177f29241fb64221010bc911e8d7d907
Parents: d9abee9 8413185
Author: bhupesh <bh...@gmail.com>
Authored: Mon Mar 28 11:22:45 2016 +0530
Committer: bhupesh <bh...@gmail.com>
Committed: Mon Mar 28 11:22:45 2016 +0530
----------------------------------------------------------------------
contrib/pom.xml | 9 +++-
.../contrib/cassandra/CassandraStore.java | 49 ++++++++++++++++----
.../cassandra/CassandraOperatorTest.java | 29 +++++++++++-
3 files changed, 74 insertions(+), 13 deletions(-)
----------------------------------------------------------------------