Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/15 06:11:46 UTC

[camel-kafka-connector-examples] branch slack-source-openshift created (now 247a16b)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch slack-source-openshift
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git.


      at 247a16b  Slack Source Connector example: Added openshift instruction

This branch includes the following new commits:

     new 247a16b  Slack Source Connector example: Added openshift instruction

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector-examples] 01/01: Slack Source Connector example: Added openshift instruction

Posted by ac...@apache.org.

acosentino pushed a commit to branch slack-source-openshift
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git

commit 247a16b114533c0d61a6c1c0538d6d92e8419b15
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 15 08:11:03 2020 +0200

    Slack Source Connector example: Added openshift instruction
---
 slack/slack-source/README.adoc                     | 325 +++++++++++++++++++++
 .../config/openshift/slack-source.yaml             |  17 ++
 .../config/openshift/slack-token.properties        |   1 +
 3 files changed, 343 insertions(+)

diff --git a/slack/slack-source/README.adoc b/slack/slack-source/README.adoc
index e83b4a7..8bbd90b 100644
--- a/slack/slack-source/README.adoc
+++ b/slack/slack-source/README.adoc
@@ -218,3 +218,328 @@ In another terminal, using kafkacat, you should be able to see the headers.
 Topic test301[0], offset: 22, key: , payload: {"schema":{"type":"string","optional":false},"payload":"Hello"} 
 ```
 
+## OpenShift
+
+### What is needed
+
+- A Slack App
+- A Slack channel
+- An OpenShift instance
+
+### Running Kafka using Strimzi Operator
+
+First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project.
+The installation creates security objects, so you need to switch to an admin user.
+If you use Minishift, you can do it with the following command:
+
+[source,bash,options="nowrap"]
+----
+oc login -u system:admin
+----
+
+We will use OpenShift project `myproject`.
+If it doesn't exist yet, you can create it using the following command:
+
+[source,bash,options="nowrap"]
+----
+oc new-project myproject
+----
+
+If the project already exists, you can switch to it with:
+
+[source,bash,options="nowrap"]
+----
+oc project myproject
+----
+
+We can now install the Strimzi operator into this project:
+
+[source,bash,options="nowrap",subs="attributes"]
+----
+oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.19.0/strimzi-cluster-operator-0.19.0.yaml
+----
+
+Next we will deploy a Kafka broker cluster and a Kafka Connect cluster, and then create a Kafka Connect image with the Camel Kafka connectors installed:
+
+[source,bash,options="nowrap",subs="attributes"]
+----
+# Deploy a single node Kafka broker
+oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/kafka/kafka-persistent-single.yaml
+
+# Deploy a single instance of Kafka Connect with no plug-in installed
+oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/connect/kafka-connect-s2i-single-node-kafka.yaml
+----
+
+Optionally, enable the creation of Kafka connectors through the `KafkaConnector` custom resource:
+
+[source,bash,options="nowrap"]
+----
+oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
+----
+
+### Add Camel Kafka connector binaries
+
+Strimzi uses `Source2Image` (S2I) builds to allow users to add their own connectors to the existing Strimzi Docker images.
+We now need to build the connectors and add them to the image.
+If you have built the whole project (`mvn clean package`), decompress the connectors you need into a folder (for example `my-connectors/`)
+so that each one is in its own subfolder.
+Alternatively, you can download the latest officially released and packaged connectors from Maven.
+
+In this case we need to extend an existing connector and add a Transform, so we need to leverage the archetype:
+
+```
+> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.5.0
+[INFO] Scanning for projects...
+[INFO] 
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO] 
+[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+[INFO] 
+[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+[INFO] 
+[INFO] 
+[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
+[INFO] Generating project in Interactive mode
+[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote
+Define value for property 'groupId': org.apache.camel.kafkaconnector
+Define value for property 'artifactId': slack-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0
+Define value for property 'package' org.apache.camel.kafkaconnector: : 
+Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.5.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector
+artifactId: slack-extended
+version: 0.5.0
+package: org.apache.camel.kafkaconnector
+camel-kafka-connector-name: camel-slack-kafka-connector
+camel-kafka-connector-version: 0.5.0
+ Y: : y
+[INFO] ----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0
+[INFO] ----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: artifactId, Value: slack-extended
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0
+[INFO] Parameter: artifactId, Value: slack-extended
+[INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time:  39.295 s
+[INFO] Finished at: 2020-10-13T09:16:51+02:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/oscerd/playground/slack-extended
+```
+Then add the following class in the main package:
+
+```
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector;
+
+import java.util.Map;
+
+import org.apache.camel.component.slack.helper.SlackMessage;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlackTransformer<R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Transforms String-based content from Kafka into a map");
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class);
+
+    @Override
+    public R apply(R r) {
+        Object value = r.value();
+
+        if (value instanceof SlackMessage) {
+            LOG.debug("Converting record from SlackMessage to text");
+            SlackMessage message = (SlackMessage) value;
+
+            LOG.debug("Received text: {}", message.getText());
+
+            // Replace the SlackMessage value with its plain text and a matching schema
+            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+                    SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp());
+
+        } else {
+            // The value may be null, so avoid calling getClass() on it directly
+            LOG.debug("Unexpected message type: {}", value == null ? null : value.getClass());
+
+            return r;
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}
+```
+
+Now we need to build the connector:
+
+```
+> mvn clean package
+```
+Then move the zip package from `target/` to the `my-connectors/` folder and unzip it.
+
+Now we can start the build:
+
+[source,bash,options="nowrap"]
+----
+oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow
+----
+
+We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready.
+Once it is done, we can check that the connectors are available in our Kafka Connect cluster.
+Strimzi is running Kafka Connect in a distributed mode.
+
+To check the available connector plugins, you can run the following command:
+
+[source,bash,options="nowrap"]
+----
+oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
+----
+
+You should see something like this:
+
+[source,json,options="nowrap"]
+----
+[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.slack.CamelSlackSinkConnector","type":"sink","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector","type":"source","version":"0.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","v [...]
+----
+
+### Set the Bot Token as secret (optional)
+
+You can also store the Slack bot token as a secret. Edit the file `config/openshift/slack-token.properties` with the correct token and then execute the following command:
+
+[source,bash,options="nowrap"]
+----
+oc create secret generic slack-token --from-file=config/openshift/slack-token.properties
+----
+
+Now we need to edit the `KafkaConnectS2I` custom resource to reference the secret. For example:
+
+[source,yaml,options="nowrap"]
+----
+spec:
+  # ...
+  config:
+    config.providers: file
+    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
+  #...
+  externalConfiguration:
+    volumes:
+      - name: slack-token
+        secret:
+          secretName: slack-token
+----
+
+This way the `slack-token` secret is mounted as a volume at the path `/opt/kafka/external-configuration/slack-token/`.
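+
+As a sketch of how the mounted file is then consumed, a connector's `config` section can reference the `token` key through the `file` config provider instead of carrying the token in clear text. The path below assumes the secret name and properties file name used in this example:
+
+[source,yaml,options="nowrap"]
+----
+config:
+  # Resolved at runtime by FileConfigProvider, in the form ${file:<path-to-properties-file>:<key>}
+  camel.source.endpoint.token: ${file:/opt/kafka/external-configuration/slack-token/slack-token.properties:token}
+----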
+
+### Create connector instance
+
+Now we can create an instance of the Slack source connector:
+
+[source,bash,options="nowrap"]
+----
+oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
+    -H "Accept:application/json" \
+    -H "Content-Type:application/json" \
+    http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
+{
+  "name": "slack-source-connector",
+  "config": {
+    "connector.class": "org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector",
+    "tasks.max": "1",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "transforms": "SlackTransformer",
+    "transforms.SlackTransformer.type": "org.apache.camel.kafkaconnector.SlackTransformer",
+    "topics": "slack-topic",
+    "camel.source.path.channel": "general",
+    "camel.source.endpoint.token": "<token>"
+  }
+}
+EOF
+----
+
+Alternatively, if you have enabled `use-connector-resources`, you can create the connector instance by creating a `KafkaConnector` custom resource:
+
+[source,bash,options="nowrap"]
+----
+oc apply -f - << EOF
+apiVersion: kafka.strimzi.io/v1alpha1
+kind: KafkaConnector
+metadata:
+  name: slack-source-connector
+  namespace: myproject
+  labels:
+    strimzi.io/cluster: my-connect-cluster
+spec:
+  class: org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
+  tasksMax: 1
+  config:
+    key.converter: org.apache.kafka.connect.storage.StringConverter
+    transforms: SlackTransformer
+    transforms.SlackTransformer.type: org.apache.camel.kafkaconnector.SlackTransformer
+    topics: slack-topic
+    camel.source.path.channel: general
+    camel.source.endpoint.token: <token>
+EOF
+----
+
+If you followed the optional step for storing the bot token as a secret, you can run the following command instead:
+
+[source,bash,options="nowrap"]
+----
+oc apply -f config/openshift/slack-source.yaml
+----
+
+Add a message to your channel, for example "Hello".
+
+Using kafkacat, you should be able to see the message payload. The topic name and offset in the sample output below will differ in your environment.
+
+```
+>  kafkacat -b localhost:9092 -t slack-topic -f 'Topic %t[%p], offset: %o, key: %k, payload: %s \n'
+Topic test301[0], offset: 22, key: , payload: {"schema":{"type":"string","optional":false},"payload":"Hello"} 
+```
+
diff --git a/slack/slack-source/config/openshift/slack-source.yaml b/slack/slack-source/config/openshift/slack-source.yaml
new file mode 100644
index 0000000..15b6e6b
--- /dev/null
+++ b/slack/slack-source/config/openshift/slack-source.yaml
@@ -0,0 +1,17 @@
+apiVersion: kafka.strimzi.io/v1alpha1
+kind: KafkaConnector
+metadata:
+  name: slack-source-connector
+  namespace: myproject
+  labels:
+    strimzi.io/cluster: my-connect-cluster
+spec:
+  class: org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
+  tasksMax: 1
+  config:
+    key.converter: org.apache.kafka.connect.storage.StringConverter
+    transforms: SlackTransformer
+    transforms.SlackTransformer.type: org.apache.camel.kafkaconnector.SlackTransformer
+    topics: slack-topic
+    camel.source.path.channel: general
+    camel.source.endpoint.token: ${file:/opt/kafka/external-configuration/slack-token/slack-token.properties:token}
diff --git a/slack/slack-source/config/openshift/slack-token.properties b/slack/slack-source/config/openshift/slack-token.properties
new file mode 100644
index 0000000..a59d2bb
--- /dev/null
+++ b/slack/slack-source/config/openshift/slack-token.properties
@@ -0,0 +1 @@
+token=xxx