You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/04 06:59:17 UTC
[camel-kafka-connector-examples] 01/02: Slack Source Avro Schema
Reg Example: Use built-in slack Transform
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
commit 3d199b10ee5feb4368772a77cc52c9b7cc1719a8
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Nov 4 07:52:32 2020 +0100
Slack Source Avro Schema Reg Example: Use built-in slack Transform
---
.../README.adoc | 79 +---------------------
1 file changed, 1 insertion(+), 78 deletions(-)
diff --git a/slack/slack-source-avro-apicurio-schema-registry/README.adoc b/slack/slack-source-avro-apicurio-schema-registry/README.adoc
index ba8c139..b22f50f 100644
--- a/slack/slack-source-avro-apicurio-schema-registry/README.adoc
+++ b/slack/slack-source-avro-apicurio-schema-registry/README.adoc
@@ -174,83 +174,6 @@ Now we need to edit the POM
.
```
-and add the following class in the main package
-
-```
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.slack.source;
-
-import java.util.Map;
-
-import org.apache.camel.component.slack.helper.SlackMessage;
-import org.apache.camel.kafkaconnector.utils.SchemaHelper;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.transforms.Transformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SlackTransformer <R extends ConnectRecord<R>> implements Transformation<R> {
- public static final String FIELD_KEY_CONFIG = "key";
- public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
- "Transforms String-based content from Kafka into a map");
-
- private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class);
-
- @Override
- public R apply(R r) {
- Object value = r.value();
-
- if (r.value() instanceof SlackMessage) {
- LOG.debug("Converting record from SlackMessage to text");
- SlackMessage message = (SlackMessage) r.value();
-
- LOG.debug("Received text: {}", message.getText());
-
- return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
- SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp());
-
- } else {
- LOG.debug("Unexpected message type: {}", r.value().getClass());
-
- return r;
- }
- }
-
- @Override
- public ConfigDef config() {
- return CONFIG_DEF;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> map) {
-
- }
-}
-```
-
Now we need to build the connector:
```
@@ -274,7 +197,7 @@ name=CamelSlackSourceConnector
connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=SlackTransformer
-transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.SlackTransformer
+transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.slack.transformers.SlackTransforms
value.converter.apicurio.registry.url=http://localhost:8080/api
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy