You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/11 14:13:09 UTC
[camel-kafka-connector] branch master updated: Adds support for
ElasticSearch and include integration tests
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 729cc55 Adds support for ElasticSearch and include integration tests
new 4808e77 Merge pull request #36 from orpiske/elasticsearch
729cc55 is described below
commit 729cc5538561ca1b4540b6de5a38eec1bbd6c779
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Dec 4 17:20:15 2019 +0100
Adds support for ElasticSearch and include integration tests
---
core/pom.xml | 4 +
parent/pom.xml | 14 +-
tests/pom.xml | 7 +
.../apache/camel/kafkaconnector/TestCommon.java | 10 ++
.../clients/elasticsearch/ElasticSearchClient.java | 173 +++++++++++++++++++++
.../CamelElasticSearchIndexPropertyFactory.java | 56 +++++++
.../CamelElasticSearchPropertyFactory.java | 75 +++++++++
.../CamelSinkElasticSearchITCase.java | 146 +++++++++++++++++
.../ConnectRecordValueToMapTransformer.java | 70 +++++++++
tests/src/test/resources/log4j2.properties | 4 +
10 files changed, 558 insertions(+), 1 deletion(-)
diff --git a/core/pom.xml b/core/pom.xml
index 1763235..ab2e4d8 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -90,6 +90,10 @@
<artifactId>camel-http</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-elasticsearch-rest</artifactId>
+ </dependency>
<!-- Kafka -->
<dependency>
diff --git a/parent/pom.xml b/parent/pom.xml
index b719242..ce526c8 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -149,6 +149,13 @@
<artifactId>camel-http</artifactId>
<version>${camel.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-elasticsearch-rest</artifactId>
+ <version>${camel.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
@@ -241,6 +248,12 @@
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${version.testcontainers}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -254,7 +267,6 @@
<version>${camel.version}</version>
<scope>test</scope>
</dependency>
-
</dependencies>
</dependencyManagement>
<build>
diff --git a/tests/pom.xml b/tests/pom.xml
index 2255c52..b40be9f 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -136,6 +136,13 @@
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java b/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java
index 706ceb3..05719e6 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java
@@ -53,6 +53,16 @@ public final class TestCommon {
*/
public static final String DEFAULT_KINESIS_STREAM = "ckc-kin-stream";
+ /**
+ * The default ElasticSearch cluster name for usage during the tests
+ */
+ public static final String DEFAULT_ELASTICSEARCH_CLUSTER = "docker-cluster";
+
+ /**
+ * The default ElasticSearch index for usage during the tests
+ */
+ public static final String DEFAULT_ELASTICSEARCH_INDEX = "ckc-index";
+
private static final Logger LOG = LoggerFactory.getLogger(TestCommon.class);
private TestCommon() {
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java
new file mode 100644
index 0000000..93ccbbb
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.clients.elasticsearch;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.function.Predicate;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClient.class);
+
+ private final RestHighLevelClient client;
+ private final String index;
+
+ public ElasticSearchClient(int port, String index) {
+ client = new RestHighLevelClient(
+ RestClient.builder(
+ new HttpHost("localhost", port, "http")));
+
+ this.index = index;
+ }
+
+ public boolean indexExists() {
+ try {
+ GetIndexRequest indexRequest = new GetIndexRequest(index);
+
+ return client.indices().exists(indexRequest, RequestOptions.DEFAULT);
+ } catch (IOException e) {
+ /*
+ It may return if failed to parse the response, on timeout or no response from the ES instance.
+ Assuming it is more likely to timeout or provide no reply either the during the start up or
+ on overloaded CI environments, we log the I/O error and try again
+ */
+ LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e);
+ }
+
+ return false;
+ }
+
+ public SearchHits getData() {
+ try {
+ SearchRequest searchRequest = new SearchRequest(index);
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+ searchSourceBuilder.query(QueryBuilders.matchAllQuery());
+
+ searchRequest.source(searchSourceBuilder);
+
+ SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+
+ return response.getHits();
+
+ } catch (IOException e) {
+ /*
+ It may return if failed to parse the response, on timeout or no response from the ES instance.
+ Assuming it is more likely to timeout or provide no reply either the during the start up or
+ on overloaded CI environments, we log the I/O error and try again
+ */
+ LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e);
+ } catch (Throwable e) {
+ LOG.error("Unhandled error trying to query for index existence: {}", e.getMessage(), e);
+ }
+
+ return null;
+ }
+
+ private boolean hasData(int expect) {
+ SearchHits searchHits = getData();
+
+ if (searchHits == null) {
+ LOG.debug("There are not search hit to return");
+
+ return false;
+ }
+
+ SearchHit[] hits = searchHits.getHits();
+ if (hits == null) {
+ LOG.debug("Empty data set");
+
+ return false;
+ }
+
+ int count = hits.length;
+
+ if (count != expect) {
+ LOG.debug("Not enough records: {} available, but {} expected", count, expect);
+
+ return false;
+ }
+
+ return true;
+ }
+
+ private <T> void waitFor(Predicate<T> resourceCheck, T payload) {
+ boolean state;
+ int retries = 30;
+ int waitTime = 1000;
+ do {
+ try {
+ state = resourceCheck.test(payload);
+
+ if (!state) {
+ LOG.debug("The resource is not yet available. Waiting {} seconds before retrying",
+ TimeUnit.MILLISECONDS.toSeconds(waitTime));
+ retries--;
+ Thread.sleep(waitTime);
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ } while (!state && retries > 0);
+ }
+
+ private void waitFor(BooleanSupplier resourceCheck) {
+ boolean state;
+ int retries = 30;
+ int waitTime = 1000;
+ do {
+ try {
+ state = resourceCheck.getAsBoolean();
+
+ if (!state) {
+ LOG.debug("The resource is not yet available. Waiting {} seconds before retrying",
+ TimeUnit.MILLISECONDS.toSeconds(waitTime));
+ retries--;
+ Thread.sleep(waitTime);
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ } while (!state && retries > 0);
+ }
+
+ public void waitForIndex() {
+ waitFor(this::indexExists);
+ }
+
+ public void waitForData(int expect) {
+ waitFor(this::hasData, expect);
+ }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java
new file mode 100644
index 0000000..500a0bc
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sink.elasticsearch;
+
+import java.util.Properties;
+
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+public class CamelElasticSearchIndexPropertyFactory extends CamelElasticSearchPropertyFactory {
+ private final String index;
+ private final String transformerKey;
+
+
+ CamelElasticSearchIndexPropertyFactory(int tasksMax, String topic, String clusterName, String hostAddress,
+ String index, String transformerKey) {
+ super(tasksMax, topic, clusterName, hostAddress, index);
+ this.index = index;
+ this.transformerKey = transformerKey;
+ }
+
+
+ @Override
+ public Properties getProperties() {
+ Properties connectorProps = super.getProperties();
+
+ connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG, "ElasticSearchTransformer");
+ connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG + ".ElasticSearchTransformer.type",
+ "org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer");
+ connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG + ".ElasticSearchTransformer.key",
+ transformerKey);
+
+ String queueUrl = "elasticsearch-rest://" + getClusterName() + "?hostAddresses=" + getHostAddress()
+ + "&operation=Index"
+ + "&indexName=" + index;
+
+ connectorProps.put("camel.sink.url", queueUrl);
+
+
+ return connectorProps;
+ }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java
new file mode 100644
index 0000000..b5bb19e
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sink.elasticsearch;
+
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+public class CamelElasticSearchPropertyFactory implements ConnectorPropertyFactory {
+ private final int tasksMax;
+ private final String topic;
+ private final String clusterName;
+ private final String hostAddress;
+ private final String index;
+
+
+ CamelElasticSearchPropertyFactory(int tasksMax, String topic, String clusterName, String hostAddress, String index) {
+ this.tasksMax = tasksMax;
+ this.topic = topic;
+ this.clusterName = clusterName;
+ this.hostAddress = hostAddress;
+ this.index = index;
+ }
+
+ protected int getTasksMax() {
+ return tasksMax;
+ }
+
+ protected String getTopic() {
+ return topic;
+ }
+
+ protected String getClusterName() {
+ return clusterName;
+ }
+
+ protected String getHostAddress() {
+ return hostAddress;
+ }
+
+ protected String getIndex() {
+ return index;
+ }
+
+ @Override
+ public Properties getProperties() {
+ Properties connectorProps = new Properties();
+ connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelElasticSearchSinkConnector");
+ connectorProps.put("tasks.max", String.valueOf(tasksMax));
+
+ connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
+ connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+
+ connectorProps.put("topics", topic);
+
+ return connectorProps;
+ }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java
new file mode 100644
index 0000000..600c00e
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sink.elasticsearch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.ContainerUtil;
+import org.apache.camel.kafkaconnector.KafkaConnectRunner;
+import org.apache.camel.kafkaconnector.TestCommon;
+import org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient;
+import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import static org.junit.Assert.fail;
+
+public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
+ private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class);
+ // This is required in order to use the Open Source one by default
+ private static final String ELASTIC_SEARCH_CONTAINER = "docker.elastic.co/elasticsearch/elasticsearch-oss:7.3.2";
+
+ private static final int ELASTIC_SEARCH_PORT = 9200;
+
+ @Rule
+ public ElasticsearchContainer elasticsearch = new ElasticsearchContainer(ELASTIC_SEARCH_CONTAINER);
+
+ private KafkaConnectRunner kafkaConnectRunner;
+ private ElasticSearchClient client;
+
+ private final int expect = 10;
+ private int received;
+ private final String transformKey = "index-test";
+
+ @Before
+ public void setUp() {
+ ContainerUtil.waitForHttpInitialization(elasticsearch, elasticsearch.getMappedPort(ELASTIC_SEARCH_PORT));
+
+ final String elasticSearchInstance = elasticsearch
+ .getHttpHostAddress();
+
+ LOG.info("ElasticSearch instance running at {}", elasticSearchInstance);
+
+ String topic = TestCommon.getDefaultTestTopic(this.getClass());
+ CamelElasticSearchPropertyFactory testProperties = new CamelElasticSearchIndexPropertyFactory(1, topic,
+ TestCommon.DEFAULT_ELASTICSEARCH_CLUSTER,
+ elasticSearchInstance, TestCommon.DEFAULT_ELASTICSEARCH_INDEX, transformKey);
+
+ kafkaConnectRunner = getKafkaConnectRunner();
+ kafkaConnectRunner.getConnectorPropertyProducers().add(testProperties);
+
+ client = new ElasticSearchClient(elasticsearch.getMappedPort(ELASTIC_SEARCH_PORT),
+ TestCommon.DEFAULT_ELASTICSEARCH_INDEX);
+ }
+
+ private void putRecords(CountDownLatch latch) {
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+ try {
+ for (int i = 0; i < expect; i++) {
+ try {
+ kafkaClient.produce(TestCommon.getDefaultTestTopic(this.getClass()), "test");
+ } catch (ExecutionException e) {
+ LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ } finally {
+ latch.countDown();
+ }
+
+ }
+
+ private void verifyHit(SearchHit searchHit) {
+ String source = searchHit.getSourceAsString();
+
+ Assert.assertTrue(source != null);
+ Assert.assertFalse(source.isEmpty());
+
+ // TODO: this is not enough, we need to parse the json and check the key itself
+ Assert.assertTrue(source.contains(transformKey));
+
+ LOG.debug("Search hit: {} ", searchHit.getSourceAsString());
+ received++;
+ }
+
+
+
+ @Test(timeout = 90000)
+ public void testIndexOperation() {
+ try {
+ CountDownLatch latch = new CountDownLatch(2);
+ ExecutorService service = Executors.newCachedThreadPool();
+ service.submit(() -> kafkaConnectRunner.run(latch));
+ service.submit(() -> putRecords(latch));
+
+ latch.await(30, TimeUnit.SECONDS);
+
+ LOG.debug("Waiting for indices");
+
+ client.waitForIndex();
+
+ LOG.debug("Waiting for data");
+ client.waitForData(expect);
+
+ SearchHits hits = client.getData();
+
+ hits.forEach(this::verifyHit);
+ Assert.assertEquals("Did not receive the same amount of messages sent", expect, received);
+
+ LOG.debug("Created the consumer ... About to receive messages");
+ } catch (Exception e) {
+ LOG.error("ElasticSearch test failed: {}", e.getMessage(), e);
+ fail(e.getMessage());
+ } finally {
+ kafkaConnectRunner.stop();
+ }
+ }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java
new file mode 100644
index 0000000..44ef5ac
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sink.elasticsearch.transforms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+public class ConnectRecordValueToMapTransformer<R extends ConnectRecord<R>> implements Transformation<R> {
+ public static final String FIELD_KEY_CONFIG = "key";
+
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+ "Transforms String-based content from Kafka into a map");
+
+ private String key;
+
+ @Override
+ public R apply(R r) {
+ Map<String, Object> targetMap = new HashMap<>();
+
+ Object value = r.value();
+
+ targetMap.put(key, value);
+ return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+ SchemaHelper.buildSchemaBuilderForType(value), targetMap, r.timestamp());
+ }
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> map) {
+ final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map);
+
+ this.key = config.getString(FIELD_KEY_CONFIG);
+
+ if (this.key == null) {
+ throw new ConfigException("The ElasticSearch transformer requires a 'key'");
+ }
+ }
+}
diff --git a/tests/src/test/resources/log4j2.properties b/tests/src/test/resources/log4j2.properties
index eefb917..b3e5e53 100644
--- a/tests/src/test/resources/log4j2.properties
+++ b/tests/src/test/resources/log4j2.properties
@@ -35,3 +35,7 @@ logger.reflections.appenderRef.file.ref = file
logger.camel-aws.name = org.apache.camel.component.aws
logger.camel-aws.level = WARN
logger.camel-aws.appenderRef.file.ref = file
+
+logger.camel-elasticsearch.name = org.apache.camel.component.elasticsearch
+logger.camel-elasticsearch.level = DEBUG
+logger.camel-elasticsearch.appenderRef.file.ref = file