Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/22 16:43:35 UTC

[camel-kafka-connector-examples] branch cql-source-fix created (now 342bb5a)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch cql-source-fix
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git.


      at 342bb5a  Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component

This branch includes the following new commits:

     new 342bb5a  Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector-examples] 01/01: Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component

Posted by ac...@apache.org.

acosentino pushed a commit to branch cql-source-fix
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git

commit 342bb5abf1c21d9b3c53636d2a37437e7911b66c
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 22 18:42:30 2020 +0200

    Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component
---
 cql/cql-source/README.adoc                         | 106 ++++++++++++++++++++-
 .../CamelCassandraQLSourceConnector.properties     |   1 +
 2 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/cql/cql-source/README.adoc b/cql/cql-source/README.adoc
index 8c874e4..76362d0 100644
--- a/cql/cql-source/README.adoc
+++ b/cql/cql-source/README.adoc
@@ -24,14 +24,112 @@ Open the `$KAFKA_HOME/config/connect-standalone.properties`
 
 and set the `plugin.path` property to your chosen location
 
-In this example we'll use `/home/oscerd/connectors/`
+You'll need to build your connector starting from an archetype:
+
+```
+> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.5.0
+[INFO] Scanning for projects...
+[INFO] 
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO] 
+[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+[INFO] 
+[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+[INFO] 
+[INFO] 
+[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
+[INFO] Generating project in Interactive mode
+Define value for property 'groupId': org.apache.camel.kafkaconnector.cql.extended
+Define value for property 'artifactId': cql-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0
+Define value for property 'package' org.apache.camel.kafkaconnector.cql.extended: : 
+Define value for property 'camel-kafka-connector-name': camel-cql-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.5.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector.cql.extended
+artifactId: cql-extended
+version: 0.5.0
+package: org.apache.camel.kafkaconnector.cql.extended
+camel-kafka-connector-name: camel-cql-kafka-connector
+camel-kafka-connector-version: 0.5.0
+ Y: : Y
+[INFO] ----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0
+[INFO] ----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended
+[INFO] Parameter: artifactId, Value: cql-extended
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector/cql/extended
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-cql-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0
+[INFO] Parameter: artifactId, Value: cql-extended
+[INFO] Project created from Archetype in dir: /home/oscerd/playground/cql-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time:  55.314 s
+[INFO] Finished at: 2020-10-22T18:06:34+02:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/oscerd/playground/cql-extended
+```
+
+Import the cql-extended project into your favorite IDE and create the following `RowConversionStrategy` class:
+
+```
+package org.apache.camel.kafkaconnector.cql.extended;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import org.apache.camel.component.cassandra.ResultSetConversionStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RowConversionStrategy implements ResultSetConversionStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(RowConversionStrategy.class);
+
+    @Override
+    public Object getBody(ResultSet resultSet) {
+        List<String> ret = new ArrayList<>();
+
+        Iterator<Row> iterator = resultSet.iterator();
+        while (iterator.hasNext()) {
+            Row row = iterator.next();
+            int id = row.getInt("id");
+            String name = row.getString("name");
+            ret.add("Row[" + String.valueOf(id) + ", " + name +"]");
+        }
+
+        return ret;
+    }
+}
+```
+
+Now we need to build the connector:
+
+```
+> mvn clean package
+```
+
+In this example we'll use `/home/oscerd/connectors/` as the `plugin.path`, and we'll copy in the zip generated by the previous build:
 
 ```
 > cd /home/oscerd/connectors/
-> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-cql-kafka-connector/0.5.0/camel-cql-kafka-connector-0.5.0-package.zip
-> unzip camel-cql-kafka-connector-0.5.0-package.zip
+> cp /home/oscerd/playground/cql-extended/target/cql-extended-0.5.0-package.zip .
+> unzip cql-extended-0.5.0-package.zip
 ```
 
+We're now ready to set up the Cassandra cluster.
+
 ## Setting up Apache Cassandra
 
 This example requires a running Cassandra instance. For simplicity, the steps below show how to start Cassandra using Docker. First you'll need to run a Cassandra instance:
@@ -95,6 +193,7 @@ camel.source.path.hosts=172.17.0.2
 camel.source.path.port=9042
 camel.source.path.keyspace=test
 camel.source.endpoint.cql=select * from users
+camel.source.endpoint.resultSetConversionStrategy=#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy
 ```
 
 Now you can run the example
@@ -109,5 +208,4 @@ On a different terminal run the kafka-consumer and you should see messages to Ka
 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
 [Row[1, oscerd]]
 ```
-You can verify the behavior through the following command
 
diff --git a/cql/cql-source/config/CamelCassandraQLSourceConnector.properties b/cql/cql-source/config/CamelCassandraQLSourceConnector.properties
index c56e032..691ed12 100644
--- a/cql/cql-source/config/CamelCassandraQLSourceConnector.properties
+++ b/cql/cql-source/config/CamelCassandraQLSourceConnector.properties
@@ -27,5 +27,6 @@ camel.source.path.port=9042
 camel.source.path.keyspace=test
 camel.source.endpoint.cql=select * from users
 camel.source.endpoint.delay=10000
+camel.source.endpoint.resultSetConversionStrategy=#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy
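
The formatting logic of the `RowConversionStrategy` class in the README diff above can be checked without a running Cassandra instance. The following is a minimal sketch of the same loop in plain Java and is not part of the commit. The class name `RowFormatSketch`, the method `formatRows`, and the use of a `Map` in place of the driver's `Row` are assumptions made only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for RowConversionStrategy: a Map plays the role of
// the driver's Row so the string formatting can be exercised without Cassandra.
public class RowFormatSketch {

    // Builds one "Row[id, name]" string per input row, mirroring the loop in getBody().
    public static List<String> formatRows(List<Map<String, Object>> rows) {
        List<String> ret = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            int id = (Integer) row.get("id");
            String name = (String) row.get("name");
            ret.add("Row[" + id + ", " + name + "]");
        }
        return ret;
    }

    public static void main(String[] args) {
        // Sample row taken from the consumer output shown in the README.
        List<Map<String, Object>> rows =
                List.of(Map.<String, Object>of("id", 1, "name", "oscerd"));
        System.out.println(formatRows(rows));
    }
}
```

Running `main` prints `[Row[1, oscerd]]`, the same shape as the consumer output in the README, because the whole list is returned as a single message body.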