You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/22 16:44:22 UTC

[camel-kafka-connector-examples] branch master updated: Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new cef55e5  Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component
cef55e5 is described below

commit cef55e5570168527b28fe27a492fbfbff657a084
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 22 18:42:30 2020 +0200

    Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component
---
 cql/cql-source/README.adoc                         | 106 ++++++++++++++++++++-
 .../CamelCassandraQLSourceConnector.properties     |   1 +
 2 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/cql/cql-source/README.adoc b/cql/cql-source/README.adoc
index 8c874e4..76362d0 100644
--- a/cql/cql-source/README.adoc
+++ b/cql/cql-source/README.adoc
@@ -24,14 +24,112 @@ Open the `$KAFKA_HOME/config/connect-standalone.properties`
 
 and set the `plugin.path` property to your choosen location
 
-In this example we'll use `/home/oscerd/connectors/`
+You'll need to build your connector starting from an archetype:
+
+```
+> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.5.0
+[INFO] Scanning for projects...
+[INFO] 
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO] 
+[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+[INFO] 
+[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+[INFO] 
+[INFO] 
+[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
+[INFO] Generating project in Interactive mode
+Define value for property 'groupId': org.apache.camel.kafkaconnector.cql.extended
+Define value for property 'artifactId': cql-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0
+Define value for property 'package' org.apache.camel.kafkaconnector.cql.extended: : 
+Define value for property 'camel-kafka-connector-name': camel-cql-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.5.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector.cql.extended
+artifactId: cql-extended
+version: 0.5.0
+package: org.apache.camel.kafkaconnector.cql.extended
+camel-kafka-connector-name: camel-cql-kafka-connector
+camel-kafka-connector-version: 0.5.0
+ Y: : Y
+[INFO] ----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0
+[INFO] ----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended
+[INFO] Parameter: artifactId, Value: cql-extended
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector/cql/extended
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-cql-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0
+[INFO] Parameter: artifactId, Value: cql-extended
+[INFO] Project created from Archetype in dir: /home/oscerd/playground/cql-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time:  55.314 s
+[INFO] Finished at: 2020-10-22T18:06:34+02:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/oscerd/playground/cql-extended
+```
+
+Import the cql-extended project in your favorite IDE and create the following class as RowConversionStrategy
+
+```
+package org.apache.camel.kafkaconnector.cql.extended;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import org.apache.camel.component.cassandra.ResultSetConversionStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RowConversionStrategy implements ResultSetConversionStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(RowConversionStrategy.class);
+
+    @Override
+    public Object getBody(ResultSet resultSet) {
+        List<String> ret = new ArrayList<>();
+
+        Iterator<Row> iterator = resultSet.iterator();
+        while (iterator.hasNext()) {
+            Row row = iterator.next();
+            int id = row.getInt("id");
+            String name = row.getString("name");
+            ret.add("Row[" + String.valueOf(id) + ", " + name +"]");
+        }
+
+        return ret;
+    }
+}
+```
+
+Now we need to build the connector:
+
+```
+> mvn clean package
+```
+
+In this example we'll use `/home/oscerd/connectors/` as plugin.path, but we'll need the generated zip from the previous build
 
 ```
 > cd /home/oscerd/connectors/
-> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-cql-kafka-connector/0.5.0/camel-cql-kafka-connector-0.5.0-package.zip
-> unzip camel-cql-kafka-connector-0.5.0-package.zip
+> cp /home/oscerd/playground/cql-extended/target/cql-extended-0.5.0-package.zip .
+> unzip cql-extended-0.5.0-package.zip
 ```
 
+and we're now ready to setting up the Cassandra cluster.
+
 ## Setting up Apache Cassandra
 
 This examples require a running Cassandra instance, for simplicity the steps below show how to start Cassandra using Docker. First you'll need to run a Cassandra instance:
@@ -95,6 +193,7 @@ camel.source.path.hosts=172.17.0.2
 camel.source.path.port=9042
 camel.source.path.keyspace=test
 camel.source.endpoint.cql=select * from users
+camel.source.endpoint.resultSetConversionStrategy=#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy
 ```
 
 Now you can run the example
@@ -109,5 +208,4 @@ On a different terminal run the kafka-consumer and you should see messages to Ka
 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
 [Row[1, oscerd]]
 ```
-You can verify the behavior through the following command
 
diff --git a/cql/cql-source/config/CamelCassandraQLSourceConnector.properties b/cql/cql-source/config/CamelCassandraQLSourceConnector.properties
index c56e032..691ed12 100644
--- a/cql/cql-source/config/CamelCassandraQLSourceConnector.properties
+++ b/cql/cql-source/config/CamelCassandraQLSourceConnector.properties
@@ -27,5 +27,6 @@ camel.source.path.port=9042
 camel.source.path.keyspace=test
 camel.source.endpoint.cql=select * from users
 camel.source.endpoint.delay=10000
+camel.source.endpoint.resultSetConversionStrategy=#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy