Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/15 17:10:57 UTC

[camel-kafka-connector-examples] branch aws2-kinesis-sink created (now e11e59c)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch aws2-kinesis-sink
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git.


      at e11e59c  Added an AWS2-Kinesis sink example

This branch includes the following new commits:

     new e11e59c  Added an AWS2-Kinesis sink example

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector-examples] 01/01: Added an AWS2-Kinesis sink example

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch aws2-kinesis-sink
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git

commit e11e59c000cc51f377fed5bf69516c1f20c376c9
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Jan 15 18:09:23 2021 +0100

    Added an AWS2-Kinesis sink example
---
 aws2-kinesis/aws2-kinesis-sink/README.adoc         | 113 +++++++++++++++++++++
 .../CamelAWS2KinesisSinkConnector.properties       |  27 +++++
 2 files changed, 140 insertions(+)

diff --git a/aws2-kinesis/aws2-kinesis-sink/README.adoc b/aws2-kinesis/aws2-kinesis-sink/README.adoc
new file mode 100644
index 0000000..ca093b5
--- /dev/null
+++ b/aws2-kinesis/aws2-kinesis-sink/README.adoc
@@ -0,0 +1,113 @@
+# Camel-Kafka-connector AWS2 Kinesis Sink
+
+This is an example of the Camel-Kafka-connector AWS2-Kinesis sink connector.
+
+## Standalone
+
+### What is needed
+
+- An AWS Kinesis stream
+- The AWS CLI locally
+- Some work on the AWS console
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
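+
+Optionally, you can verify that the topic exists before going further (plain Kafka tooling, nothing connector-specific):
+
+```
+$KAFKA_HOME/bin/kafka-topics.sh --describe --topic mytopic --bootstrap-server localhost:9092
+```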
+
+### Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka Connect configuration.
+
+Open `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your chosen location.
+
+In this example we'll use `/home/oscerd/connectors/`
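+
+With that location, the relevant line in `connect-standalone.properties` would look roughly like this:
+
+```
+plugin.path=/home/oscerd/connectors/
+```
+
+Then download and unzip the connector package in that directory: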
+
+```
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-kinesis-kafka-connector/0.7.0/camel-aws2-kinesis-kafka-connector-0.7.0-package.zip
+> unzip camel-aws2-kinesis-kafka-connector-0.7.0-package.zip
+```
+
+On the AWS console, create a Kinesis data stream named streamTest.
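+
+If you prefer the CLI over the console, the stream can also be created that way; a minimal sketch, assuming a single shard (which matches the shardId used later in this example):
+
+```
+> aws kinesis create-stream --stream-name streamTest --shard-count 1
+> aws kinesis describe-stream --stream-name streamTest
+```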
+
+Now it's time to set up the connector.
+
+Open the AWS2 Kinesis configuration file at `config/CamelAWS2KinesisSinkConnector.properties`:
+
+```
+name=CamelAws2-kinesisSinkConnector
+connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.path.streamName=streamTest
+camel.sink.endpoint.accessKey=xxxx
+camel.sink.endpoint.secretKey=yyyy
+camel.sink.endpoint.region=region
+```
+
+and add the correct AWS credentials and region (for example `eu-west-1`).
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2KinesisSinkConnector.properties
+```
+
+Now send some messages to the Kafka topic, for example with kafkacat:
+
+```
+> echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.CamelAwsKinesisPartitionKey=partition-1" -t mytopic
+% Auto-selecting Producer mode (use -P or -C to override)
+> echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.CamelAwsKinesisPartitionKey=partition-1" -t mytopic
+% Auto-selecting Producer mode (use -P or -C to override)
+```
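+
+If you want to double-check that the messages actually landed on the Kafka topic, you can read them back with kafkacat in consumer mode (exiting at the end of the partition):
+
+```
+> ./kafkacat -b localhost:9092 -t mytopic -C -e
+```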
+
+To verify the records are present in the streamTest stream we can use the AWS CLI.
+
+First we need to get the shard iterator
+
+```
+> aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name streamTest
+{
+    "ShardIterator": "AAAAAAAAAAGxdqX2OPHzjl3szvOLjdl21ylngnoD9zW3PSvRZHvQu825c0TCgA/M4Z5/dzZzBIJ1JR6h4VF2kmqFsEHOHXQ7gBq1mqXsBxUdk8Xvj1EkzUIbi3tcQFdmXSgW0O+9oTIJZ5ljiWFAwd1Czx1BsiB2c2RcqKUz/nRJjNL5MQBKywKuDEcplfVh+C2NnOCFdKqIamH0KeuK0UXhSHK1ghlW"
+}
+```
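+
+If you prefer not to copy the long iterator string by hand, here is a small sketch that captures it in a shell variable via the AWS CLI's `--query`/`--output` options and reuses it for the get-records call shown below:
+
+```
+> SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name streamTest --query 'ShardIterator' --output text)
+> aws kinesis get-records --shard-iterator $SHARD_ITERATOR
+```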
+
+After this we'll need to perform a get-records operation
+
+```
+> aws kinesis get-records --shard-iterator AAAAAAAAAAGxdqX2OPHzjl3szvOLjdl21ylngnoD9zW3PSvRZHvQu825c0TCgA/M4Z5/dzZzBIJ1JR6h4VF2kmqFsEHOHXQ7gBq1mqXsBxUdk8Xvj1EkzUIbi3tcQFdmXSgW0O+9oTIJZ5ljiWFAwd1Czx1BsiB2c2RcqKUz/nRJjNL5MQBKywKuDEcplfVh+C2NnOCFdKqIamH0KeuK0UXhSHK1ghlW
+{
+    "Records": [
+        {
+            "Data": "aGVsbG8gdGhlcmU=",
+            "PartitionKey": "partition-1",
+            "ApproximateArrivalTimestamp": 1610729857.904,
+            "SequenceNumber": "49614584677004495689019783087056269304781414429070721026"
+        },
+        {
+            "Data": "aGVsbG8gdGhlcmU=",
+            "PartitionKey": "partition-1",
+            "ApproximateArrivalTimestamp": 1610729861.765,
+            "SequenceNumber": "49614584677004495689019783087057478230601029333123334146"
+        }
+    ],
+    "NextShardIterator": "AAAAAAAAAAFWEhvAPrJc6dctkUTv5cFSIIcaQshFYv5wtlofGWJfmi8NjQljI5B4xzdVTE23zik9sbx+G0+T8CxTXScStjWVcZMNRi0Gt11lE0a8a+WkzP5/Zmm8Gf6X6f3w5P/tNzRUFCQc+Tg7eNOeevjiyRdn0271qOtfk5gS7NVtSaSGq13CwV3FWcCN2FzE9F8K04+8YihNrvBNhcuFIU3jyBhY",
+    "MillisBehindLatest": 0
+}
+```
+
+As you can see, we now have two records.
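+
+The `Data` field is Base64-encoded; you can decode it (for example with GNU coreutils) to confirm it is the payload we sent:
+
+```
+> echo "aGVsbG8gdGhlcmU=" | base64 -d
+hello there
+```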
+
+
+
+
diff --git a/aws2-kinesis/aws2-kinesis-sink/config/CamelAWS2KinesisSinkConnector.properties b/aws2-kinesis/aws2-kinesis-sink/config/CamelAWS2KinesisSinkConnector.properties
new file mode 100644
index 0000000..71b926e
--- /dev/null
+++ b/aws2-kinesis/aws2-kinesis-sink/config/CamelAWS2KinesisSinkConnector.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelAws2-kinesisSinkConnector
+connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+camel.sink.path.streamName=streamTest
+camel.sink.endpoint.accessKey=xxxx
+camel.sink.endpoint.secretKey=xxxx
+camel.sink.endpoint.region=region