Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/01 06:31:37 UTC

[camel-kafka-connector-examples] branch master updated: Added an Infinispan Sink Example

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new 846d576  Added an Infinispan Sink Example
846d576 is described below

commit 846d5761ada795314f0422e56abad2c2793d9b27
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 1 08:30:58 2020 +0200

    Added an Infinispan Sink Example
---
 infinispan/infinispan-sink/README.adoc             | 195 +++++++++++++++++++++
 .../config/CamelInfinispanSinkConnector.properties |  26 +++
 2 files changed, 221 insertions(+)

diff --git a/infinispan/infinispan-sink/README.adoc b/infinispan/infinispan-sink/README.adoc
new file mode 100644
index 0000000..afc6899
--- /dev/null
+++ b/infinispan/infinispan-sink/README.adoc
@@ -0,0 +1,195 @@
+# Camel-Kafka-connector Infinispan Sink
+
+This is an example for the Camel-Kafka-connector Infinispan Sink.
+
+## Standalone
+
+### What is needed
+
+- An Infinispan instance
+
+### Setting up Infinispan
+
+As a first step, you need to download Infinispan Server version 11.0.3.Final.
+
+Infinispan 11.x is secured by default. For the purpose of this example we'll remove the security check, so we won't need any authentication.
+
+If you have Infinispan unzipped in $INFINISPAN_HOME, you'll need to:
+
+- Edit $INFINISPAN_HOME/server/conf/infinispan.xml
+
+so that the file content is:
+
+```
+<infinispan
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="urn:infinispan:config:11.0 https://infinispan.org/schemas/infinispan-config-11.0.xsd
+                            urn:infinispan:server:11.0 https://infinispan.org/schemas/infinispan-server-11.0.xsd"
+        xmlns="urn:infinispan:config:11.0"
+        xmlns:server="urn:infinispan:server:11.0">
+
+   <cache-container name="default" statistics="true">
+      <transport cluster="${infinispan.cluster.name}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
+   </cache-container>
+
+   <server xmlns="urn:infinispan:server:11.0">
+      <interfaces>
+         <interface name="public">
+            <inet-address value="${infinispan.bind.address:127.0.0.1}"/>
+         </interface>
+      </interfaces>
+
+      <socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
+         <socket-binding name="default" port="${infinispan.bind.port:11222}"/>
+         <socket-binding name="memcached" port="11221"/>
+      </socket-bindings>
+
+      <endpoints socket-binding="default">
+         <hotrod-connector name="hotrod"/>
+         <rest-connector name="rest"/>
+      </endpoints>
+   </server>
+</infinispan>
+```
+
+Next we need to create a cache, since Infinispan 11 no longer provides a default cache out of the box.
+
+To do that, first start your server:
+
+```
+> $INFINISPAN_HOME/bin/server.sh
+bin/server.sh 
+06:57:51,378 INFO  (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09
+06:57:51,395 INFO  (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.3.Final]
+06:57:51,396 INFO  (main) [BOOT] PID = 9678
+06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
+06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/infinispan.xml
+06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/log4j2.xml
+06:57:51,959 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
+06:57:51,960 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
+06:57:51,961 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
+06:57:51,961 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
+06:57:52,367 WARN  (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
+06:57:52,919 INFO  (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.3.Final
+06:57:52,921 INFO  (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.3.Final timestamp=2020-09-30T21:04:46.511Z
+06:57:53,046 INFO  (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
+06:57:55,138 INFO  (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator
+06:57:55,150 INFO  (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169]
+06:57:55,156 INFO  (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800]
+06:57:55,810 INFO  (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
+```
+
+Once the server is running, create the cache through the Infinispan CLI:
+
+```
+> $INFINISPAN_HOME/bin/cli.sh
+[disconnected]> connect
+[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
+[ghost-35169@cluster//containers/default]> describe caches/mycache
+{
+  "distributed-cache" : {
+    "mode" : "SYNC",
+    "remote-timeout" : 17500,
+    "state-transfer" : {
+      "timeout" : 60000
+    },
+    "transaction" : {
+      "mode" : "NONE"
+    },
+    "locking" : {
+      "concurrency-level" : 1000,
+      "acquire-timeout" : 15000,
+      "striping" : false
+    },
+    "statistics" : true
+  }
+}
+```
+The cache is now in place and Infinispan is ready for the example.
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+## Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka installation.
+
+Open `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your chosen location.
+
+In this example we'll use `/home/oscerd/connectors/`:
+
+```
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.5.0/camel-infinispan-kafka-connector-0.5.0-package.zip
+> unzip camel-infinispan-kafka-connector-0.5.0-package.zip
+```
+
+Now it's time to set up the connector.
+
+Open the Infinispan sink configuration file (`config/CamelInfinispanSinkConnector.properties`):
+
+```
+name=CamelInfinispanSinkConnector
+connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.endpoint.hosts=localhost
+camel.sink.path.cacheName=mycache
+```
+
+Now you can run the example from the `infinispan/infinispan-sink` directory:
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSinkConnector.properties
+```
+
+In a different terminal, run the Kafka console producer and send messages to your Kafka broker.
+
+```
+$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic
+Kafka to Infinispan message 1
+Kafka to Infinispan message 2
+```
+
+You should see the cache statistics change. You can check this by running:
+
+```
+> $INFINISPAN_HOME/bin/cli.sh
+[disconnected]> connect
+[ghost-35169@cluster//containers/default]> cache mycache
+[ghost-35169@cluster//containers/default/caches/mycache]> stats
+{
+  "total_number_of_entries" : 0,
+  "off_heap_memory_used" : 0,
+  "time_since_start" : 350,
+  "time_since_reset" : 350,
+  "stores" : 0,
+  "current_number_of_entries" : 0,
+  "data_memory_used" : 0,
+  "misses" : 0,
+  "remove_hits" : 0,
+  "remove_misses" : 0,
+  "evictions" : 0,
+  "average_read_time" : 0,
+  "average_read_time_nanos" : 0,
+  "average_write_time" : 0,
+  "average_write_time_nanos" : 0,
+  "average_remove_time" : 0,
+  "average_remove_time_nanos" : 0,
+  "required_minimum_number_of_nodes" : 1,
+  "current_number_of_entries_in_memory" : 0,
+  "retrievals" : 0,
+  "hits" : 0
+}
+```
+
diff --git a/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties b/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties
new file mode 100644
index 0000000..260c7ed
--- /dev/null
+++ b/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelInfinispanSinkConnector
+connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.endpoint.hosts=localhost
+camel.sink.path.cacheName=mycache