You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/09 14:47:51 UTC

[camel-kafka-connector] branch transform created (now eaf5968)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch transform
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at eaf5968  Camel-AWS S3 example: Use a Transformation instead of a converter

This branch includes the following new commits:

     new eaf5968  Camel-AWS S3 example: Use a Transformation instead of a converter

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 01/01: Camel-AWS S3 example: Use a Transformation instead of a converter

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch transform
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit eaf5968cb23c2b3490e3932cb1282547a5a122e1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 9 15:45:40 2019 +0100

    Camel-AWS S3 example: Use a Transformation instead of a converter
---
 .../converters/S3ObjectConverter.java              | 43 -----------------
 .../converters/S3ObjectTransformer.java            | 56 ++++++++++++++++++++++
 examples/CamelAWSS3SourceConnector.properties      | 12 +++--
 .../source/aws/s3/CamelAWSS3PropertyFactory.java   |  4 +-
 4 files changed, 65 insertions(+), 50 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java
deleted file mode 100644
index 59678d6..0000000
--- a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kafkaconnector.converters;
-
-import java.util.Map;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.storage.Converter;
-
-public class S3ObjectConverter implements Converter {
-
-    private final S3ObjectSerializer serializer = new S3ObjectSerializer();
-
-    @Override
-    public void configure(Map<String, ?> arg0, boolean arg1) {
-    }
-
-    @Override
-    public byte[] fromConnectData(String topic, Schema schema, Object value) {
-        return serializer.serialize(topic, (S3ObjectInputStream) value);
-    }
-
-    @Override
-    public SchemaAndValue toConnectData(String arg0, byte[] arg1) {
-        return null;
-    }
-
-}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java
new file mode 100644
index 0000000..1692e51
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.converters;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
+/** Kafka Connect {@link Transformation} that turns an S3 object stream record value into a String. */
+public class S3ObjectTransformer<R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM,
+                    "Transform the content of a bucket into a string ");
+    private final S3ObjectSerializer serializer = new S3ObjectSerializer();
+
+    /** No configuration is read; the supplied map is ignored. */
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    /** Serializes the value (cast to S3ObjectInputStream) and returns a record copy carrying it as UTF-8 text. */
+    @Override
+    public R apply(R record) {
+        byte[] v = serializer.serialize(record.topic(), (S3ObjectInputStream) record.value());
+        String finalValue = new String(v, java.nio.charset.StandardCharsets.UTF_8); // explicit charset, not the platform default
+        return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp());
+    }
+
+    @Override
+    public void close() { // no-op: this transformation holds nothing that needs closing
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+}
diff --git a/examples/CamelAWSS3SourceConnector.properties b/examples/CamelAWSS3SourceConnector.properties
index bfee4e3..12f9efc 100644
--- a/examples/CamelAWSS3SourceConnector.properties
+++ b/examples/CamelAWSS3SourceConnector.properties
@@ -18,14 +18,16 @@
 name=CamelAWSS3SourceConnector
 connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector
 key.converter=org.apache.kafka.connect.storage.StringConverter
-value.converter=org.apache.camel.kafkaconnector.converters.S3ObjectConverter
+transforms=S3ObjectTransformer
+transforms.S3ObjectTransformer.type=org.apache.camel.kafkaconnector.converters.S3ObjectTransformer
+
 camel.source.maxPollDuration=10000
 
 camel.source.kafka.topic=mytopic
 
-camel.source.url=aws-s3://bucket?autocloseBody=false
+camel.source.url=aws-s3://camel-kafka-connector?autocloseBody=false
 
-camel.component.aws-s3.configuration.access-key=<youraccesskey>
-camel.component.aws-s3.configuration.secret-key=<yoursecretkey>
-camel.component.aws-s3.configuration.region=<yourregion>
+camel.component.aws-s3.configuration.access-key=<youraccesskey>
+camel.component.aws-s3.configuration.secret-key=<yoursecretkey>
+camel.component.aws-s3.configuration.region=EU_WEST_1
 
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
index 28fda93..d7c6456 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
@@ -48,8 +48,8 @@ class CamelAWSS3PropertyFactory implements ConnectorPropertyFactory {
 
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSourceConnector");
         connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-        connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.camel.kafkaconnector.converters.S3ObjectConverter");
-
+        connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG, "S3ObjectTransformer");
+        connectorProps.put("transforms.S3ObjectTransformer.type", "org.apache.camel.kafkaconnector.converters.S3ObjectTransformer");
         connectorProps.put("camel.source.kafka.topic", topic);
 
         String queueUrl = "aws-s3://" + bucket + "?maxMessagesPerPoll=10";