Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/27 07:17:09 UTC

[camel-kafka-connector-examples] branch master updated: Slack Source Example with Apicurio Registry

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new 550262d  Slack Source Example with Apicurio Registry
550262d is described below

commit 550262d697f8e6cc696ed5fa8e383d6dcc1b6702
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Oct 27 08:15:32 2020 +0100

    Slack Source Example with Apicurio Registry
---
 .../README.adoc                                    | 337 +++++++++++++++++++++
 .../CamelSlackSourceApicurioConnector.properties   |  30 ++
 2 files changed, 367 insertions(+)

diff --git a/slack/slack-source-apicurio-schema-registry/README.adoc b/slack/slack-source-apicurio-schema-registry/README.adoc
new file mode 100644
index 0000000..339f1f6
--- /dev/null
+++ b/slack/slack-source-apicurio-schema-registry/README.adoc
@@ -0,0 +1,337 @@
+# Camel-Kafka-connector Slack Source
+
+This is an example of the Camel-Kafka-connector Slack Source used together with an Apicurio Registry instance.
+
+## Standalone
+
+### What is needed
+
+- A Slack app
+- A Slack channel
+- An Apicurio registry instance
+
+### Setting up Slack
+
+You'll need a workspace and a channel.
+
+In your Slack settings, create an app.
+
+Add the following permissions to your Bot Token scopes:
+* channels:history
+* channels:read
+
+Install the app on your workspace and select the channel you want to consume from.
+
+Use the Bot User OAuth Access Token as the token for this example.
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+### Running Apicurio Registry
+
+In this case we'll use the in-memory Docker image:
+
+```
+docker run -it -p 8080:8080 apicurio/apicurio-registry-mem:1.3.1.Final
+exec java -Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -javaagent:/opt/agent-bond/agent-bond.jar=jmx_exporter{{9779:/opt/agent-bond/jmx_exporter_config.yml}} -XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=40 -XX:+ExitOnOutOfMemoryError -cp . -jar /deployments/apicurio-registry-app-1.3.1.Final-runner.jar
+__  ____  __  _____   ___  __ ____  ______ 
+ --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
+ -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
+--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.username" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.driver" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.url" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.hibernate-orm.database.generation" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.password" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:53,806 INFO  [io.quarkus] (main) apicurio-registry-app 1.3.1.Final on JVM (powered by Quarkus 1.8.0.Final) started in 1.233s. Listening on: http://0.0.0.0:8080
+2020-10-27 06:29:53,806 INFO  [io.quarkus] (main) Profile prod activated. 
+2020-10-27 06:29:53,806 INFO  [io.quarkus] (main) Installed features: [cdi, resteasy, resteasy-jackson, servlet, smallrye-health, smallrye-metrics, smallrye-openapi]
+```
+
+Kafka and the Apicurio Registry, the services this example depends on, are now running.
+
+### Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka installation.
+
+Open the `$KAFKA_HOME/config/connect-standalone.properties` file
+
+and set the `plugin.path` property to your chosen location.
+
+You'll need to build your connector starting from an archetype:
+
+```
+> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.5.0
+[INFO] Scanning for projects...
+[INFO] 
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO] 
+[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+[INFO] 
+[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+[INFO] 
+[INFO] 
+[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
+[INFO] Generating project in Interactive mode
+[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote
+Define value for property 'groupId': org.apache.camel.kafkaconnector
+Define value for property 'artifactId': slack-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0
+Define value for property 'package' org.apache.camel.kafkaconnector: : 
+Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.5.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector
+artifactId: slack-extended
+version: 0.5.0
+package: org.apache.camel.kafkaconnector
+camel-kafka-connector-name: camel-slack-kafka-connector
+camel-kafka-connector-version: 0.5.0
+ Y: : y
+[INFO] ----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0
+[INFO] ----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: artifactId, Value: slack-extended
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0
+[INFO] Parameter: artifactId, Value: slack-extended
+[INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time:  39.295 s
+[INFO] Finished at: 2020-10-13T09:16:51+02:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/workspace/miscellanea/slack-extended
+```
+
+Now we need to edit the POM, adding the Slack connector and the Apicurio Registry dependencies:
+
+
+```
+  .
+  .
+  .
+  <version>0.5.0</version>
+  <name>A Camel Kafka Connector extended</name>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <camel-kafka-connector-version>${project.version}</camel-kafka-connector-version>
+  </properties>
+
+    <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-api</artifactId>
+      <scope>provided</scope>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-transforms</artifactId>
+      <scope>provided</scope>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel.kafkaconnector</groupId>
+      <artifactId>camel-kafka-connector</artifactId>
+      <version>0.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel.kafkaconnector</groupId>
+      <artifactId>camel-slack-kafka-connector</artifactId>
+      <version>0.5.0</version>
+    </dependency>
+    <dependency>
+       <groupId>io.apicurio</groupId>
+       <artifactId>apicurio-registry-utils-converter</artifactId>
+       <version>1.3.1.Final</version>
+    </dependency>
+    <dependency>
+       <groupId>io.apicurio</groupId>
+       <artifactId>apicurio-registry-rest-client</artifactId>
+       <version>1.3.1.Final</version>
+    </dependency>
+  </dependencies>
+  .
+  .
+  .
+```
+
+and add the following class to the main package:
+
+```
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector;
+
+import java.util.Map;
+
+import org.apache.camel.component.slack.helper.SlackMessage;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlackTransformer <R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Transforms a SlackMessage value into its text content");
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class);
+
+    @Override
+    public R apply(R r) {
+        Object value = r.value();
+
+        if (value instanceof SlackMessage) {
+            LOG.debug("Converting record from SlackMessage to text");
+            SlackMessage message = (SlackMessage) value;
+
+            LOG.debug("Received text: {}", message.getText());
+
+            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+                    SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp());
+
+        } else {
+            LOG.debug("Unexpected message type: {}", value == null ? null : value.getClass());
+
+            return r;
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}
+```
+
+Now we need to build the connector:
+
+```
+> mvn clean package
+```
+
+In this example we'll use `/home/oscerd/connectors/` as the `plugin.path`, so we need to copy the zip generated by the previous build there and unpack it:
+
+```
+> cd /home/oscerd/connectors/
+> cp /home/workspace/miscellanea/slack-extended/target/slack-extended-0.5.0-package.zip .
+> unzip slack-extended-0.5.0-package.zip
+```
+
+Now it's time to set up the connector.
+
+Open the Slack source Apicurio configuration file:
+
+```
+name=CamelSlackSourceConnector
+connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+transforms=SlackTransformer
+transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.SlackTransformer
+value.converter.apicurio.registry.url=http://localhost:8080/api
+value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
+value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
+
+topics=mytopic
+
+camel.source.path.channel=general
+camel.source.endpoint.token=<the token created for your Bot>
+```
+
+Now you can run the example:
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSlackSourceApicurioConnector.properties
+```
+
+Add a message to your channel, for example "Hello".
+
+In another terminal, using kafkacat, you should be able to see the message body wrapped together with its schema id.
+
+```
+> kafkacat -b localhost:9092 -t mytopic
+{"schemaId":1,"payload":"Hello"}
+```
+
+### What happened at registry level
+
+The transform will take the text field from the SlackMessage pojo and set it as value with a schema determined by the type of the text field, so basically a String.
+
+```
+>  curl -X GET http://localhost:8080/api/artifacts/
+["mytopic-value"]
+```
+
+We have just one artifact in the registry, and in the Apicurio logs we should see a single reference to it:
+
+```
+2020-10-27 06:30:08,175 WARN  [io.api.reg.res.ArtifactsResourceImpl] (executor-thread-1) Artifact mytopic-value/1 not indexed, status: 0
+```
+
+We can also collect the meta info for version 1 of the schema
+
+```
+curl -X GET http://localhost:8080/api/artifacts/mytopic-value/versions/1/meta
+{"version":1,"createdOn":1603780208148,"type":"KCONNECT","globalId":1,"state":"ENABLED","id":"mytopic-value"}
+```
+
+and the meta info for the artifact itself
+
+```
+curl -X GET http://localhost:8080/api/artifacts/mytopic-value/meta
+{"createdOn":1603780208148,"modifiedOn":1603780208148,"id":"mytopic-value","version":1,"type":"KCONNECT","globalId":1,"state":"ENABLED"}
+```
+
+and finally the schema content
+
+```
+curl -X GET http://localhost:8080/api/artifacts/mytopic-value
+{"type":"string","optional":false}
+```
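
As a rough illustration of the registry lookups shown in the README above, the following Java sketch builds the same three REST paths that were queried with curl and fetches them with the JDK's `java.net.http.HttpClient`. The class name `RegistryLookup` and its helper methods are hypothetical and not part of the example project; the base URL and the `mytopic-value` artifact id are taken from the README, and Java 11 or later is assumed.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper, not part of the example project.
class RegistryLookup {

    // Path of the latest schema content, e.g. .../artifacts/mytopic-value
    static String artifactUrl(String baseUrl, String artifactId) {
        return baseUrl + "/artifacts/" + artifactId;
    }

    // Path of the artifact-level meta info.
    static String artifactMetaUrl(String baseUrl, String artifactId) {
        return artifactUrl(baseUrl, artifactId) + "/meta";
    }

    // Path of the meta info for one specific version.
    static String versionMetaUrl(String baseUrl, String artifactId, int version) {
        return artifactUrl(baseUrl, artifactId) + "/versions/" + version + "/meta";
    }

    // Plain GET returning the response body as text.
    static String fetch(HttpClient client, String url) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) {
        String baseUrl = "http://localhost:8080/api"; // registry URL used in the README
        String artifactId = "mytopic-value";          // artifact created for the topic value
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        String[] urls = {
            artifactUrl(baseUrl, artifactId),
            artifactMetaUrl(baseUrl, artifactId),
            versionMetaUrl(baseUrl, artifactId, 1)
        };
        for (String url : urls) {
            try {
                System.out.println(url + " -> " + fetch(client, url));
            } catch (IOException e) {
                // Reported instead of failing, so the sketch also runs without a registry.
                System.out.println(url + " -> registry not reachable: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

With the registry from the README running, `java RegistryLookup.java` should print the same JSON bodies that the curl commands return.
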
diff --git a/slack/slack-source-apicurio-schema-registry/config/CamelSlackSourceApicurioConnector.properties b/slack/slack-source-apicurio-schema-registry/config/CamelSlackSourceApicurioConnector.properties
new file mode 100644
index 0000000..3fc2c1b
--- /dev/null
+++ b/slack/slack-source-apicurio-schema-registry/config/CamelSlackSourceApicurioConnector.properties
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelSlackSourceConnector
+connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+transforms=SlackTransformer
+transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.SlackTransformer
+value.converter.apicurio.registry.url=http://localhost:8080/api
+value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
+value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
+
+topics=mytopic
+
+camel.source.path.channel=general
+camel.source.endpoint.token=<token>
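
Kafka Connect reads `transforms` as a comma-separated list of aliases and expects a `transforms.<alias>.type` entry for each of them, which is how `SlackTransformer` is wired in the file above. A minimal sketch of a pre-flight check for that wiring, using only `java.util.Properties`, could look like this. The class `ConnectorConfigCheck` and its methods are hypothetical names introduced for illustration; the check does not verify that the configured class actually exists on the plugin path.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the example project.
class ConnectorConfigCheck {

    // Parses connector configuration given as .properties text.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns every "transforms.<alias>.type" key that is missing
    // for an alias listed in the "transforms" property.
    static List<String> missingTransformTypes(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String alias : props.getProperty("transforms", "").split(",")) {
            String name = alias.trim();
            if (name.isEmpty()) {
                continue; // no transforms configured, or a stray comma
            }
            String key = "transforms." + name + ".type";
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The transform-related lines of the configuration file above.
        Properties props = parse(String.join("\n",
                "transforms=SlackTransformer",
                "transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.SlackTransformer"));
        System.out.println("Missing transform types: " + missingTransformTypes(props));
    }
}
```

An empty list means every alias has a type; the value of that type must still match the package the transformer class was compiled into.
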