Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/15 06:11:47 UTC

[camel-kafka-connector-examples] 01/01: Slack Source Connector example: Added openshift instruction

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch slack-source-openshift
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git

commit 247a16b114533c0d61a6c1c0538d6d92e8419b15
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 15 08:11:03 2020 +0200

    Slack Source Connector example: Added openshift instruction
---
 slack/slack-source/README.adoc                     | 325 +++++++++++++++++++++
 .../config/openshift/slack-source.yaml             |  17 ++
 .../config/openshift/slack-token.properties        |   1 +
 3 files changed, 343 insertions(+)

diff --git a/slack/slack-source/README.adoc b/slack/slack-source/README.adoc
index e83b4a7..8bbd90b 100644
--- a/slack/slack-source/README.adoc
+++ b/slack/slack-source/README.adoc
@@ -218,3 +218,328 @@ In another terminal, using kafkacat, you should be able to see the headers.
 Topic test301[0], offset: 22, key: , payload: {"schema":{"type":"string","optional":false},"payload":"Hello"} 
 ```
 
+## Openshift
+
+### What is needed
+
+- A Slack App
+- A Slack channel
+- An OpenShift instance
+
+### Running Kafka using Strimzi Operator
+
+First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project.
+The installation creates security objects, so we need to switch to an admin user.
+If you use Minishift, you can do it with the following command:
+
+[source,bash,options="nowrap"]
+----
+oc login -u system:admin
+----
+
+We will use OpenShift project `myproject`.
+If it doesn't exist yet, you can create it using the following command:
+
+[source,bash,options="nowrap"]
+----
+oc new-project myproject
+----
+
+If the project already exists, you can switch to it with:
+
+[source,bash,options="nowrap"]
+----
+oc project myproject
+----
+
+We can now install the Strimzi operator into this project:
+
+[source,bash,options="nowrap",subs="attributes"]
+----
+oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.19.0/strimzi-cluster-operator-0.19.0.yaml
+----
+
+Next we will deploy a Kafka broker cluster and a Kafka Connect cluster, and then create a Kafka Connect image with the Camel Kafka connectors installed:
+
+[source,bash,options="nowrap",subs="attributes"]
+----
+# Deploy a single node Kafka broker
+oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/kafka/kafka-persistent-single.yaml
+
+# Deploy a single instance of Kafka Connect with no plug-in installed
+oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/connect/kafka-connect-s2i-single-node-kafka.yaml
+----
+
+Optionally, enable the creation of Kafka connectors through a dedicated custom resource:
+
+[source,bash,options="nowrap"]
+----
+oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
+----
+
+### Add Camel Kafka connector binaries
+
+Strimzi uses `Source2Image` builds to allow users to add their own connectors to the existing Strimzi Docker images.
+We now need to build the connectors and add them to the image.
+If you have built the whole project (`mvn clean package`), decompress the connectors you need into a folder (e.g. `my-connectors/`)
+so that each one is in its own subfolder.
+Alternatively, you can download the latest officially released and packaged connectors from Maven.
+
+In this case we need to extend an existing connector and add a Transform, so we need to use the archetype:
+
+```
+> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.5.0
+[INFO] Scanning for projects...
+[INFO] 
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO] 
+[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+[INFO] 
+[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+[INFO] 
+[INFO] 
+[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
+[INFO] Generating project in Interactive mode
+[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote
+Define value for property 'groupId': org.apache.camel.kafkaconnector
+Define value for property 'artifactId': slack-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0
+Define value for property 'package' org.apache.camel.kafkaconnector: : 
+Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.5.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector
+artifactId: slack-extended
+version: 0.5.0
+package: org.apache.camel.kafkaconnector
+camel-kafka-connector-name: camel-slack-kafka-connector
+camel-kafka-connector-version: 0.5.0
+ Y: : y
+[INFO] ----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0
+[INFO] ----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: artifactId, Value: slack-extended
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0
+[INFO] Parameter: artifactId, Value: slack-extended
+[INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time:  39.295 s
+[INFO] Finished at: 2020-10-13T09:16:51+02:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/oscerd/playground/slack-extended
+```
+Then add the following class to the main package:
+
+```
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector;
+
+import java.util.Map;
+
+import org.apache.camel.component.slack.helper.SlackMessage;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlackTransformer <R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Transforms String-based content from Kafka into a map");
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class);
+
+    @Override
+    public R apply(R r) {
+        Object value = r.value();
+
+        if (value instanceof SlackMessage) {
+            LOG.debug("Converting record from SlackMessage to text");
+            SlackMessage message = (SlackMessage) value;
+
+            LOG.debug("Received text: {}", message.getText());
+
+            // Replace the record value with the plain message text and a matching schema
+            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+                    SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp());
+
+        } else {
+            // The value may be null, so avoid calling getClass() on it directly
+            LOG.debug("Unexpected message type: {}", value == null ? null : value.getClass());
+
+            return r;
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}
+```
+
+Now we need to build the connector:
+
+```
+> mvn clean package
+```
+Then move the zip package from `target` to the `my-connectors` folder and unzip it.
+
+Now we can start the build:
+
+[source,bash,options="nowrap"]
+----
+oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow
+----
+
+We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready.
+Once it is done, we can check that the connectors are available in our Kafka Connect cluster.
+Strimzi is running Kafka Connect in a distributed mode.
+
+To check the available connector plugins, you can run the following command:
+
+[source,bash,options="nowrap"]
+----
+oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
+----
+
+You should see something like this:
+
+[source,json,options="nowrap"]
+----
+[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.slack.CamelSlackSinkConnector","type":"sink","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector","type":"source","version":"0.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","v [...]
+----
+
+### Set the Bot Token as secret (optional)
+
+You can also provide the Slack bot token as a secret. To do so, edit the file config/openshift/slack-token.properties with the correct token and then execute the following command:
+
+[source,bash,options="nowrap"]
+----
+oc create secret generic slack-token --from-file=config/openshift/slack-token.properties
+----
+
+Now we need to edit the KafkaConnectS2I custom resource to reference the secret. For example:
+
+[source,yaml,options="nowrap"]
+----
+spec:
+  # ...
+  config:
+    config.providers: file
+    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
+  #...
+  externalConfiguration:
+    volumes:
+      - name: slack-token
+        secret:
+          secretName: slack-token
+----
+
+In this way the secret `slack-token` will be mounted as a volume at the path /opt/kafka/external-configuration/slack-token/.
+
+### Create connector instance
+
+Now we can create an instance of the Slack source connector:
+
+[source,bash,options="nowrap"]
+----
+oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
+    -H "Accept:application/json" \
+    -H "Content-Type:application/json" \
+    http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
+{
+  "name": "slack-source-connector",
+  "config": {
+    "connector.class": "org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector",
+    "tasks.max": "1",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "transforms": "SlackTransformer",
+    "transforms.SlackTransformer.type": "org.apache.camel.kafkaconnector.SlackTransformer",
+    "topics": "slack-topic",
+    "camel.source.path.channel": "general",
+    "camel.source.endpoint.token": "<token>"
+  }
+}
+EOF
+----
+
+Alternatively, if you have enabled `use-connector-resources`, you can create the connector instance by creating a specific custom resource:
+
+[source,bash,options="nowrap"]
+----
+oc apply -f - << EOF
+apiVersion: kafka.strimzi.io/v1alpha1
+kind: KafkaConnector
+metadata:
+  name: slack-source-connector
+  namespace: myproject
+  labels:
+    strimzi.io/cluster: my-connect-cluster
+spec:
+  class: org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
+  tasksMax: 1
+  config:
+    key.converter: org.apache.kafka.connect.storage.StringConverter
+    transforms: SlackTransformer
+    transforms.SlackTransformer.type: org.apache.camel.kafkaconnector.SlackTransformer
+    topics: slack-topic
+    camel.source.path.channel: general
+    camel.source.endpoint.token: <token>
+EOF
+----
+
+If you followed the optional step for storing the bot token as a secret, you can run the following command instead:
+
+[source,bash,options="nowrap"]
+----
+oc apply -f config/openshift/slack-source.yaml
+----
+
+Add a message to your channel, for example "Hello".
+
+Using kafkacat, you should be able to see the message payload (the topic name and offset in your output will reflect your own setup):
+
+```
+>  kafkacat -b localhost:9092 -t mytopic -f 'Topic %t[%p], offset: %o, key: %k, payload: %s \n'
+Topic test301[0], offset: 22, key: , payload: {"schema":{"type":"string","optional":false},"payload":"Hello"} 
+```
+
diff --git a/slack/slack-source/config/openshift/slack-source.yaml b/slack/slack-source/config/openshift/slack-source.yaml
new file mode 100644
index 0000000..15b6e6b
--- /dev/null
+++ b/slack/slack-source/config/openshift/slack-source.yaml
@@ -0,0 +1,17 @@
+apiVersion: kafka.strimzi.io/v1alpha1
+kind: KafkaConnector
+metadata:
+  name: slack-source-connector
+  namespace: myproject
+  labels:
+    strimzi.io/cluster: my-connect-cluster
+spec:
+  class: org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
+  tasksMax: 1
+  config:
+    key.converter: org.apache.kafka.connect.storage.StringConverter
+    transforms: SlackTransformer
+    transforms.SlackTransformer.type: org.apache.camel.kafkaconnector.SlackTransformer
+    topics: slack-topic
+    camel.source.path.channel: general
+    camel.source.endpoint.token: ${file:/opt/kafka/external-configuration/slack-token/slack-token.properties:token}
diff --git a/slack/slack-source/config/openshift/slack-token.properties b/slack/slack-source/config/openshift/slack-token.properties
new file mode 100644
index 0000000..a59d2bb
--- /dev/null
+++ b/slack/slack-source/config/openshift/slack-token.properties
@@ -0,0 +1 @@
+token=xxx
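
Note on the two config files above: slack-source.yaml references the token through a `${file:PATH:KEY}` placeholder, and slack-token.properties supplies the value. The following is a minimal shell sketch of that mapping, assuming the simple `key=value` layout shown in the diff. It only mimics the idea; inside Kafka Connect the lookup is done by the configured FileConfigProvider, and `resolve_file_placeholder` is a hypothetical helper name used here for illustration.

```shell
# Illustrative only: resolve a ${file:PATH:KEY} placeholder against a
# properties file. resolve_file_placeholder is a hypothetical helper and
# is not part of Kafka, Strimzi, or Camel.
resolve_file_placeholder() {
  local placeholder="$1"
  local prefix='${file:'
  local suffix='}'

  # Strip the leading '${file:' and the trailing '}'.
  local body="${placeholder#"$prefix"}"
  body="${body%"$suffix"}"

  # In this sketch, everything after the last ':' is the key and
  # everything before it is the path of the properties file.
  local key="${body##*:}"
  local path="${body%:*}"

  # Print the value of the first line that starts with 'KEY='.
  local line
  while IFS= read -r line || [ -n "$line" ]; do
    case "$line" in
      "$key="*)
        printf '%s\n' "${line#"$key="}"
        return 0
        ;;
    esac
  done < "$path"

  # Key not found in the file.
  return 1
}

# Demo with a temporary file that has the same layout as slack-token.properties.
demo_file="$(mktemp)"
printf 'token=xxx\n' > "$demo_file"
resolve_file_placeholder "\${file:${demo_file}:token}"
rm -f "$demo_file"
```

With the placeholder from slack-source.yaml, the path part would be /opt/kafka/external-configuration/slack-token/slack-token.properties and the key would be `token`, which is why the secret has to be mounted under that directory before the connector is created.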