You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/09 14:47:52 UTC
[camel-kafka-connector] 01/01: Camel-AWS S3 example: Use a
Transformation instead of a converter
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch transform
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit eaf5968cb23c2b3490e3932cb1282547a5a122e1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 15:45:40 2019 +0100
Camel-AWS S3 example: Use a Transformation instead of a converter
---
.../converters/S3ObjectConverter.java | 43 -----------------
.../converters/S3ObjectTransformer.java | 56 ++++++++++++++++++++++
examples/CamelAWSS3SourceConnector.properties | 12 +++--
.../source/aws/s3/CamelAWSS3PropertyFactory.java | 4 +-
4 files changed, 65 insertions(+), 50 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java
deleted file mode 100644
index 59678d6..0000000
--- a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kafkaconnector.converters;
-
-import java.util.Map;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.storage.Converter;
-
-public class S3ObjectConverter implements Converter {
-
- private final S3ObjectSerializer serializer = new S3ObjectSerializer();
-
- @Override
- public void configure(Map<String, ?> arg0, boolean arg1) {
- }
-
- @Override
- public byte[] fromConnectData(String topic, Schema schema, Object value) {
- return serializer.serialize(topic, (S3ObjectInputStream) value);
- }
-
- @Override
- public SchemaAndValue toConnectData(String arg0, byte[] arg1) {
- return null;
- }
-
-}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java
new file mode 100644
index 0000000..1692e51
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.converters;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
+public class S3ObjectTransformer<R extends ConnectRecord<R>> implements Transformation<R> {
+
+ private final S3ObjectSerializer serializer = new S3ObjectSerializer();
+
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM,
+ "Transform the content of a bucket into a string ");
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ }
+
+ @Override
+ public R apply(R record) {
+ byte[] v = serializer.serialize(record.topic(), (S3ObjectInputStream) record.value());
+ String finalValue = new String(v);
+ return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp());
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+
+}
diff --git a/examples/CamelAWSS3SourceConnector.properties b/examples/CamelAWSS3SourceConnector.properties
index bfee4e3..12f9efc 100644
--- a/examples/CamelAWSS3SourceConnector.properties
+++ b/examples/CamelAWSS3SourceConnector.properties
@@ -18,14 +18,16 @@
name=CamelAWSS3SourceConnector
connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
-value.converter=org.apache.camel.kafkaconnector.converters.S3ObjectConverter
+transforms=S3ObjectTransformer
+transforms.S3ObjectTransformer.type=org.apache.camel.kafkaconnector.converters.S3ObjectTransformer
+
camel.source.maxPollDuration=10000
camel.source.kafka.topic=mytopic
-camel.source.url=aws-s3://bucket?autocloseBody=false
+camel.source.url=aws-s3://camel-kafka-connector?autocloseBody=false
-camel.component.aws-s3.configuration.access-key=<youraccesskey>
-camel.component.aws-s3.configuration.secret-key=<yoursecretkey>
-camel.component.aws-s3.configuration.region=<yourregion>
+camel.component.aws-s3.configuration.access-key=AKIAJ2ZAQSIYAPK4EX2Q
+camel.component.aws-s3.configuration.secret-key=Z7qFOGWw9SiX5AjOBlzok1Nr6mQGEk27lI6FaPef
+camel.component.aws-s3.configuration.region=EU_WEST_1
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
index 28fda93..d7c6456 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
@@ -48,8 +48,8 @@ class CamelAWSS3PropertyFactory implements ConnectorPropertyFactory {
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSourceConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
- connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.camel.kafkaconnector.converters.S3ObjectConverter");
-
+ connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG, "S3ObjectTransformer");
+ connectorProps.put("transforms.S3ObjectTransformer.type", "org.apache.camel.kafkaconnector.converters.S3ObjectTransformer");
connectorProps.put("camel.source.kafka.topic", topic);
String queueUrl = "aws-s3://" + bucket + "?maxMessagesPerPoll=10";