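You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text export; see the commits@camel.apache.org mailing-list archive for the original message.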
Posted to commits@camel.apache.org by ac...@apache.org on 2021/07/22 06:47:50 UTC

[camel-kafka-connector] branch main updated (ebef8e6 -> 67a3f42)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from ebef8e6  Updated CHANGELOG.md
     new c13dd1f  Add RecordToJSONTransforms and JSONToRecordTransforms transforms
     new d25529d  Fix InputStream
     new 5fb0350  Regen and fix the SMT for AWS2-s3 from json to record
     new ecfd725  Camel-Kafka-Connector-catalog: Fixed test
     new 67a3f42  Camel-AWS2-S3: Fixed codestyle

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../resources/connectors/camel-aws2-s3-sink.json   |  4 +-
 .../resources/connectors/camel-aws2-s3-source.json |  4 +-
 .../catalog/CamelKafkaConnectorCatalogTest.java    |  2 +-
 connectors/camel-aws2-s3-kafka-connector/pom.xml   |  5 ++
 .../generated/resources/camel-aws2-s3-sink.json    |  4 +-
 .../generated/resources/camel-aws2-s3-source.json  |  4 +-
 .../docs/camel-aws2-s3-kafka-sink-connector.adoc   |  6 +-
 .../docs/camel-aws2-s3-kafka-source-connector.adoc |  6 +-
 .../aws2s3/models/StorageHeader.java               | 15 ++--
 .../aws2s3/models/StorageRecord.java               | 17 +++--
 .../transformers/JSONToRecordTransforms.java       | 76 +++++++++++++++++++
 .../transformers/RecordToJSONTransforms.java       | 86 ++++++++++++++++++++++
 docs/modules/ROOT/nav.adoc                         |  5 ++
 .../camel-aws2-s3-kafka-sink-connector.adoc        |  6 +-
 .../camel-aws2-s3-kafka-source-connector.adoc      |  6 +-
 docs/modules/ROOT/pages/reference/index.adoc       |  4 +-
 16 files changed, 226 insertions(+), 24 deletions(-)
 copy tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/common/SshProperties.java => connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java (76%)
 copy tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/common/SshProperties.java => connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java (70%)
 create mode 100644 connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
 create mode 100644 connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java

[camel-kafka-connector] 03/05: Regen and fix the SMT for AWS2-s3 from json to record

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 5fb0350578b3c0e953454d035052e18e6f0c2b2a
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Jul 22 08:31:10 2021 +0200

    Regen and fix the SMT for AWS2-s3 from json to record
---
 .../src/generated/resources/connectors/camel-aws2-s3-sink.json |  4 +++-
 .../generated/resources/connectors/camel-aws2-s3-source.json   |  4 +++-
 connectors/camel-aws2-s3-kafka-connector/pom.xml               | 10 +++++-----
 .../src/generated/resources/camel-aws2-s3-sink.json            |  4 +++-
 .../src/generated/resources/camel-aws2-s3-source.json          |  4 +++-
 .../src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc      |  6 +++++-
 .../src/main/docs/camel-aws2-s3-kafka-source-connector.adoc    |  6 +++++-
 .../aws2s3/transformers/JSONToRecordTransforms.java            |  2 ++
 docs/modules/ROOT/nav.adoc                                     |  5 +++++
 .../pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc   |  6 +++++-
 .../pages/connectors/camel-aws2-s3-kafka-source-connector.adoc |  6 +++++-
 docs/modules/ROOT/pages/reference/index.adoc                   |  4 +++-
 12 files changed, 47 insertions(+), 14 deletions(-)

diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-sink.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-sink.json
index 22c27d8..ffd4262 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-sink.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-sink.json
@@ -519,7 +519,9 @@
 		"org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter"
 	],
 	"transforms": [
-		"org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms"
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms",
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms",
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms"
 	],
 	"aggregationStrategies": [
 		"org.apache.camel.kafkaconnector.aws2s3.aggregation.NewlineAggregationStrategy"
diff --git a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-source.json b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-source.json
index 10b67dd..034319c 100644
--- a/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-source.json
+++ b/camel-kafka-connector-catalog/src/generated/resources/connectors/camel-aws2-s3-source.json
@@ -605,7 +605,9 @@
 		"org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter"
 	],
 	"transforms": [
-		"org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms"
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms",
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms",
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms"
 	],
 	"aggregationStrategies": [
 		"org.apache.camel.kafkaconnector.aws2s3.aggregation.NewlineAggregationStrategy"
diff --git a/connectors/camel-aws2-s3-kafka-connector/pom.xml b/connectors/camel-aws2-s3-kafka-connector/pom.xml
index d7318a8..3090b8b 100644
--- a/connectors/camel-aws2-s3-kafka-connector/pom.xml
+++ b/connectors/camel-aws2-s3-kafka-connector/pom.xml
@@ -44,6 +44,11 @@
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-aws2-s3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.8.7</version>
+    </dependency>
     <!--START OF GENERATED CODE-->
     <dependency>
       <groupId>org.apache.camel.kafkaconnector</groupId>
@@ -54,11 +59,6 @@
       <artifactId>camel-jackson</artifactId>
     </dependency>
     <!--END OF GENERATED CODE-->
-    <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-      <version>2.8.7</version>
-    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-sink.json b/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-sink.json
index 22c27d8..ffd4262 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-sink.json
+++ b/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-sink.json
@@ -519,7 +519,9 @@
 		"org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter"
 	],
 	"transforms": [
-		"org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms"
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms",
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms",
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms"
 	],
 	"aggregationStrategies": [
 		"org.apache.camel.kafkaconnector.aws2s3.aggregation.NewlineAggregationStrategy"
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-source.json b/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-source.json
index 10b67dd..034319c 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-source.json
+++ b/connectors/camel-aws2-s3-kafka-connector/src/generated/resources/camel-aws2-s3-source.json
@@ -605,7 +605,9 @@
 		"org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter"
 	],
 	"transforms": [
-		"org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms"
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms",
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms",
+		"org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms"
 	],
 	"aggregationStrategies": [
 		"org.apache.camel.kafkaconnector.aws2s3.aggregation.NewlineAggregationStrategy"
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc b/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc
index badf7a8..6840a85 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-sink-connector.adoc
@@ -119,7 +119,7 @@ org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter
 
 
 
-The camel-aws2-s3 sink connector supports 1 transforms out of the box, which are listed below.
+The camel-aws2-s3 sink connector supports 3 transforms out of the box, which are listed below.
 
 
 
@@ -128,6 +128,10 @@ The camel-aws2-s3 sink connector supports 1 transforms out of the box, which are
 
 org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms
 
+org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms
+
+org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms
+
 ----
 
 
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-source-connector.adoc b/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-source-connector.adoc
index f1fd218..9e58a4a 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-source-connector.adoc
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/docs/camel-aws2-s3-kafka-source-connector.adoc
@@ -135,7 +135,7 @@ org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter
 
 
 
-The camel-aws2-s3 source connector supports 1 transforms out of the box, which are listed below.
+The camel-aws2-s3 source connector supports 3 transforms out of the box, which are listed below.
 
 
 
@@ -144,6 +144,10 @@ The camel-aws2-s3 source connector supports 1 transforms out of the box, which a
 
 org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms
 
+org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms
+
+org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms
+
 ----
 
 
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
index 2d28425..8ef4c53 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
@@ -19,6 +19,8 @@ package org.apache.camel.kafkaconnector.aws2s3.transformers;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import java.util.Map;
+
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.header.ConnectHeaders;
diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc
index dce5c21..46c36f7 100644
--- a/docs/modules/ROOT/nav.adoc
+++ b/docs/modules/ROOT/nav.adoc
@@ -37,6 +37,9 @@
 *** camel-amqp-kafka-connector
 **** xref:reference/connectors/camel-amqp-kafka-source-connector.adoc[Source Docs]
 **** xref:reference/connectors/camel-amqp-kafka-sink-connector.adoc[Sink Docs]
+*** camel-apns-kafka-connector
+**** xref:reference/connectors/camel-apns-kafka-source-connector.adoc[Source Docs]
+**** xref:reference/connectors/camel-apns-kafka-sink-connector.adoc[Sink Docs]
 *** camel-arangodb-kafka-connector
 **** xref:reference/connectors/camel-arangodb-kafka-sink-connector.adoc[Sink Docs]
 *** camel-as2-kafka-connector
@@ -198,6 +201,8 @@
 **** xref:reference/connectors/camel-cql-kafka-sink-connector.adoc[Sink Docs]
 *** camel-cron-kafka-connector
 **** xref:reference/connectors/camel-cron-kafka-source-connector.adoc[Source Docs]
+*** camel-crypto-cms-kafka-connector
+**** xref:reference/connectors/camel-crypto-cms-kafka-sink-connector.adoc[Sink Docs]
 *** camel-crypto-kafka-connector
 **** xref:reference/connectors/camel-crypto-kafka-sink-connector.adoc[Sink Docs]
 *** camel-cxf-kafka-connector
diff --git a/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc
index badf7a8..6840a85 100644
--- a/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-sink-connector.adoc
@@ -119,7 +119,7 @@ org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter
 
 
 
-The camel-aws2-s3 sink connector supports 1 transforms out of the box, which are listed below.
+The camel-aws2-s3 sink connector supports 3 transforms out of the box, which are listed below.
 
 
 
@@ -128,6 +128,10 @@ The camel-aws2-s3 sink connector supports 1 transforms out of the box, which are
 
 org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms
 
+org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms
+
+org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms
+
 ----
 
 
diff --git a/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-source-connector.adoc
index f1fd218..9e58a4a 100644
--- a/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-source-connector.adoc
+++ b/docs/modules/ROOT/pages/connectors/camel-aws2-s3-kafka-source-connector.adoc
@@ -135,7 +135,7 @@ org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter
 
 
 
-The camel-aws2-s3 source connector supports 1 transforms out of the box, which are listed below.
+The camel-aws2-s3 source connector supports 3 transforms out of the box, which are listed below.
 
 
 
@@ -144,6 +144,10 @@ The camel-aws2-s3 source connector supports 1 transforms out of the box, which a
 
 org.apache.camel.kafkaconnector.aws2s3.transformers.S3ObjectTransforms
 
+org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms
+
+org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms
+
 ----
 
 
diff --git a/docs/modules/ROOT/pages/reference/index.adoc b/docs/modules/ROOT/pages/reference/index.adoc
index 04e976a..1eab44f 100644
--- a/docs/modules/ROOT/pages/reference/index.adoc
+++ b/docs/modules/ROOT/pages/reference/index.adoc
@@ -2,7 +2,7 @@
 = Supported connectors and documentation
 
 // kafka-connectors list: START
-Number of Camel Kafka connectors: 340 
+Number of Camel Kafka connectors: 342 
 
 [width="100%",cols="4,1,1,1,1,1",options="header"]
 |===
@@ -12,6 +12,7 @@ Number of Camel Kafka connectors: 340
 | *camel-ahc-ws-kafka-connector* | true | true | xref:reference/connectors/camel-ahc-ws-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-ahc-ws-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-ahc-ws-kafka-connector/0.10.0/camel-ahc-ws-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-ahc-wss-kafka-connector* | true | true | xref:reference/connectors/camel-ahc-wss-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-ahc-wss-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-ahc-wss-kafka-connector/0.10.0/camel-ahc-wss-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-amqp-kafka-connector* | true | true | xref:reference/connectors/camel-amqp-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-amqp-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-amqp-kafka-connector/0.10.0/camel-amqp-kafka-connector-0.10.0-package.tar.gz[Download]
+| *camel-apns-kafka-connector* | true | true | xref:reference/connectors/camel-apns-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-apns-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-apns-kafka-connector/0.10.0/camel-apns-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-arangodb-kafka-connector* | true | false | xref:reference/connectors/camel-arangodb-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-arangodb-kafka-connector/0.10.0/camel-arangodb-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-as2-kafka-connector* | true | true | xref:reference/connectors/camel-as2-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-as2-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-as2-kafka-connector/0.10.0/camel-as2-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-asterisk-kafka-connector* | true | true | xref:reference/connectors/camel-asterisk-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-asterisk-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-asterisk-kafka-connector/0.10.0/camel-asterisk-kafka-connector-0.10.0-package.tar.gz[Download]
@@ -76,6 +77,7 @@ Number of Camel Kafka connectors: 340
 | *camel-couchdb-kafka-connector* | true | true | xref:reference/connectors/camel-couchdb-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-couchdb-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-couchdb-kafka-connector/0.10.0/camel-couchdb-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-cql-kafka-connector* | true | true | xref:reference/connectors/camel-cql-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-cql-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-cql-kafka-connector/0.10.0/camel-cql-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-cron-kafka-connector* | false | true |  | xref:reference/connectors/camel-cron-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-cron-kafka-connector/0.10.0/camel-cron-kafka-connector-0.10.0-package.tar.gz[Download]
+| *camel-crypto-cms-kafka-connector* | true | false | xref:reference/connectors/camel-crypto-cms-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-crypto-cms-kafka-connector/0.10.0/camel-crypto-cms-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-crypto-kafka-connector* | true | false | xref:reference/connectors/camel-crypto-kafka-sink-connector.adoc[Sink Docs] |  | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-crypto-kafka-connector/0.10.0/camel-crypto-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-cxf-kafka-connector* | true | true | xref:reference/connectors/camel-cxf-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-cxf-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-cxf-kafka-connector/0.10.0/camel-cxf-kafka-connector-0.10.0-package.tar.gz[Download]
 | *camel-cxfrs-kafka-connector* | true | true | xref:reference/connectors/camel-cxfrs-kafka-sink-connector.adoc[Sink Docs] | xref:reference/connectors/camel-cxfrs-kafka-source-connector.adoc[Source Docs] | https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-cxfrs-kafka-connector/0.10.0/camel-cxfrs-kafka-connector-0.10.0-package.tar.gz[Download]

[camel-kafka-connector] 05/05: Camel-AWS2-S3: Fixed codestyle

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 67a3f42b08c617f3a0e0e20abc86f933d88ba254
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Jul 22 08:47:29 2021 +0200

    Camel-AWS2-S3: Fixed codestyle
---
 .../aws2s3/models/StorageHeader.java               |  13 ++-
 .../aws2s3/models/StorageRecord.java               |  16 ++--
 .../transformers/JSONToRecordTransforms.java       |  82 +++++++++--------
 .../transformers/RecordToJSONTransforms.java       | 101 +++++++++++----------
 4 files changed, 108 insertions(+), 104 deletions(-)

diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
index c3526dc..3d22841 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
@@ -16,14 +16,13 @@
  */
 package org.apache.camel.kafkaconnector.aws2s3.models;
 
-
 public class StorageHeader {
-  public final String key;
-  public final String value;
+    public final String key;
+    public final String value;
 
-  public StorageHeader(String key, String value) {
-    this.key = key;
-    this.value = value;
-  }
+    public StorageHeader(String key, String value) {
+        this.key = key;
+        this.value = value;
+    }
 }
 
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
index 3d74597..9af3482 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
@@ -17,14 +17,14 @@
 package org.apache.camel.kafkaconnector.aws2s3.models;
 
 public class StorageRecord {
-  public final String key;
-  public final String body;
-  public final StorageHeader[] headers;
+    public final String key;
+    public final String body;
+    public final StorageHeader[] headers;
 
-  public StorageRecord(String key, String body, StorageHeader[] headers) {
-    this.key = key;
-    this.body = body;
-    this.headers = headers;
-  }
+    public StorageRecord(String key, String body, StorageHeader[] headers) {
+        this.key = key;
+        this.body = body;
+        this.headers = headers;
+    }
 }
 
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
index 8ef4c53..882f224 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
@@ -16,10 +16,10 @@
  */
 package org.apache.camel.kafkaconnector.aws2s3.transformers;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import java.util.Map;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -28,47 +28,49 @@ import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.transforms.Transformation;
 
 public class JSONToRecordTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
-  public static final String FIELD_KEY_CONFIG = "key";
-  public static final ConfigDef CONFIG_DEF =
-      new ConfigDef()
-          .define(
-              FIELD_KEY_CONFIG,
-              ConfigDef.Type.STRING,
-              null,
-              ConfigDef.Importance.MEDIUM,
-              "Add the key and the header to the record value");
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF =
+            new ConfigDef()
+                    .define(
+                            FIELD_KEY_CONFIG,
+                            ConfigDef.Type.STRING,
+                            null,
+                            ConfigDef.Importance.MEDIUM,
+                            "Add the key and the header to the record value");
 
-  @Override
-  public void configure(Map<String, ?> configs) {}
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
 
-  @Override
-  public R apply(R record) {
-    String str = new String((byte[]) record.value());
-    GsonBuilder gsonBuilder = new GsonBuilder();
-    Gson gson = gsonBuilder.create();
-    StorageRecord storageRecord = gson.fromJson(str, StorageRecord.class);
-    // Header format conversion
-    Headers headers = new ConnectHeaders();
-    for (int i = 0; i < storageRecord.headers.length; i++) {
-      headers.add(storageRecord.headers[i].key, storageRecord.headers[i].value, null);
+    @Override
+    public R apply(R record) {
+        String str = new String((byte[]) record.value());
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        Gson gson = gsonBuilder.create();
+        StorageRecord storageRecord = gson.fromJson(str, StorageRecord.class);
+        // Header format conversion
+        Headers headers = new ConnectHeaders();
+        for (int i = 0; i < storageRecord.headers.length; i++) {
+            headers.add(storageRecord.headers[i].key, storageRecord.headers[i].value, null);
+        }
+        headers.forEach(h -> record.headers().add(h));
+        return record.newRecord(
+                record.topic(),
+                record.kafkaPartition(),
+                record.keySchema(),
+                storageRecord.key,
+                record.valueSchema(),
+                storageRecord.body,
+                record.timestamp(),
+                headers);
     }
-    headers.forEach(h -> record.headers().add(h));
-    return record.newRecord(
-        record.topic(),
-        record.kafkaPartition(),
-        record.keySchema(),
-        storageRecord.key,
-        record.valueSchema(),
-        storageRecord.body,
-        record.timestamp(),
-        headers);
-  }
 
-  @Override
-  public void close() {}
+    @Override
+    public void close() {
+    }
 
-  @Override
-  public ConfigDef config() {
-    return CONFIG_DEF;
-  }
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
 }
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
index 203ab14..f97e15b 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
@@ -16,68 +16,71 @@
  */
 package org.apache.camel.kafkaconnector.aws2s3.transformers;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Map;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageHeader;
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.transforms.Transformation;
-import org.apache.camel.kafkaconnector.aws2s3.models.StorageHeader;
-import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 
 public class RecordToJSONTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
-  public static final String FIELD_KEY_CONFIG = "key";
-  public static final ConfigDef CONFIG_DEF =
-      new ConfigDef()
-          .define(
-              FIELD_KEY_CONFIG,
-              ConfigDef.Type.STRING,
-              null,
-              ConfigDef.Importance.MEDIUM,
-              "Add the key and the header to the record value");
-
-  @Override
-  public void configure(Map<String, ?> configs) {}
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF =
+            new ConfigDef()
+                    .define(
+                            FIELD_KEY_CONFIG,
+                            ConfigDef.Type.STRING,
+                            null,
+                            ConfigDef.Importance.MEDIUM,
+                            "Add the key and the header to the record value");
 
-  @Override
-  public R apply(R record) {
-    // Convert headers to StorageHeader format
-    Headers headers = record.headers();
-    ArrayList<StorageHeader> headerList = new ArrayList<StorageHeader>(headers.size());
-    for (Header h : headers) {
-      headerList.add(new StorageHeader(h.key(), (String) h.value()));
+    @Override
+    public void configure(Map<String, ?> configs) {
     }
-    StorageHeader[] storageHeaders = new StorageHeader[headers.size()];
-    StorageRecord storageRecord =
-        new StorageRecord(
-            (String) record.key(), (String) record.value(), headerList.toArray(storageHeaders));
 
-    // Serialize
-    GsonBuilder gsonBuilder = new GsonBuilder();
-    Gson gson = gsonBuilder.create();
-    String storageRecordJSON = gson.toJson(storageRecord, StorageRecord.class);
-    InputStream storageRecordStream = new ByteArrayInputStream(storageRecordJSON.getBytes());
-    return record.newRecord(
-        record.topic(),
-        record.kafkaPartition(),
-        null,
-        record.key(),
-        Schema.STRING_SCHEMA,
-        storageRecordStream,
-        record.timestamp());
-  }
+    @Override
+    public R apply(R record) {
+        // Convert headers to StorageHeader format
+        Headers headers = record.headers();
+        ArrayList<StorageHeader> headerList = new ArrayList<StorageHeader>(headers.size());
+        for (Header h : headers) {
+            headerList.add(new StorageHeader(h.key(), (String) h.value()));
+        }
+        StorageHeader[] storageHeaders = new StorageHeader[headers.size()];
+        StorageRecord storageRecord =
+                new StorageRecord(
+                        (String) record.key(), (String) record.value(), headerList.toArray(storageHeaders));
 
-  @Override
-  public void close() {}
+        // Serialize
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        Gson gson = gsonBuilder.create();
+        String storageRecordJSON = gson.toJson(storageRecord, StorageRecord.class);
+        InputStream storageRecordStream = new ByteArrayInputStream(storageRecordJSON.getBytes());
+        return record.newRecord(
+                record.topic(),
+                record.kafkaPartition(),
+                null,
+                record.key(),
+                Schema.STRING_SCHEMA,
+                storageRecordStream,
+                record.timestamp());
+    }
+
+    @Override
+    public void close() {
+    }
 
-  @Override
-  public ConfigDef config() {
-    return CONFIG_DEF;
-  }
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
 }

[camel-kafka-connector] 02/05: Fix InputStream

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d25529dc92f03b1fb2afd3b9883ce2a8f5a86a9c
Author: Mathieu <ma...@gmail.com>
AuthorDate: Wed Jun 23 23:02:50 2021 +0200

    Fix InputStream
---
 .../kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
index fdb2e9f..203ab14 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.camel.kafkaconnector.aws2s3.models.StorageHeader;
 import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
 import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 
 public class RecordToJSONTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
   public static final String FIELD_KEY_CONFIG = "key";
@@ -61,7 +62,7 @@ public class RecordToJSONTransforms<R extends ConnectRecord<R>> implements Trans
     GsonBuilder gsonBuilder = new GsonBuilder();
     Gson gson = gsonBuilder.create();
     String storageRecordJSON = gson.toJson(storageRecord, StorageRecord.class);
-    InputStream storageRecordStream = new ByteArrayInputStream(storageRecordJSON.getBytes())
+    InputStream storageRecordStream = new ByteArrayInputStream(storageRecordJSON.getBytes());
     return record.newRecord(
         record.topic(),
         record.kafkaPartition(),

[camel-kafka-connector] 04/05: Camel-Kafka-Connector-catalog: Fixed test

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ecfd7253cd0ed32edf1abcc2e4032b1cbaa7b977
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Jul 22 08:40:06 2021 +0200

    Camel-Kafka-Connector-catalog: Fixed test
---
 .../camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java b/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
index 114e311..4177192 100644
--- a/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
+++ b/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
@@ -63,7 +63,7 @@ class CamelKafkaConnectorCatalogTest {
         assertEquals("camel.sink.endpoint.amazonS3Client", model.getOptions().get(1).getName());
         assertEquals("camel.sink.endpoint.amazonS3Presigner", model.getOptions().get(2).getName());
         assertEquals(1, model.getConverters().size());
-        assertEquals(1, model.getTransforms().size());
+        assertEquals(3, model.getTransforms().size());
         assertEquals(1, model.getAggregationStrategies().size());
     }
     

[camel-kafka-connector] 01/05: Add RecordToJSONTransforms and JSONToRecordTransforms transforms

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c13dd1f849925da51b0a1a92f30cc2611888cad3
Author: Mathieu <ma...@gmail.com>
AuthorDate: Fri Jun 11 15:39:49 2021 +0200

    Add RecordToJSONTransforms and JSONToRecordTransforms transforms
---
 connectors/camel-aws2-s3-kafka-connector/pom.xml   |  5 ++
 .../aws2s3/models/StorageHeader.java               | 29 ++++++++
 .../aws2s3/models/StorageRecord.java               | 30 ++++++++
 .../transformers/JSONToRecordTransforms.java       | 72 +++++++++++++++++++
 .../transformers/RecordToJSONTransforms.java       | 82 ++++++++++++++++++++++
 5 files changed, 218 insertions(+)

diff --git a/connectors/camel-aws2-s3-kafka-connector/pom.xml b/connectors/camel-aws2-s3-kafka-connector/pom.xml
index 6650a70..d7318a8 100644
--- a/connectors/camel-aws2-s3-kafka-connector/pom.xml
+++ b/connectors/camel-aws2-s3-kafka-connector/pom.xml
@@ -54,6 +54,11 @@
       <artifactId>camel-jackson</artifactId>
     </dependency>
     <!--END OF GENERATED CODE-->
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.8.7</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
new file mode 100644
index 0000000..c3526dc
--- /dev/null
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.aws2s3.models;
+
+
+public class StorageHeader { // One header entry held as a pair of strings.
+  public final String key; // header name; assigned once by the constructor
+  public final String value; // header value; assigned once by the constructor
+
+  public StorageHeader(String key, String value) { // stores both arguments as given, without validation
+    this.key = key;
+    this.value = value;
+  }
+}
+
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
new file mode 100644
index 0000000..3d74597
--- /dev/null
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.aws2s3.models;
+
+public class StorageRecord { // Bundle of a key, a body and an array of headers, all carried as strings.
+  public final String key; // record key; assigned once by the constructor
+  public final String body; // record payload; assigned once by the constructor
+  public final StorageHeader[] headers; // header entries; the caller's array is kept by reference
+
+  public StorageRecord(String key, String body, StorageHeader[] headers) { // stores all three arguments as given, without validation
+    this.key = key;
+    this.body = body;
+    this.headers = headers; // no defensive copy is made
+  }
+}
+
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
new file mode 100644
index 0000000..2d28425
--- /dev/null
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.aws2s3.transformers;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.Transformation;
+
+/**
+ * Single Message Transform that rebuilds a Connect record from a JSON document stored in the
+ * record value.
+ *
+ * <p>The incoming value is expected to be a {@code byte[]} holding the JSON form of a
+ * {@code StorageRecord} ({@code key}, {@code body}, {@code headers}). The returned record keeps the
+ * topic, partition, schemas and timestamp of the incoming one, takes its key and value from the
+ * decoded {@code key} and {@code body}, and carries the decoded headers.
+ */
+public class JSONToRecordTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
+  public static final String FIELD_KEY_CONFIG = "key";
+  public static final ConfigDef CONFIG_DEF =
+      new ConfigDef()
+          .define(
+              FIELD_KEY_CONFIG,
+              ConfigDef.Type.STRING,
+              null,
+              ConfigDef.Importance.MEDIUM,
+              "Add the key and the header to the record value");
+
+  /** Shared JSON codec; Gson instances are thread-safe, so it is built once, not per record. */
+  private static final Gson GSON = new GsonBuilder().create();
+
+  /** No-op: this transform reads nothing from {@code configs}. */
+  @Override
+  public void configure(Map<String, ?> configs) {}
+
+  /**
+   * Decodes the JSON payload in {@code record.value()} and returns the record it describes.
+   *
+   * @param record record whose value is a {@code byte[]} of UTF-8 encoded JSON, or {@code null}
+   * @return the rebuilt record, or {@code record} itself when its value is {@code null}
+   * @throws ClassCastException if the value is neither {@code null} nor a {@code byte[]}
+   * @throws org.apache.kafka.connect.errors.DataException if the payload decodes to no object
+   */
+  @Override
+  public R apply(R record) {
+    Object rawValue = record.value();
+    if (rawValue == null) {
+      // Nothing to decode; hand the record back untouched instead of failing with an NPE.
+      return record;
+    }
+    // Decode explicitly as UTF-8 rather than with the JVM's default charset.
+    String json = new String((byte[]) rawValue, java.nio.charset.StandardCharsets.UTF_8);
+    // The model class lives in another package and is not imported here, hence the full name.
+    org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord storageRecord =
+        GSON.fromJson(json, org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord.class);
+    if (storageRecord == null) {
+      // Gson yields null for an empty document; report that clearly rather than via an NPE.
+      throw new org.apache.kafka.connect.errors.DataException(
+          "Record value does not contain a JSON StorageRecord");
+    }
+    // Header format conversion
+    Headers headers = new ConnectHeaders();
+    if (storageRecord.headers != null) { // the "headers" array may be absent from the JSON
+      for (int i = 0; i < storageRecord.headers.length; i++) {
+        headers.add(storageRecord.headers[i].key, storageRecord.headers[i].value, null);
+      }
+    }
+    // The decoded headers are also appended to the incoming record's own header collection.
+    headers.forEach(h -> record.headers().add(h));
+    // NOTE(review): the incoming value schema is reused although the value is now a String;
+    // confirm that downstream converters accept this.
+    return record.newRecord(
+        record.topic(),
+        record.kafkaPartition(),
+        record.keySchema(),
+        storageRecord.key,
+        record.valueSchema(),
+        storageRecord.body,
+        record.timestamp(),
+        headers);
+  }
+
+  /** No-op: the transform holds no resources. */
+  @Override
+  public void close() {}
+
+  /** Returns the static configuration definition {@link #CONFIG_DEF}. */
+  @Override
+  public ConfigDef config() {
+    return CONFIG_DEF;
+  }
+}
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
new file mode 100644
index 0000000..fdb2e9f
--- /dev/null
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.aws2s3.transformers;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageHeader;
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
+import java.io.ByteArrayInputStream;
+
+/**
+ * Single Message Transform that packs a record's key, value and headers into one JSON document.
+ *
+ * <p>The key and value are taken as strings and the headers as key/value string pairs; together
+ * they form a {@link StorageRecord}, which is serialized with Gson. The returned record keeps the
+ * topic, partition, key and timestamp of the incoming one and carries the JSON, as a stream of
+ * UTF-8 bytes, in its value.
+ */
+public class RecordToJSONTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
+  public static final String FIELD_KEY_CONFIG = "key";
+  public static final ConfigDef CONFIG_DEF =
+      new ConfigDef()
+          .define(
+              FIELD_KEY_CONFIG,
+              ConfigDef.Type.STRING,
+              null,
+              ConfigDef.Importance.MEDIUM,
+              "Add the key and the header to the record value");
+
+  /** Shared JSON codec; Gson instances are thread-safe, so it is built once, not per record. */
+  private static final Gson GSON = new GsonBuilder().create();
+
+  /** No-op: this transform reads nothing from {@code configs}. */
+  @Override
+  public void configure(Map<String, ?> configs) {}
+
+  /**
+   * Serializes the key, value and headers of {@code record} to JSON and returns a record whose
+   * value is a stream over that JSON.
+   *
+   * @param record record whose key and value are strings (or {@code null})
+   * @return a new record with the same topic, partition, key and timestamp and the JSON as value
+   * @throws ClassCastException if the key or value is neither {@code null} nor a {@code String}
+   */
+  @Override
+  public R apply(R record) {
+    // Convert headers to StorageHeader format
+    Headers headers = record.headers();
+    ArrayList<StorageHeader> headerList = new ArrayList<>(headers.size());
+    for (Header h : headers) {
+      // Header values are not necessarily strings, so render them instead of casting.
+      Object headerValue = h.value();
+      String headerText = headerValue == null ? null : headerValue.toString();
+      headerList.add(new StorageHeader(h.key(), headerText));
+    }
+    StorageRecord storageRecord =
+        new StorageRecord(
+            (String) record.key(),
+            (String) record.value(),
+            headerList.toArray(new StorageHeader[0]));
+
+    // Serialize
+    String storageRecordJSON = GSON.toJson(storageRecord, StorageRecord.class);
+    // Encode explicitly as UTF-8 rather than with the JVM's default charset.
+    ByteArrayInputStream storageRecordStream =
+        new ByteArrayInputStream(
+            storageRecordJSON.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    // NOTE(review): the value is a stream while the declared schema is STRING_SCHEMA, and the
+    // key is passed with a null schema; confirm the sink relies on exactly this combination.
+    return record.newRecord(
+        record.topic(),
+        record.kafkaPartition(),
+        null,
+        record.key(),
+        Schema.STRING_SCHEMA,
+        storageRecordStream,
+        record.timestamp());
+  }
+
+  /** No-op: the transform holds no resources. */
+  @Override
+  public void close() {}
+
+  /** Returns the static configuration definition {@link #CONFIG_DEF}. */
+  @Override
+  public ConfigDef config() {
+    return CONFIG_DEF;
+  }
+}