Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/01 06:31:37 UTC
[camel-kafka-connector-examples] branch master updated: Added an Infinispan Sink Example
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
The following commit(s) were added to refs/heads/master by this push:
new 846d576 Added an Infinispan Sink Example
846d576 is described below
commit 846d5761ada795314f0422e56abad2c2793d9b27
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 1 08:30:58 2020 +0200
Added an Infinispan Sink Example
---
infinispan/infinispan-sink/README.adoc | 195 +++++++++++++++++++++
.../config/CamelInfinispanSinkConnector.properties | 26 +++
2 files changed, 221 insertions(+)
diff --git a/infinispan/infinispan-sink/README.adoc b/infinispan/infinispan-sink/README.adoc
new file mode 100644
index 0000000..afc6899
--- /dev/null
+++ b/infinispan/infinispan-sink/README.adoc
@@ -0,0 +1,195 @@
+# Camel-Kafka-connector Infinispan Sink
+
+This is an example of the Camel-Kafka-connector Infinispan Sink.
+
+## Standalone
+
+### What is needed
+
+- An Infinispan instance
+
+### Setting up Infinispan
+
+As a first step, you need to download Infinispan Server version 11.0.3.Final.
+
+Infinispan 11.x is secured by default. For the purpose of this example we'll remove the security check, so we won't need any authentication.
+
+If you have Infinispan unzipped in $INFINISPAN_HOME, you'll need to:
+
+- Edit $INFINISPAN_HOME/server/conf/infinispan.xml
+
+so that the file content is:
+
+```
+<infinispan
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:infinispan:config:11.0 https://infinispan.org/schemas/infinispan-config-11.0.xsd
+ urn:infinispan:server:11.0 https://infinispan.org/schemas/infinispan-server-11.0.xsd"
+ xmlns="urn:infinispan:config:11.0"
+ xmlns:server="urn:infinispan:server:11.0">
+
+ <cache-container name="default" statistics="true">
+ <transport cluster="${infinispan.cluster.name}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
+ </cache-container>
+
+ <server xmlns="urn:infinispan:server:11.0">
+ <interfaces>
+ <interface name="public">
+ <inet-address value="${infinispan.bind.address:127.0.0.1}"/>
+ </interface>
+ </interfaces>
+
+ <socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
+ <socket-binding name="default" port="${infinispan.bind.port:11222}"/>
+ <socket-binding name="memcached" port="11221"/>
+ </socket-bindings>
+
+ <endpoints socket-binding="default">
+ <hotrod-connector name="hotrod"/>
+ <rest-connector name="rest"/>
+ </endpoints>
+ </server>
+</infinispan>
+```
+
+Infinispan 11 no longer provides a default cache out of the box, so we need to create one. The server must be running first.
+
+Start your server:
+
+```
+> $INFINISPAN_HOME/bin/server.sh
+bin/server.sh
+06:57:51,378 INFO (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09
+06:57:51,395 INFO (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.3.Final]
+06:57:51,396 INFO (main) [BOOT] PID = 9678
+06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
+06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/infinispan.xml
+06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/log4j2.xml
+06:57:51,959 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
+06:57:51,960 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
+06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
+06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
+06:57:52,367 WARN (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
+06:57:52,919 INFO (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.3.Final
+06:57:52,921 INFO (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.3.Final timestamp=2020-09-30T21:04:46.511Z
+06:57:53,046 INFO (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
+06:57:55,138 INFO (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator
+06:57:55,150 INFO (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169]
+06:57:55,156 INFO (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800]
+06:57:55,810 INFO (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
+```
+
+With the server running, open the Infinispan CLI in another terminal and create the cache:
+
+```
+> $INFINISPAN_HOME/bin/cli.sh
+[disconnected]> connect
+[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
+[ghost-35169@cluster//containers/default]> describe caches/mycache
+{
+ "distributed-cache" : {
+ "mode" : "SYNC",
+ "remote-timeout" : 17500,
+ "state-transfer" : {
+ "timeout" : 60000
+ },
+ "transaction" : {
+ "mode" : "NONE"
+ },
+ "locking" : {
+ "concurrency-level" : 1000,
+ "acquire-timeout" : 15000,
+ "striping" : false
+ },
+ "statistics" : true
+ }
+}
+```
+The cache is now ready to use.
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+## Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka installation.
+
+Open `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your chosen location.
+
+In this example we'll use `/home/oscerd/connectors/`:
+
+```
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.5.0/camel-infinispan-kafka-connector-0.5.0-package.zip
+> unzip camel-infinispan-kafka-connector-0.5.0-package.zip
+```
+
+Now it's time to set up the connector.
+
+Open the Infinispan sink configuration file (`config/CamelInfinispanSinkConnector.properties`):
+
+```
+name=CamelInfinispanSinkConnector
+connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.endpoint.hosts=localhost
+camel.sink.path.cacheName=mycache
+```
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSinkConnector.properties
+```
+
+In a different terminal, run the Kafka console producer and send messages to your Kafka broker.
+
+```
+$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic
+Kafka to Infinispan message 1
+Kafka to Infinispan message 2
+```
+
+You should see the cache statistics change. You can check this by running:
+
+```
+> $INFINISPAN_HOME/bin/cli.sh
+[disconnected]> connect
+[ghost-35169@cluster//containers/default]> cache mycache
+[ghost-35169@cluster//containers/default/caches/mycache]> stats
+{
+ "total_number_of_entries" : 0,
+ "off_heap_memory_used" : 0,
+ "time_since_start" : 350,
+ "time_since_reset" : 350,
+ "stores" : 0,
+ "current_number_of_entries" : 0,
+ "data_memory_used" : 0,
+ "misses" : 0,
+ "remove_hits" : 0,
+ "remove_misses" : 0,
+ "evictions" : 0,
+ "average_read_time" : 0,
+ "average_read_time_nanos" : 0,
+ "average_write_time" : 0,
+ "average_write_time_nanos" : 0,
+ "average_remove_time" : 0,
+ "average_remove_time_nanos" : 0,
+ "required_minimum_number_of_nodes" : 1,
+ "current_number_of_entries_in_memory" : 0,
+ "retrievals" : 0,
+ "hits" : 0
+}
+```
+
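Note (not part of the commit): the cache creation and statistics check that the README performs through the interactive CLI could probably also be scripted against the REST connector enabled in `infinispan.xml`. The following is a minimal sketch, assuming the unsecured server from the README listens on `localhost:11222` and that the REST v2 paths (`/rest/v2/caches/{name}` with `?template=` and `?action=stats`) behave as described in the Infinispan 11 documentation. The function names and the `INFINISPAN_REST` variable are hypothetical helpers, not something the example ships.

```shell
# Base URL of the REST connector. Assumption: default bind address and port
# from the infinispan.xml shown in the README, with security disabled.
INFINISPAN_REST="${INFINISPAN_REST:-http://localhost:11222/rest/v2}"

# Build the REST URL for a named cache (hypothetical helper).
cache_url() {
    printf '%s/caches/%s' "$INFINISPAN_REST" "$1"
}

# Create a cache from a server-side template, e.g. org.infinispan.DIST_SYNC.
# curl -s silences progress output, -f makes HTTP errors return non-zero.
create_cache() {
    curl -sf -X POST "$(cache_url "$1")?template=$2"
}

# Print the statistics of a cache as JSON.
cache_stats() {
    curl -sf "$(cache_url "$1")?action=stats"
}
```

With the server running, `create_cache mycache org.infinispan.DIST_SYNC` followed by `cache_stats mycache` would mirror the two CLI sessions in the README.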
diff --git a/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties b/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties
new file mode 100644
index 0000000..260c7ed
--- /dev/null
+++ b/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelInfinispanSinkConnector
+connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.endpoint.hosts=localhost
+camel.sink.path.cacheName=mycache
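
Note (not part of the commit): before launching `connect-standalone.sh`, it can help to confirm that the properties file defines every key this example relies on. The sketch below is one way to do that, assuming a plain `key=value` layout as in the file above; `has_keys` is a hypothetical helper name.

```shell
# Return 0 only if every given key appears as "key=value" in the properties file.
# Commented-out lines do not count, because the text before "=" must equal the key.
has_keys() {
    file="$1"
    shift
    for key in "$@"; do
        # Split each line on "=" and compare the first field with the key exactly.
        if ! awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$file"; then
            echo "missing key: $key" >&2
            return 1
        fi
    done
    return 0
}
```

For this example the call would be `has_keys config/CamelInfinispanSinkConnector.properties name connector.class topics camel.sink.endpoint.hosts camel.sink.path.cacheName`.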