Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/11 14:13:09 UTC

[camel-kafka-connector] branch master updated: Adds support for ElasticSearch and includes integration tests

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 729cc55  Adds support for ElasticSearch and includes integration tests
     new 4808e77  Merge pull request #36 from orpiske/elasticsearch
729cc55 is described below

commit 729cc5538561ca1b4540b6de5a38eec1bbd6c779
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Dec 4 17:20:15 2019 +0100

    Adds support for ElasticSearch and includes integration tests
---
 core/pom.xml                                       |   4 +
 parent/pom.xml                                     |  14 +-
 tests/pom.xml                                      |   7 +
 .../apache/camel/kafkaconnector/TestCommon.java    |  10 ++
 .../clients/elasticsearch/ElasticSearchClient.java | 173 +++++++++++++++++++++
 .../CamelElasticSearchIndexPropertyFactory.java    |  56 +++++++
 .../CamelElasticSearchPropertyFactory.java         |  75 +++++++++
 .../CamelSinkElasticSearchITCase.java              | 146 +++++++++++++++++
 .../ConnectRecordValueToMapTransformer.java        |  70 +++++++++
 tests/src/test/resources/log4j2.properties         |   4 +
 10 files changed, 558 insertions(+), 1 deletion(-)
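
In short, the change wires camel-elasticsearch-rest into the core and parent POMs, adds the Testcontainers elasticsearch module to the tests, and introduces a sink integration test that pushes Kafka records into an ElasticSearch index through a Connect transform. A minimal sketch of the connector properties the new test factories end up producing (host, topic and index values below are illustrative; the exact strings are built by the factories further down):

    // Sketch only: the values are placeholders, the factories below build the real ones
    Properties props = new Properties();
    props.put("name", "CamelElasticSearchSinkConnector");
    props.put("connector.class", "org.apache.camel.kafkaconnector.CamelSinkConnector");
    props.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
    props.put("value.converter", "org.apache.kafka.connect.storage.StringConverter");
    props.put("topics", "ckc-topic");
    props.put("transforms", "ElasticSearchTransformer");
    props.put("transforms.ElasticSearchTransformer.type",
            "org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer");
    props.put("transforms.ElasticSearchTransformer.key", "index-test");
    props.put("camel.sink.url",
            "elasticsearch-rest://docker-cluster?hostAddresses=localhost:9200&operation=Index&indexName=ckc-index");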

diff --git a/core/pom.xml b/core/pom.xml
index 1763235..ab2e4d8 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -90,6 +90,10 @@
             <artifactId>camel-http</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-elasticsearch-rest</artifactId>
+        </dependency>
 
         <!-- Kafka -->
         <dependency>
diff --git a/parent/pom.xml b/parent/pom.xml
index b719242..ce526c8 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -149,6 +149,13 @@
                 <artifactId>camel-http</artifactId>
                 <version>${camel.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-elasticsearch-rest</artifactId>
+                <version>${camel.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.fasterxml.jackson.dataformat</groupId>
                 <artifactId>jackson-dataformat-cbor</artifactId>
@@ -241,6 +248,12 @@
                 <version>${version.testcontainers}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>elasticsearch</artifactId>
+                <version>${version.testcontainers}</version>
+                <scope>test</scope>
+            </dependency>
 
             <dependency>
                 <groupId>org.apache.camel</groupId>
@@ -254,7 +267,6 @@
                 <version>${camel.version}</version>
                 <scope>test</scope>
             </dependency>
-
         </dependencies>
     </dependencyManagement>
     <build>
diff --git a/tests/pom.xml b/tests/pom.xml
index 2255c52..b40be9f 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -136,6 +136,13 @@
             <artifactId>localstack</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java b/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java
index 706ceb3..05719e6 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java
@@ -53,6 +53,16 @@ public final class TestCommon {
      */
     public static final String DEFAULT_KINESIS_STREAM = "ckc-kin-stream";
 
+    /**
+     * The default ElasticSearch cluster name for usage during the tests
+     */
+    public static final String DEFAULT_ELASTICSEARCH_CLUSTER = "docker-cluster";
+
+    /**
+     * The default ElasticSearch index for usage during the tests
+     */
+    public static final String DEFAULT_ELASTICSEARCH_INDEX = "ckc-index";
+
     private static final Logger LOG = LoggerFactory.getLogger(TestCommon.class);
 
     private TestCommon() {
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java
new file mode 100644
index 0000000..93ccbbb
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.clients.elasticsearch;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.function.Predicate;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchClient {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClient.class);
+
+    private final RestHighLevelClient client;
+    private final String index;
+
+    public ElasticSearchClient(int port, String index) {
+        client = new RestHighLevelClient(
+                RestClient.builder(
+                        new HttpHost("localhost", port, "http")));
+
+        this.index = index;
+    }
+
+    public boolean indexExists() {
+        try {
+            GetIndexRequest indexRequest = new GetIndexRequest(index);
+
+            return client.indices().exists(indexRequest, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            /*
+             * An IOException may be thrown when the response cannot be parsed, on a timeout, or when there is
+             * no reply from the ES instance at all. Timeouts and missing replies are most likely during
+             * start-up or on overloaded CI environments, so we log the I/O error and let the caller try again.
+             */
+            LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e);
+        }
+
+        return false;
+    }
+
+    public SearchHits getData() {
+        try {
+            SearchRequest searchRequest = new SearchRequest(index);
+            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+            searchSourceBuilder.query(QueryBuilders.matchAllQuery());
+
+            searchRequest.source(searchSourceBuilder);
+
+            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+
+            return response.getHits();
+
+        } catch (IOException e) {
+            /*
+             * An IOException may be thrown when the response cannot be parsed, on a timeout, or when there is
+             * no reply from the ES instance at all. Timeouts and missing replies are most likely during
+             * start-up or on overloaded CI environments, so we log the I/O error and let the caller try again.
+             */
+            LOG.error("I/O error trying to query the index data: {}", e.getMessage(), e);
+        } catch (Throwable e) {
+            LOG.error("Unhandled error trying to query the index data: {}", e.getMessage(), e);
+        }
+
+        return null;
+    }
+
+    private boolean hasData(int expect) {
+        SearchHits searchHits = getData();
+
+        if (searchHits == null) {
+            LOG.debug("There are not search hit to return");
+
+            return false;
+        }
+
+        SearchHit[] hits = searchHits.getHits();
+        if (hits == null) {
+            LOG.debug("Empty data set");
+
+            return false;
+        }
+
+        int count = hits.length;
+
+        if (count != expect) {
+            LOG.debug("Not enough records: {} available, but {} expected", count, expect);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    private <T> void waitFor(Predicate<T> resourceCheck, T payload) {
+        boolean state;
+        int retries = 30;
+        int waitTime = 1000;
+        do {
+            try {
+                state = resourceCheck.test(payload);
+
+                if (!state) {
+                    LOG.debug("The resource is not yet available. Waiting {} seconds before retrying",
+                            TimeUnit.MILLISECONDS.toSeconds(waitTime));
+                    retries--;
+                    Thread.sleep(waitTime);
+                }
+            } catch (InterruptedException e) {
+                break;
+            }
+
+        } while (!state && retries > 0);
+    }
+
+    private void waitFor(BooleanSupplier resourceCheck) {
+        boolean state;
+        int retries = 30;
+        int waitTime = 1000;
+        do {
+            try {
+                state = resourceCheck.getAsBoolean();
+
+                if (!state) {
+                    LOG.debug("The resource is not yet available. Waiting {} seconds before retrying",
+                            TimeUnit.MILLISECONDS.toSeconds(waitTime));
+                    retries--;
+                    Thread.sleep(waitTime);
+                }
+            } catch (InterruptedException e) {
+                break;
+            }
+
+        } while (!state && retries > 0);
+    }
+
+    public void waitForIndex() {
+        waitFor(this::indexExists);
+    }
+
+    public void waitForData(int expect) {
+        waitFor(this::hasData, expect);
+    }
+}
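
The client above is a thin wrapper around the high-level REST client with blocking retry helpers (30 attempts, one second apart). A small usage sketch, assuming an ElasticSearch instance is reachable on the given port (the port and expected count below are placeholders):

    // Sketch only: 9200 and 10 are illustrative values
    ElasticSearchClient client = new ElasticSearchClient(9200, "ckc-index");
    client.waitForIndex();   // blocks until the index exists or the retries are exhausted
    client.waitForData(10);  // blocks until 10 documents are searchable or the retries are exhausted
    SearchHits hits = client.getData();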
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java
new file mode 100644
index 0000000..500a0bc
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sink.elasticsearch;
+
+import java.util.Properties;
+
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+public class CamelElasticSearchIndexPropertyFactory extends CamelElasticSearchPropertyFactory {
+    private final String index;
+    private final String transformerKey;
+
+
+    CamelElasticSearchIndexPropertyFactory(int tasksMax, String topic, String clusterName, String hostAddress,
+                                           String index, String transformerKey) {
+        super(tasksMax, topic, clusterName, hostAddress, index);
+        this.index = index;
+        this.transformerKey = transformerKey;
+    }
+
+
+    @Override
+    public Properties getProperties() {
+        Properties connectorProps = super.getProperties();
+
+        connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG, "ElasticSearchTransformer");
+        connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG + ".ElasticSearchTransformer.type",
+                "org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer");
+        connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG + ".ElasticSearchTransformer.key",
+                transformerKey);
+
+        String sinkUrl = "elasticsearch-rest://" + getClusterName() + "?hostAddresses=" + getHostAddress()
+                + "&operation=Index"
+                + "&indexName=" + index;
+
+        connectorProps.put("camel.sink.url", sinkUrl);
+
+
+        return connectorProps;
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java
new file mode 100644
index 0000000..b5bb19e
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sink.elasticsearch;
+
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+public class CamelElasticSearchPropertyFactory implements ConnectorPropertyFactory {
+    private final int tasksMax;
+    private final String topic;
+    private final String clusterName;
+    private final String hostAddress;
+    private final String index;
+
+
+    CamelElasticSearchPropertyFactory(int tasksMax, String topic, String clusterName, String hostAddress, String index) {
+        this.tasksMax = tasksMax;
+        this.topic = topic;
+        this.clusterName = clusterName;
+        this.hostAddress = hostAddress;
+        this.index = index;
+    }
+
+    protected int getTasksMax() {
+        return tasksMax;
+    }
+
+    protected String getTopic() {
+        return topic;
+    }
+
+    protected String getClusterName() {
+        return clusterName;
+    }
+
+    protected String getHostAddress() {
+        return hostAddress;
+    }
+
+    protected String getIndex() {
+        return index;
+    }
+
+    @Override
+    public Properties getProperties() {
+        Properties connectorProps = new Properties();
+        connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelElasticSearchSinkConnector");
+        connectorProps.put("tasks.max", String.valueOf(tasksMax));
+
+        connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
+        connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+
+        connectorProps.put("topics", topic);
+
+        return connectorProps;
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java
new file mode 100644
index 0000000..600c00e
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sink.elasticsearch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.ContainerUtil;
+import org.apache.camel.kafkaconnector.KafkaConnectRunner;
+import org.apache.camel.kafkaconnector.TestCommon;
+import org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient;
+import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import static org.junit.Assert.fail;
+
+public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkElasticSearchITCase.class);
+    // The -oss image is used so the test runs against the open-source distribution by default
+    private static final String ELASTIC_SEARCH_CONTAINER = "docker.elastic.co/elasticsearch/elasticsearch-oss:7.3.2";
+
+    private static final int ELASTIC_SEARCH_PORT = 9200;
+
+    @Rule
+    public ElasticsearchContainer elasticsearch = new ElasticsearchContainer(ELASTIC_SEARCH_CONTAINER);
+
+    private KafkaConnectRunner kafkaConnectRunner;
+    private ElasticSearchClient client;
+
+    private final int expect = 10;
+    private int received;
+    private final String transformKey = "index-test";
+
+    @Before
+    public void setUp() {
+        ContainerUtil.waitForHttpInitialization(elasticsearch, elasticsearch.getMappedPort(ELASTIC_SEARCH_PORT));
+
+        final String elasticSearchInstance = elasticsearch
+                .getHttpHostAddress();
+
+        LOG.info("ElasticSearch instance running at {}", elasticSearchInstance);
+
+        String topic = TestCommon.getDefaultTestTopic(this.getClass());
+        CamelElasticSearchPropertyFactory testProperties = new CamelElasticSearchIndexPropertyFactory(1, topic,
+                TestCommon.DEFAULT_ELASTICSEARCH_CLUSTER,
+                elasticSearchInstance, TestCommon.DEFAULT_ELASTICSEARCH_INDEX, transformKey);
+
+        kafkaConnectRunner = getKafkaConnectRunner();
+        kafkaConnectRunner.getConnectorPropertyProducers().add(testProperties);
+
+        client = new ElasticSearchClient(elasticsearch.getMappedPort(ELASTIC_SEARCH_PORT),
+                TestCommon.DEFAULT_ELASTICSEARCH_INDEX);
+    }
+
+    private void putRecords(CountDownLatch latch) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        try {
+            for (int i = 0; i < expect; i++) {
+                try {
+                    kafkaClient.produce(TestCommon.getDefaultTestTopic(this.getClass()), "test");
+                } catch (ExecutionException e) {
+                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+
+    }
+
+    private void verifyHit(SearchHit searchHit) {
+        String source = searchHit.getSourceAsString();
+
+        Assert.assertNotNull(source);
+        Assert.assertFalse(source.isEmpty());
+
+        // TODO: this is not enough, we need to parse the json and check the key itself
+        Assert.assertTrue(source.contains(transformKey));
+
+        LOG.debug("Search hit: {} ", searchHit.getSourceAsString());
+        received++;
+    }
+
+
+
+    @Test(timeout = 90000)
+    public void testIndexOperation() {
+        try {
+            CountDownLatch latch = new CountDownLatch(2);
+            ExecutorService service = Executors.newCachedThreadPool();
+            service.submit(() -> kafkaConnectRunner.run(latch));
+            service.submit(() -> putRecords(latch));
+
+            latch.await(30, TimeUnit.SECONDS);
+
+            LOG.debug("Waiting for indices");
+
+            client.waitForIndex();
+
+            LOG.debug("Waiting for data");
+            client.waitForData(expect);
+
+            SearchHits hits = client.getData();
+
+            hits.forEach(this::verifyHit);
+            Assert.assertEquals("Did not receive the same amount of messages sent", expect, received);
+
+            LOG.debug("Created the consumer ... About to receive messages");
+        } catch (Exception e) {
+            LOG.error("ElasticSearch test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        } finally {
+            kafkaConnectRunner.stop();
+        }
+    }
+}
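
As the TODO in verifyHit above notes, checking that the raw source string contains the transform key is a weak assertion. One possible follow-up, sketched here with Jackson (its availability on the test classpath is an assumption), would parse the hit source and assert on the key directly:

    // Sketch only: parse the hit source and check the transform key explicitly
    ObjectMapper mapper = new ObjectMapper();
    JsonNode node = mapper.readTree(searchHit.getSourceAsString());
    Assert.assertTrue("The indexed document does not contain the transform key", node.has(transformKey));
    Assert.assertEquals("test", node.get(transformKey).asText());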
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java
new file mode 100644
index 0000000..44ef5ac
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sink.elasticsearch.transforms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+public class ConnectRecordValueToMapTransformer<R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "The key under which the record value is placed in the target map");
+
+    private String key;
+
+    @Override
+    public R apply(R r) {
+        Map<String, Object> targetMap = new HashMap<>();
+
+        Object value = r.value();
+
+        targetMap.put(key, value);
+        return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+                SchemaHelper.buildSchemaBuilderForType(value), targetMap, r.timestamp());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map);
+
+        this.key = config.getString(FIELD_KEY_CONFIG);
+
+        if (this.key == null) {
+            throw new ConfigException("The ElasticSearch transformer requires a 'key'");
+        }
+    }
+}
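
For reference, the transform wraps each record value in a single-entry map under the configured key, which the elasticsearch-rest endpoint then indexes as a JSON document. A stand-alone sketch of its effect (in the tests the Connect runtime drives configure and apply; the values here are illustrative):

    // Sketch only: the Connect runtime normally calls configure() and apply()
    ConnectRecordValueToMapTransformer<SinkRecord> transformer = new ConnectRecordValueToMapTransformer<>();
    transformer.configure(Collections.singletonMap("key", "index-test"));

    SinkRecord in = new SinkRecord("ckc-topic", 0, null, null, Schema.STRING_SCHEMA, "test", 0);
    SinkRecord out = transformer.apply(in);
    // out.value() is now a Map equivalent to {"index-test": "test"}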
diff --git a/tests/src/test/resources/log4j2.properties b/tests/src/test/resources/log4j2.properties
index eefb917..b3e5e53 100644
--- a/tests/src/test/resources/log4j2.properties
+++ b/tests/src/test/resources/log4j2.properties
@@ -35,3 +35,7 @@ logger.reflections.appenderRef.file.ref = file
 logger.camel-aws.name = org.apache.camel.component.aws
 logger.camel-aws.level  = WARN
 logger.camel-aws.appenderRef.file.ref = file
+
+logger.camel-elasticsearch.name = org.apache.camel.component.elasticsearch
+logger.camel-elasticsearch.level  = DEBUG
+logger.camel-elasticsearch.appenderRef.file.ref = file