You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/09 14:47:52 UTC

[camel-kafka-connector] 01/01: Camel-AWS S3 example: Use a Transformation instead of a converter

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch transform
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit eaf5968cb23c2b3490e3932cb1282547a5a122e1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 15:45:40 2019 +0100

    Camel-AWS S3 example: Use a Transformation instead of a converter
---
 .../converters/S3ObjectConverter.java              | 43 -----------------
 .../converters/S3ObjectTransformer.java            | 56 ++++++++++++++++++++++
 examples/CamelAWSS3SourceConnector.properties      | 12 +++--
 .../source/aws/s3/CamelAWSS3PropertyFactory.java   |  4 +-
 4 files changed, 65 insertions(+), 50 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java
deleted file mode 100644
index 59678d6..0000000
--- a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kafkaconnector.converters;
-
-import java.util.Map;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.storage.Converter;
-
-public class S3ObjectConverter implements Converter {
-
-    private final S3ObjectSerializer serializer = new S3ObjectSerializer();
-
-    @Override
-    public void configure(Map<String, ?> arg0, boolean arg1) {
-    }
-
-    @Override
-    public byte[] fromConnectData(String topic, Schema schema, Object value) {
-        return serializer.serialize(topic, (S3ObjectInputStream) value);
-    }
-
-    @Override
-    public SchemaAndValue toConnectData(String arg0, byte[] arg1) {
-        return null;
-    }
-
-}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java
new file mode 100644
index 0000000..1692e51
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.converters;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
+public class S3ObjectTransformer<R extends ConnectRecord<R>> implements Transformation<R> {
+
+	private final S3ObjectSerializer serializer = new S3ObjectSerializer();
+	
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM,
+                    "Transform the content of a bucket into a string ");
+	
+	@Override
+	public void configure(Map<String, ?> configs) {
+	}
+
+	@Override
+	public R apply(R record) {
+		byte[] v = serializer.serialize(record.topic(), (S3ObjectInputStream) record.value());
+		String finalValue = new String(v);
+		return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp());
+	}
+
+	@Override
+	public void close() {	
+	}
+
+	@Override
+	public ConfigDef config() {
+		return CONFIG_DEF;
+	}
+
+}
diff --git a/examples/CamelAWSS3SourceConnector.properties b/examples/CamelAWSS3SourceConnector.properties
index bfee4e3..12f9efc 100644
--- a/examples/CamelAWSS3SourceConnector.properties
+++ b/examples/CamelAWSS3SourceConnector.properties
@@ -18,14 +18,16 @@
 name=CamelAWSS3SourceConnector
 connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector
 key.converter=org.apache.kafka.connect.storage.StringConverter
-value.converter=org.apache.camel.kafkaconnector.converters.S3ObjectConverter
+transforms=S3ObjectTransformer
+transforms.S3ObjectTransformer.type=org.apache.camel.kafkaconnector.converters.S3ObjectTransformer
+
 camel.source.maxPollDuration=10000
 
 camel.source.kafka.topic=mytopic
 
-camel.source.url=aws-s3://bucket?autocloseBody=false
+camel.source.url=aws-s3://camel-kafka-connector?autocloseBody=false
 
-camel.component.aws-s3.configuration.access-key=<youraccesskey>
-camel.component.aws-s3.configuration.secret-key=<yoursecretkey>
-camel.component.aws-s3.configuration.region=<yourregion>
+camel.component.aws-s3.configuration.access-key=AKIAJ2ZAQSIYAPK4EX2Q
+camel.component.aws-s3.configuration.secret-key=Z7qFOGWw9SiX5AjOBlzok1Nr6mQGEk27lI6FaPef
+camel.component.aws-s3.configuration.region=EU_WEST_1
 
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
index 28fda93..d7c6456 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
@@ -48,8 +48,8 @@ class CamelAWSS3PropertyFactory implements ConnectorPropertyFactory {
 
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSourceConnector");
         connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-        connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.camel.kafkaconnector.converters.S3ObjectConverter");
-
+        connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG, "S3ObjectTransformer");
+        connectorProps.put("transforms.S3ObjectTransformer.type", "org.apache.camel.kafkaconnector.converters.S3ObjectTransformer");
         connectorProps.put("camel.source.kafka.topic", topic);
 
         String queueUrl = "aws-s3://" + bucket + "?maxMessagesPerPoll=10";