You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/17 08:26:52 UTC

[camel-kafka-connector-examples] 01/01: Added an Infinispan Source Connector Example with auth

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch inf-source-auth
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git

commit f6f9999e85e76c822137c8430b09c45cceec9647
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Nov 17 09:26:11 2020 +0100

    Added an Infinispan Source Connector Example with auth
---
 .../README.adoc                                    | 171 +++++++++++++++++++++
 .../CamelInfinispanSourceConnector.properties      |  34 ++++
 2 files changed, 205 insertions(+)

diff --git a/infinispan/infinispan-source-with-authentication/README.adoc b/infinispan/infinispan-source-with-authentication/README.adoc
new file mode 100644
index 0000000..3152640
--- /dev/null
+++ b/infinispan/infinispan-source-with-authentication/README.adoc
@@ -0,0 +1,171 @@
+# Camel-Kafka-connector Infinispan Source With Authentication
+
+This is an example for Camel-Kafka-connector Infinispan Source With Authentication
+
+## Standalone
+
+### What is needed
+
+- An Infinispan instance
+
+### Setting up Infinispan
+
+As first step you need to download the Infinispan Server with version 11.0.5.Final.
+
+You can now start your server
+
+```
+> $INFINISPAN_HOME/bin/server.sh
+bin/server.sh 
+06:57:51,378 INFO  (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09
+06:57:51,395 INFO  (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.5.Final]
+06:57:51,396 INFO  (main) [BOOT] PID = 9678
+06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
+06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.5.Final/server/conf/infinispan.xml
+06:57:51,441 INFO  (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.5.Final/server/conf/log4j2.xml
+06:57:51,959 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
+06:57:51,960 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
+06:57:51,961 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
+06:57:51,961 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
+06:57:52,367 WARN  (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
+06:57:52,919 INFO  (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.5.Final
+06:57:52,921 INFO  (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.5.Final timestamp=2020-09-30T21:04:46.511Z
+06:57:53,046 INFO  (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
+06:57:55,138 INFO  (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator
+06:57:55,150 INFO  (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169]
+06:57:55,156 INFO  (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800]
+06:57:55,810 INFO  (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
+```
+
+So, you'll need to run
+
+```
+> $INFINISPAN_HOME/bin/cli.sh user create admin -p "password"
+> $INFINISPAN_HOME/bin/cli.sh
+[disconnected]> connect
+Username: admin
+Password: ********
+[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
+[ghost-35169@cluster//containers/default]> describe caches/mycache
+{
+  "distributed-cache" : {
+    "mode" : "SYNC",
+    "remote-timeout" : 17500,
+    "state-transfer" : {
+      "timeout" : 60000
+    },
+    "transaction" : {
+      "mode" : "NONE"
+    },
+    "locking" : {
+      "concurrency-level" : 1000,
+      "acquire-timeout" : 15000,
+      "striping" : false
+    },
+    "statistics" : true
+  }
+}
+
+```
+
+It's important to add encoding to your cache configuration, otherwise consuming events won't work.
+You'll need to:
+
+- Edit $INFINISPAN_HOME/server/data/caches.xml
+
+and add the encoding section in the configuration
+
+```
+<?xml version="1.0" ?>
+
+<infinispan>
+    <cache-container>
+        <distributed-cache mode="SYNC" remote-timeout="17500" name="mycache" statistics="true">
+            <encoding>
+                <key media-type="text/plain; charset=UTF-8"/>
+                <value media-type="text/plain; charset=UTF-8"/>
+            </encoding>
+            <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
+            <transaction mode="NONE"/>
+            <state-transfer timeout="60000"/>
+        </distributed-cache>
+    </cache-container></infinispan>
+```
+
+Now you should be able to run this example.
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+## Setting up the needed bits and running the example
+
+You'll need to setup the plugin.path property in your kafka
+
+Open the `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your choosen location
+
+In this example we'll use `/home/oscerd/connectors/`
+
+```
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.6.0/camel-infinispan-kafka-connector-0.6.0-package.zip
+> unzip camel-infinispan-kafka-connector-0.6.0-package.zip
+```
+
+Now it's time to setup the connectors
+
+Open the Infinispan source configuration file
+
+```
+name=CamelInfinispanSourceConnector
+connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+
+topics=mytopic
+
+camel.source.endpoint.hosts=localhost
+camel.source.path.cacheName=mycache
+camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
+camel.source.endpoint.sync=false
+camel.component.infinispan.secure=true
+camel.source.endpoint.username=admin
+camel.component.infinispan.password=password
+camel.component.infinispan.saslMechanism=DIGEST-MD5
+camel.component.infinispan.securityRealm=default
+camel.component.infinispan.securityServerName=infinispan
+```
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSourceConnector.properties
+```
+
+On a different terminal run your cli.sh from the Infinispan server
+
+```
+> $INFINISPAN_HOME/bin/cli.sh
+[disconnected]> connect
+Username: admin
+Password: ********
+[ghost-43981@cluster//containers/default]> cache mycache
+[ghost-43981@cluster//containers/default/caches/mycache]> put test test
+```
+
+In another terminal, using kafkacta, you should be able to see the headers.
+
+```
+> kafkacat -b localhost:9092 -t mytopic -C -f 'Headers: %h\n'
+
+Headers: CamelHeader.CamelInfinispanCacheName=mycache,CamelHeader.CamelInfinispanEventType=CLIENT_CACHE_ENTRY_CREATED,CamelHeader.CamelInfinispanIsPre=false,CamelHeader.CamelInfinispanKey=test,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000
+% Reached end of topic mytopic [0] at offset 1
+
+```
+
diff --git a/infinispan/infinispan-source-with-authentication/config/CamelInfinispanSourceConnector.properties b/infinispan/infinispan-source-with-authentication/config/CamelInfinispanSourceConnector.properties
new file mode 100644
index 0000000..7d41e33
--- /dev/null
+++ b/infinispan/infinispan-source-with-authentication/config/CamelInfinispanSourceConnector.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelInfinispanSourceConnector
+connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+
+topics=mytopic
+
+camel.source.endpoint.hosts=localhost
+camel.source.path.cacheName=mycache
+camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
+camel.source.endpoint.sync=false
+camel.component.infinispan.secure=true
+camel.source.endpoint.username=admin
+camel.component.infinispan.password=password
+camel.component.infinispan.saslMechanism=DIGEST-MD5
+camel.component.infinispan.securityRealm=default
+camel.component.infinispan.securityServerName=infinispan