You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/27 10:48:44 UTC

[camel-kafka-connector-examples] branch master updated: Added a Slack source example with Avro and Apicurio Registry

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b429ab  Added a Slack source example with Avro and Apicurio Registry
9b429ab is described below

commit 9b429ab158f9f7b7e916da2fd6ac185e078cdac2
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Oct 27 11:46:05 2020 +0100

    Added a Slack source example with Avro and Apicurio Registry
---
 .../README.adoc                                    | 342 +++++++++++++++++++++
 .../CamelSlackSourceApicurioConnector.properties   |  30 ++
 .../kafka-avro-basic-consumer/.gitignore           |   4 +
 .../kafka-avro-basic-consumer/README.md            |   9 +
 .../kafka-avro-basic-consumer/pom.xml              |  91 ++++++
 .../camel/kafkaconnector/SimpleConsumer.java       |  52 ++++
 .../src/main/resources/log4j2.properties           |  29 ++
 7 files changed, 557 insertions(+)

diff --git a/slack/slack-source-avro-apicurio-schema-registry/README.adoc b/slack/slack-source-avro-apicurio-schema-registry/README.adoc
new file mode 100644
index 0000000..5900039
--- /dev/null
+++ b/slack/slack-source-avro-apicurio-schema-registry/README.adoc
@@ -0,0 +1,342 @@
+# Camel-Kafka-connector Slack Source with Apicurio Registry and Avro
+
+This is an example for Camel-Kafka-connector Slack Source with Apicurio Registry and Avro
+
+## Standalone
+
+### What is needed
+
+- A Slack app
+- A Slack channel
+- An Apicurio registry instance
+
+### Setting up Slack
+
+You'll need a workspace and a channel.
+
+In your Slack settings, create an app.
+
+Add the following permissions to your Bot Token scopes:
+* channels:history
+* channels:read
+
+Install the app on your workspace and select the channel you want to consume from. 
+
+Use the Bot User OAuth Access Token as token for this example.
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+### Running Apicurio Registry
+
+In this case we'll use the in-memory docker image
+
+```
+docker run -it -p 8080:8080 apicurio/apicurio-registry-mem:1.3.1.Final
+exec java -Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -javaagent:/opt/agent-bond/agent-bond.jar=jmx_exporter{{9779:/opt/agent-bond/jmx_exporter_config.yml}} -XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=40 -XX:+ExitOnOutOfMemoryError -cp . -jar /deployments/apicurio-registry-app-1.3.1.Final-runner.jar
+__  ____  __  _____   ___  __ ____  ______ 
+ --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
+ -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
+--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.username" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.driver" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.url" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.hibernate-orm.database.generation" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:52,739 WARN  [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.password" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
+2020-10-27 06:29:53,806 INFO  [io.quarkus] (main) apicurio-registry-app 1.3.1.Final on JVM (powered by Quarkus 1.8.0.Final) started in 1.233s. Listening on: http://0.0.0.0:8080
+2020-10-27 06:29:53,806 INFO  [io.quarkus] (main) Profile prod activated. 
+2020-10-27 06:29:53,806 INFO  [io.quarkus] (main) Installed features: [cdi, resteasy, resteasy-jackson, servlet, smallrye-health, smallrye-metrics, smallrye-openapi]
+```
+
+In terms of needed running bits we are now on track.
+
+### Setting up the needed bits and running the example
+
+You'll need to setup the plugin.path property in your kafka
+
+Open the `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your choosen location
+
+You'll need to build your connector starting from an archetype:
+
+```
+> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.5.0
+[INFO] Scanning for projects...
+[INFO] 
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO] 
+[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+[INFO] 
+[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+[INFO] 
+[INFO] 
+[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
+[INFO] Generating project in Interactive mode
+[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote
+Define value for property 'groupId': org.apache.camel.kafkaconnector
+Define value for property 'artifactId': slack-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0
+Define value for property 'package' org.apache.camel.kafkaconnector: : 
+Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.5.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector
+artifactId: slack-extended
+version: 0.5.0
+package: org.apache.camel.kafkaconnector
+camel-kafka-connector-name: camel-slack-kafka-connector
+camel-kafka-connector-version: 0.5.0
+ Y: : y
+[INFO] ----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0
+[INFO] ----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: artifactId, Value: slack-extended
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: version, Value: 0.5.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0
+[INFO] Parameter: artifactId, Value: slack-extended
+[INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time:  39.295 s
+[INFO] Finished at: 2020-10-13T09:16:51+02:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/workspace/miscellanea/slack-extended
+```
+
+Now we need to edit the POM
+
+
+```
+  .
+  .
+  .
+  <version>0.5.0</version>
+  <name>A Camel Kafka Connector extended</name>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <camel-kafka-connector-version>${project.version}</camel-kafka-connector-version>
+  </properties>
+
+    <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-api</artifactId>
+      <scope>provided</scope>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-transforms</artifactId>
+      <scope>provided</scope>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel.kafkaconnector</groupId>
+      <artifactId>camel-kafka-connector</artifactId>
+      <version>0.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel.kafkaconnector</groupId>
+      <artifactId>camel-slack-kafka-connector</artifactId>
+      <version>0.5.0</version>
+    </dependency>
+    <dependency>
+       <groupId>io.apicurio</groupId>
+       <artifactId>apicurio-registry-utils-converter</artifactId>
+       <version>1.3.1.Final</version>
+    </dependency>
+    <dependency>
+       <groupId>io.apicurio</groupId>
+       <artifactId>apicurio-registry-rest-client</artifactId>
+       <version>1.3.1.Final</version>
+    </dependency>
+  </dependencies>
+  .
+  .
+  .
+```
+
+and add the following class in the main package
+
+```
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.slack.source;
+
+import java.util.Map;
+
+import org.apache.camel.component.slack.helper.SlackMessage;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlackTransformer <R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Transforms String-based content from Kafka into a map");
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class);
+
+    @Override
+    public R apply(R r) {
+        Object value = r.value();
+
+        if (r.value() instanceof SlackMessage) {
+            LOG.debug("Converting record from SlackMessage to text");
+            SlackMessage message = (SlackMessage) r.value();
+
+            LOG.debug("Received text: {}", message.getText());
+
+            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+                    SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp());
+
+        } else {
+            LOG.debug("Unexpected message type: {}", r.value().getClass());
+
+            return r;
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}
+```
+
+Now we need to build the connector:
+
+```
+> mvn clean package
+```
+
+In this example we'll use `/home/oscerd/connectors/` as plugin.path, but we'll need the generated zip from the previois build
+
+```
+> cd /home/oscerd/connectors/
+> cp /home/workspace/miscellanea/slack-extended/target/slack-extended-0.5.0-package.zip .
+> unzip slack-extended-0.5.0-package.zip
+```
+
+Now it's time to setup the connector
+
+Open the Slack source apicurio configuration file
+
+```
+name=CamelSlackSourceConnector
+connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+transforms=SlackTransformer
+transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.SlackTransformer
+value.converter.apicurio.registry.url=http://localhost:8080/api
+value.converter=io.apicurio.registry.utils.converter.AvroConverter
+value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
+
+topics=mytopic
+
+camel.source.path.channel=general
+camel.source.endpoint.token=<the token created for your Bot>
+```
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSlackSourceAvroApicurioConnector.properties
+```
+
+You'll need to use a little Java Consumer to consume the messages.
+
+In the folder kafka-avro-basic-consumer run the following command:
+
+```
+mvn clean compile exec:exec -Dkafka.topic.name=mytopic
+```
+
+Send a message in your chat like Hello and you should see it logged
+
+```
+2020-10-27 11:43:42,819 [main           ] INFO  SimpleConsumer                 - Hello
+```
+
+### What happened at registry level
+
+The transform will take the text field from the SlackMessage pojo and set it as value with a schema determined by the type of the text field, so basically a String.
+
+```
+>  curl -X GET http://localhost:8080/api/artifacts/
+["mytopic-value"]
+```
+
+We have just one artifact in the registry and in the apicurio logs we should see just one single reference:
+
+```
+2020-10-27 06:30:08,175 WARN  [io.api.reg.res.ArtifactsResourceImpl] (executor-thread-1) Artifact mytopic-value/1 not indexed, status: 0
+```
+
+We can also collect some version meta info for the schema
+
+```
+curl -X GET http://localhost:8080/api/artifacts/mytopic-value/versions/1/meta
+{"version":1,"createdOn":1603780208148,"type":"KCONNECT","globalId":1,"state":"ENABLED","id":"mytopic-value"}
+```
+
+and some meta info too
+
+```
+curl -X GET http://localhost:8080/api/artifacts/mytopic-value/meta
+{"createdOn":1603780208148,"modifiedOn":1603780208148,"id":"mytopic-value","version":1,"type":"KCONNECT","globalId":1,"state":"ENABLED"}
+```
+
+and finally the schema content
+
+```
+curl -X GET http://localhost:8080/api/artifacts/mytopic-value
+"string"
+```
diff --git a/slack/slack-source-avro-apicurio-schema-registry/config/CamelSlackSourceApicurioConnector.properties b/slack/slack-source-avro-apicurio-schema-registry/config/CamelSlackSourceApicurioConnector.properties
new file mode 100644
index 0000000..3fc2c1b
--- /dev/null
+++ b/slack/slack-source-avro-apicurio-schema-registry/config/CamelSlackSourceApicurioConnector.properties
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelSlackSourceConnector
+connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+transforms=SlackTransformer
+transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.SlackTransformer
+value.converter.apicurio.registry.url=http://localhost:8080/api
+value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
+value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
+
+topics=mytopic
+
+camel.source.path.channel=general
+camel.source.endpoint.token=<token>
diff --git a/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/.gitignore b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/.gitignore
new file mode 100644
index 0000000..72bba6e
--- /dev/null
+++ b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/.gitignore
@@ -0,0 +1,4 @@
+.project
+.classpath
+.settings/
+/target
diff --git a/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/README.md b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/README.md
new file mode 100644
index 0000000..570e13c
--- /dev/null
+++ b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/README.md
@@ -0,0 +1,9 @@
+## Basic Kafka Consumer
+
+Related to camel-kafka-connector
+
+To run the producer:
+
+mvn clean compile exec:exec -Dkafka.topic.name=mytopic
+
+the default topic name is mytopic
diff --git a/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/pom.xml b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/pom.xml
new file mode 100644
index 0000000..5ba0e2c
--- /dev/null
+++ b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>org.apache.camel.kafkaconnector</groupId>
+	<artifactId>camel-kafka-connector-avro-basic-consumer</artifactId>
+	<name>Camel Kafka Connector Avro Basic consumer</name>
+	<version>0.0.1-SNAPSHOT</version>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.12</artifactId>
+			<version>2.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>2.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-slf4j-impl</artifactId>
+			<version>2.13.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.13.3</version>
+		</dependency>
+		<dependency>
+			<groupId>io.apicurio</groupId>
+			<artifactId>apicurio-registry-utils-converter</artifactId>
+			<version>1.3.1.Final</version>
+		</dependency>
+		<dependency>
+			<groupId>io.apicurio</groupId>
+			<artifactId>apicurio-registry-rest-client</artifactId>
+			<version>1.3.1.Final</version>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>resteasy-client</artifactId>
+			<version>4.5.6.Final</version>
+		</dependency>
+	</dependencies>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<kafka.topic.name>mytopic1</kafka.topic.name>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>exec-maven-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>exec</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<executable>java</executable>
+					<arguments>
+						<argument>-classpath</argument>
+						<classpath />
+						<argument>org.apache.camel.kafkaconnector.SimpleConsumer</argument>
+						<argument>${kafka.topic.name}</argument>
+					</arguments>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.8.1</version>
+				<configuration>
+					<source>1.8</source>
+					<target>1.8</target>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+
+
+</project>
diff --git a/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/src/main/java/org/apache/camel/kafkaconnector/SimpleConsumer.java b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/src/main/java/org/apache/camel/kafkaconnector/SimpleConsumer.java
new file mode 100644
index 0000000..722717f
--- /dev/null
+++ b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/src/main/java/org/apache/camel/kafkaconnector/SimpleConsumer.java
@@ -0,0 +1,52 @@
+package org.apache.camel.kafkaconnector;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.avro.util.Utf8;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
+
+public class SimpleConsumer {
+
+	public static void main(String[] args) throws JsonProcessingException {
+		
+		final Logger LOG = LoggerFactory.getLogger(SimpleConsumer.class);
+
+		Properties props = new Properties();
+		props.put("bootstrap.servers", "localhost:9092");
+		props.put("key.deserializer", StringDeserializer.class.getName());
+		props.put("value.deserializer",AvroKafkaDeserializer.class.getName());
+		props.put("apicurio.registry.url","http://localhost:8080/api");
+		props.put("group.id", UUID.randomUUID().toString());
+		props.put("auto.offset.reset", "earliest");
+
+		KafkaConsumer<String, String> cons = new KafkaConsumer<String, String>(props);
+		List<String> topics = new ArrayList<String>();
+		topics.add(args[0]);
+		cons.subscribe(topics);
+
+
+			  while (true) {
+			    ConsumerRecords<String, String> consumerRecords = cons.poll(Duration.ofMillis(1000L));
+			    if (consumerRecords.count() > 0) {
+			    	for (Iterator iterator = consumerRecords.iterator(); iterator.hasNext();) {
+			    		ConsumerRecord<String, Utf8> rec = (ConsumerRecord<String, Utf8>) iterator.next();
+						LOG.info(((Utf8) rec.value()).toString());
+					}
+			    }
+			  }
+	}
+}
diff --git a/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/src/main/resources/log4j2.properties b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..263e717
--- /dev/null
+++ b/slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/src/main/resources/log4j2.properties
@@ -0,0 +1,29 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-basic-consumer.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = out
+