You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gi...@apache.org on 2020/09/03 01:21:41 UTC

[camel-kafka-connector] 04/08: Fix cassandra for camel 3.5

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ea4f0bdae35a76559bc812fb0b95f3bca73d7a25
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Aug 25 09:49:41 2020 +0200

    Fix cassandra for camel 3.5
    
    Adjusts the client code to use the newer Cassandra client version from
    camel-cassandraql
---
 .../cassandra/clients/CassandraClient.java         | 18 ++++-----
 .../cassandra/clients/dao/TestDataDao.java         | 43 ++++++++++++++--------
 .../dao/TestResultSetConversionStrategy.java       |  4 +-
 .../cassandra/sink/CamelSinkCassandraITCase.java   |  6 ++-
 .../source/CamelSourceCassandraITCase.java         |  8 +++-
 5 files changed, 49 insertions(+), 30 deletions(-)

diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/CassandraClient.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/CassandraClient.java
index 2c26f24..b8b1704 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/CassandraClient.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/CassandraClient.java
@@ -17,24 +17,24 @@
 
 package org.apache.camel.kafkaconnector.cassandra.clients;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
+import java.net.InetSocketAddress;
+
+import com.datastax.oss.driver.api.core.CqlSession;
 import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao;
 
 /**
  * A simple client for Cassandra for testing purposes
  */
 public class CassandraClient {
-    private Cluster cluster;
-    private Session session;
+    private CqlSession session;
 
     public CassandraClient(String host, int port) {
-        cluster = Cluster.builder()
-                .addContactPoint(host)
-                .withPort(port)
-                .build();
+        InetSocketAddress socketAddress = new InetSocketAddress(host, port);
 
-        session = cluster.connect();
+        session = CqlSession.builder()
+                .addContactPoint(socketAddress)
+                .withLocalDatacenter("datacenter1")
+                .build();
     }
 
 
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestDataDao.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestDataDao.java
index 4fcde37..ea8de7a 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestDataDao.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestDataDao.java
@@ -23,11 +23,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,9 +38,9 @@ public class TestDataDao {
     private static final Logger LOG = LoggerFactory.getLogger(TestDataDao.class);
 
 
-    private final Session session;
+    private final CqlSession session;
 
-    public TestDataDao(Session session) {
+    public TestDataDao(CqlSession session) {
         this.session = session;
     }
 
@@ -52,12 +52,16 @@ public class TestDataDao {
 
         String statement = SchemaBuilder.createKeyspace(KEY_SPACE)
                 .ifNotExists()
-                .with()
-                .replication(replication).getQueryString();
+                .withReplicationOptions(replication)
+                .asCql();
 
         LOG.info("Executing {}", statement);
 
-        session.execute(statement);
+        ResultSet rs = session.execute(statement);
+
+        if (!rs.wasApplied()) {
+            LOG.warn("The create key space statement did not execute");
+        }
     }
 
     public void useKeySpace() {
@@ -69,22 +73,29 @@ public class TestDataDao {
 
     public void createTable() {
         String statement = SchemaBuilder.createTable(TABLE_NAME)
-                .addPartitionKey("id", DataType.timeuuid())
-                .addClusteringColumn("text", DataType.text())
-                .getQueryString();
+                .withPartitionKey("id", DataTypes.TIMEUUID)
+                .withClusteringColumn("text", DataTypes.TEXT)
+                .asCql();
+
 
         LOG.info("Executing create table {}", statement);
 
-        session.execute(statement);
+        ResultSet rs = session.execute(statement);
+        if (!rs.wasApplied()) {
+            LOG.warn("The create table statement did not execute");
+        }
     }
 
     public void dropTable() {
         String statement = SchemaBuilder.dropTable(TABLE_NAME)
-                .getQueryString();
+                .asCql();
 
         LOG.info("Executing drop table {}", statement);
 
-        session.execute(statement);
+        ResultSet rs = session.execute(statement);
+        if (!rs.wasApplied()) {
+            LOG.warn("The drop table statement did not execute");
+        }
     }
 
     public boolean hasEnoughData(long expected) {
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestResultSetConversionStrategy.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestResultSetConversionStrategy.java
index 84f57a2..63febeb 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestResultSetConversionStrategy.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestResultSetConversionStrategy.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
 import org.apache.camel.component.cassandra.ResultSetConversionStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index 0cdf1b6..c583d71 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -77,7 +77,11 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
         cassandraClient = cassandraService.getClient();
 
         if (testDataDao != null) {
-            testDataDao.dropTable();
+            try {
+                testDataDao.dropTable();
+            } catch (Exception e) {
+                LOG.warn("Unable to drop the table: {}", e.getMessage(), e);
+            }
         }
     }
 
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
index 707f923..25ef1ee 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
@@ -19,7 +19,7 @@ package org.apache.camel.kafkaconnector.cassandra.source;
 
 import java.util.concurrent.ExecutionException;
 
-import com.datastax.driver.core.Row;
+import com.datastax.oss.driver.api.core.cql.Row;
 import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient;
 import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao;
 import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestResultSetConversionStrategy;
@@ -78,7 +78,11 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest {
         cassandraClient = cassandraService.getClient();
 
         if (testDataDao != null) {
-            testDataDao.dropTable();
+            try {
+                testDataDao.dropTable();
+            } catch (Exception e) {
+                LOG.warn("Unable to drop the table: {}", e.getMessage(), e);
+            }
         }
     }