You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/11/16 01:38:54 UTC
ignite git commit: IGNITE-4198: Kafka Connect sink option to
transform Kafka values. - Fixes #1225.
Repository: ignite
Updated Branches:
refs/heads/master 085872ec0 -> cbada5964
IGNITE-4198: Kafka Connect sink option to transform Kafka values. - Fixes #1225.
Signed-off-by: shtykh_roman <rs...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cbada596
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cbada596
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cbada596
Branch: refs/heads/master
Commit: cbada5964ee12a8dc44a7db8797f91709b70d831
Parents: 085872e
Author: shtykh_roman <rs...@yahoo.com>
Authored: Wed Nov 16 10:10:43 2016 +0900
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Wed Nov 16 10:10:43 2016 +0900
----------------------------------------------------------------------
.../kafka/connect/IgniteSinkConstants.java | 3 +
.../stream/kafka/connect/IgniteSinkTask.java | 47 +++++++++--
.../kafka/connect/IgniteSinkConnectorMock.java | 30 +++++++
.../kafka/connect/IgniteSinkConnectorTest.java | 84 ++++++++++++++++----
.../kafka/connect/IgniteSinkTaskMock.java | 29 +++++++
5 files changed, 171 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
index 7680d96..3fb511a 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
@@ -35,4 +35,7 @@ public class IgniteSinkConstants {
/** Maximum number of parallel stream operations per node. */
public static final String CACHE_PER_NODE_PAR_OPS = "cachePerNodeParOps";
+
+ /** Optional class name of a {@code StreamSingleTupleExtractor} that converts each sink record into a cache entry. */
+ public static final String SINGLE_TUPLE_EXTRACTOR_CLASS = "singleTupleExtractorCls";
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
index 3d9a00d..85971d1 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
@@ -46,6 +47,9 @@ public class IgniteSinkTask extends SinkTask {
/** Cache name. */
private static String cacheName;
+ /** Extractor converting each sink record into a cache entry; if {@code null}, record key and value are streamed as-is. */
+ private static StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;
+
/** {@inheritDoc} */
@Override public String version() {
return new IgniteSinkConnector().version();
@@ -76,6 +80,22 @@ public class IgniteSinkTask extends SinkTask {
StreamerContext.getStreamer().perNodeParallelOperations(
Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+ if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) {
+ String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
+ if (transformerCls != null && !transformerCls.isEmpty()) {
+ try {
+ Class<? extends StreamSingleTupleExtractor> clazz =
+ (Class<? extends StreamSingleTupleExtractor<SinkRecord, Object, Object>>)
+ Class.forName(transformerCls);
+
+ extractor = clazz.newInstance();
+ }
+ catch (Exception e) {
+ throw new ConnectException("Failed to instantiate the provided transformer!", e);
+ }
+ }
+ }
+
stopped = false;
}
@@ -88,14 +108,19 @@ public class IgniteSinkTask extends SinkTask {
@Override public void put(Collection<SinkRecord> records) {
try {
for (SinkRecord record : records) {
- if (record.key() != null) {
- // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
- StreamerContext.getStreamer().addData(record.key(), record.value());
+ // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
+ if (extractor != null) {
+ Map.Entry<Object, Object> entry = extractor.extract(record);
+ StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue());
}
else {
- log.error("Failed to stream a record with null key!");
+ if (record.key() != null) {
+ StreamerContext.getStreamer().addData(record.key(), record.value());
+ }
+ else {
+ log.error("Failed to stream a record with null key!");
+ }
}
-
}
}
catch (ConnectException e) {
@@ -126,11 +151,21 @@ public class IgniteSinkTask extends SinkTask {
stopped = true;
- StreamerContext.getStreamer().close();
StreamerContext.getIgnite().close();
}
/**
+ * Used by unit tests in place of {@link #stop()}: sets the <code>stopped</code> flag and clears the extractor without closing the Ignite node.
+ *
+ * @param stopped Stopped flag.
+ */
+ protected static void setStopped(boolean stopped) {
+ IgniteSinkTask.stopped = stopped;
+
+ extractor = null;
+ }
+
+ /**
* Streamer context initializing grid and data streamer instances on demand.
*/
public static class StreamerContext {
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
new file mode 100644
index 0000000..4c912b9
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * Sink connector mock for tests whose task class is {@link IgniteSinkTaskMock}.
+ */
+public class IgniteSinkConnectorMock extends IgniteSinkConnector {
+ /** {@inheritDoc} */
+ @Override public Class<? extends Task> taskClass() {
+ return IgniteSinkTaskMock.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index efa2fa2..440a7d5 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.stream.kafka.connect;
+import java.lang.reflect.Field;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -29,6 +31,7 @@ import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.kafka.TestKafkaBroker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -42,6 +45,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.FutureCallback;
@@ -81,17 +85,11 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
private Herder herder;
/** Ignite server node. */
- private Ignite grid;
+ private static Ignite grid;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected void beforeTest() throws Exception {
- IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
-
- cfg.setClientMode(false);
-
- grid = startGrid("igniteServerNode", cfg);
-
kafkaBroker = new TestKafkaBroker();
for (String topic : TOPICS)
@@ -117,18 +115,51 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
kafkaBroker.shutdown();
+ grid.cache(CACHE_NAME).removeAll();
+
+ // reset cache name to overwrite task configurations.
+ Field field = IgniteSinkTask.class.getDeclaredField("cacheName");
+
+ field.setAccessible(true);
+ field.set(IgniteSinkTask.class, null);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void beforeTestsStarted() throws Exception {
+ IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
+
+ cfg.setClientMode(false);
+ // One server node is started here and shared by all tests; it is stopped in afterTestsStopped().
+ grid = startGrid("igniteServerNode", cfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
}
+ public void testSinkPutsWithoutTransformation() throws Exception {
+ Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+ // No extractor configured: records are streamed with their own Kafka keys and values.
+ sinkProps.remove(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
+
+ testSinkPuts(sinkProps, false);
+ }
+
+ public void testSinkPutsWithTransformation() throws Exception {
+ testSinkPuts(makeSinkProps(Utils.join(TOPICS, ",")), true); // Keyless records; TestExtractor derives keys from values.
+ }
+
/**
* Tests the whole data flow from injecting data to Kafka to transferring it to the grid. It reads from two
* specified Kafka topics, because a sink task can read from multiple topics.
*
+ * @param sinkProps Sink properties.
+ * @param keyless Tests on Kafka stream with null keys if true.
* @throws Exception Thrown in case of the failure.
*/
- public void testSinkPuts() throws Exception {
- Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
-
+ private void testSinkPuts(Map<String, String> sinkProps, boolean keyless) throws Exception {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
@Override
public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
@@ -165,7 +196,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
// Produces events for the specified number of topics
for (String topic : TOPICS)
- keyValMap.putAll(produceStream(topic));
+ keyValMap.putAll(produceStream(topic, keyless));
// Checks all events successfully processed in 10 seconds.
assertTrue(latch.await(10, TimeUnit.SECONDS));
@@ -183,9 +214,10 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
* Sends messages to Kafka.
*
* @param topic Topic name.
+ * @param keyless If true, messages are produced without a Kafka key (null key).
* @return Map of key value messages.
*/
- private Map<String, String> produceStream(String topic) {
+ private Map<String, String> produceStream(String topic, boolean keyless) {
List<ProducerRecord<String, String>> messages = new ArrayList<>(EVENT_CNT);
Map<String, String> keyValMap = new HashMap<>();
@@ -193,12 +225,18 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
for (int evt = 0; evt < EVENT_CNT; evt++) {
long runtime = System.currentTimeMillis();
- String key = topic + "_" + String.valueOf(evt);
- String msg = runtime + String.valueOf(evt);
+ String key = null;
+ if (!keyless)
+ key = topic + ":" + String.valueOf(evt);
+
+ String msg = topic + ":" + String.valueOf(evt) + "_" + runtime;
messages.add(new ProducerRecord<>(topic, key, msg));
- keyValMap.put(key, msg);
+ if (!keyless)
+ keyValMap.put(key, msg);
+ else
+ keyValMap.put(topic + ":" + String.valueOf(evt), String.valueOf(runtime));
}
kafkaBroker.sendMessages(messages);
@@ -218,10 +256,12 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
props.put(SinkConnector.TOPICS_CONFIG, topics);
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
props.put(ConnectorConfig.NAME_CONFIG, "test-sink-connector");
- props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnectorMock.class.getName());
props.put(IgniteSinkConstants.CACHE_NAME, "testCache");
props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");
props.put(IgniteSinkConstants.CACHE_CFG_PATH, "example-ignite.xml");
+ props.put(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS,
+ "org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest$TestExtractor");
return props;
}
@@ -249,4 +289,16 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
return props;
}
+
+ /**
+ * Test transformer.
+ */
+ static class TestExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
+
+ /** {@inheritDoc} */
+ @Override public Map.Entry<String, String> extract(SinkRecord msg) {
+ String[] parts = ((String)msg.value()).split("_");
+ return new AbstractMap.SimpleEntry<String, String>(parts[0], parts[1]);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
new file mode 100644
index 0000000..58a59b5
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink task mock for tests: {@link #stop()} only resets the task's static state instead of closing the Ignite node, so the node is reused across tests.
+ */
+public class IgniteSinkTaskMock extends IgniteSinkTask {
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // Keep the Ignite node running; only mark the task stopped (setStopped also clears the extractor).
+ setStopped(true);
+ }
+}