You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/11/16 01:38:54 UTC

ignite git commit: IGNITE-4198: Kafka Connect sink option to transform Kafka values. - Fixes #1225.

Repository: ignite
Updated Branches:
  refs/heads/master 085872ec0 -> cbada5964


IGNITE-4198: Kafka Connect sink option to transform Kafka values. - Fixes #1225.

Signed-off-by: shtykh_roman <rs...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cbada596
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cbada596
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cbada596

Branch: refs/heads/master
Commit: cbada5964ee12a8dc44a7db8797f91709b70d831
Parents: 085872e
Author: shtykh_roman <rs...@yahoo.com>
Authored: Wed Nov 16 10:10:43 2016 +0900
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Wed Nov 16 10:10:43 2016 +0900

----------------------------------------------------------------------
 .../kafka/connect/IgniteSinkConstants.java      |  3 +
 .../stream/kafka/connect/IgniteSinkTask.java    | 47 +++++++++--
 .../kafka/connect/IgniteSinkConnectorMock.java  | 30 +++++++
 .../kafka/connect/IgniteSinkConnectorTest.java  | 84 ++++++++++++++++----
 .../kafka/connect/IgniteSinkTaskMock.java       | 29 +++++++
 5 files changed, 171 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
index 7680d96..3fb511a 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
@@ -35,4 +35,7 @@ public class IgniteSinkConstants {
 
     /** Maximum number of parallel stream operations per node. */
     public static final String CACHE_PER_NODE_PAR_OPS = "cachePerNodeParOps";
+
+    /** Class used to transform an entry before feeding it into the cache. */
+    public static final String SINGLE_TUPLE_EXTRACTOR_CLASS = "singleTupleExtractorCls";
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
index 3d9a00d..85971d1 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -46,6 +47,9 @@ public class IgniteSinkTask extends SinkTask {
     /** Cache name. */
     private static String cacheName;
 
+    /** Entry transformer. */
+    private static StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;
+
     /** {@inheritDoc} */
     @Override public String version() {
         return new IgniteSinkConnector().version();
@@ -76,6 +80,22 @@ public class IgniteSinkTask extends SinkTask {
             StreamerContext.getStreamer().perNodeParallelOperations(
                 Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
 
+        if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) {
+            String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
+            if (transformerCls != null && !transformerCls.isEmpty()) {
+                try {
+                    Class<? extends StreamSingleTupleExtractor> clazz =
+                        (Class<? extends StreamSingleTupleExtractor<SinkRecord, Object, Object>>)
+                            Class.forName(transformerCls);
+
+                    extractor = clazz.newInstance();
+                }
+                catch (Exception e) {
+                    throw new ConnectException("Failed to instantiate the provided transformer!", e);
+                }
+            }
+        }
+
         stopped = false;
     }
 
@@ -88,14 +108,19 @@ public class IgniteSinkTask extends SinkTask {
     @Override public void put(Collection<SinkRecord> records) {
         try {
             for (SinkRecord record : records) {
-                if (record.key() != null) {
-                    // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
-                    StreamerContext.getStreamer().addData(record.key(), record.value());
+                // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
+                if (extractor != null) {
+                    Map.Entry<Object, Object> entry = extractor.extract(record);
+                    StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue());
                 }
                 else {
-                    log.error("Failed to stream a record with null key!");
+                    if (record.key() != null) {
+                        StreamerContext.getStreamer().addData(record.key(), record.value());
+                    }
+                    else {
+                        log.error("Failed to stream a record with null key!");
+                    }
                 }
-
             }
         }
         catch (ConnectException e) {
@@ -126,11 +151,21 @@ public class IgniteSinkTask extends SinkTask {
 
         stopped = true;
 
-        StreamerContext.getStreamer().close();
         StreamerContext.getIgnite().close();
     }
 
     /**
+     * Used by unit tests to set the <code>stopped</code> flag without restarting the node; also clears the extractor.
+     *
+     * @param stopped Stopped flag.
+     */
+    protected static void setStopped(boolean stopped) {
+        IgniteSinkTask.stopped = stopped;
+
+        extractor = null;
+    }
+
+    /**
      * Streamer context initializing grid and data streamer instances on demand.
      */
     public static class StreamerContext {

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
new file mode 100644
index 0000000..4c912b9
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorMock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * Sink connector mock for tests; it makes the connector use the task mock.
+ */
+public class IgniteSinkConnectorMock extends IgniteSinkConnector {
+    /** {@inheritDoc} */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSinkTaskMock.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index efa2fa2..440a7d5 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.stream.kafka.connect;
 
+import java.lang.reflect.Field;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -29,6 +31,7 @@ import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.stream.kafka.TestKafkaBroker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -42,6 +45,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.FutureCallback;
@@ -81,17 +85,11 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
     private Herder herder;
 
     /** Ignite server node. */
-    private Ignite grid;
+    private static Ignite grid;
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected void beforeTest() throws Exception {
-        IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
-
-        cfg.setClientMode(false);
-
-        grid = startGrid("igniteServerNode", cfg);
-
         kafkaBroker = new TestKafkaBroker();
 
         for (String topic : TOPICS)
@@ -117,18 +115,51 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
 
         kafkaBroker.shutdown();
 
+        grid.cache(CACHE_NAME).removeAll();
+
+        // Reset the cache name so that the task configuration can be overwritten.
+        Field field = IgniteSinkTask.class.getDeclaredField("cacheName");
+
+        field.setAccessible(true);
+        field.set(IgniteSinkTask.class, null);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
+
+        cfg.setClientMode(false);
+
+        grid = startGrid("igniteServerNode", cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
     }
 
+    public void testSinkPutsWithoutTransformation() throws Exception {
+        Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+
+        sinkProps.remove(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
+
+        testSinkPuts(sinkProps, false);
+    }
+
+    public void testSinkPutsWithTransformation() throws Exception {
+        testSinkPuts(makeSinkProps(Utils.join(TOPICS, ",")), true);
+    }
+
     /**
      * Tests the whole data flow from injecting data to Kafka to transferring it to the grid. It reads from two
      * specified Kafka topics, because a sink task can read from multiple topics.
      *
+     * @param sinkProps Sink properties.
+     * @param keyless If true, tests a Kafka stream whose records have null keys.
      * @throws Exception Thrown in case of the failure.
      */
-    public void testSinkPuts() throws Exception {
-        Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
-
+    private void testSinkPuts(Map<String, String> sinkProps, boolean keyless) throws Exception {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
             @Override
             public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
@@ -165,7 +196,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
 
         // Produces events for the specified number of topics
         for (String topic : TOPICS)
-            keyValMap.putAll(produceStream(topic));
+            keyValMap.putAll(produceStream(topic, keyless));
 
         // Checks all events successfully processed in 10 seconds.
         assertTrue(latch.await(10, TimeUnit.SECONDS));
@@ -183,9 +214,10 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
      * Sends messages to Kafka.
      *
      * @param topic Topic name.
+     * @param keyless If true, the messages are produced without a Kafka key (null key).
      * @return Map of key value messages.
      */
-    private Map<String, String> produceStream(String topic) {
+    private Map<String, String> produceStream(String topic, boolean keyless) {
         List<ProducerRecord<String, String>> messages = new ArrayList<>(EVENT_CNT);
 
         Map<String, String> keyValMap = new HashMap<>();
@@ -193,12 +225,18 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
         for (int evt = 0; evt < EVENT_CNT; evt++) {
             long runtime = System.currentTimeMillis();
 
-            String key = topic + "_" + String.valueOf(evt);
-            String msg = runtime + String.valueOf(evt);
+            String key = null;
+            if (!keyless)
+                key = topic + ":" + String.valueOf(evt);
+
+            String msg = topic + ":" + String.valueOf(evt) + "_" + runtime;
 
             messages.add(new ProducerRecord<>(topic, key, msg));
 
-            keyValMap.put(key, msg);
+            if (!keyless)
+                keyValMap.put(key, msg);
+            else
+                keyValMap.put(topic + ":" + String.valueOf(evt), String.valueOf(runtime));
         }
 
         kafkaBroker.sendMessages(messages);
@@ -218,10 +256,12 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
         props.put(SinkConnector.TOPICS_CONFIG, topics);
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
         props.put(ConnectorConfig.NAME_CONFIG, "test-sink-connector");
-        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnectorMock.class.getName());
         props.put(IgniteSinkConstants.CACHE_NAME, "testCache");
         props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");
         props.put(IgniteSinkConstants.CACHE_CFG_PATH, "example-ignite.xml");
+        props.put(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS,
+            "org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest$TestExtractor");
 
         return props;
     }
@@ -249,4 +289,16 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
 
         return props;
     }
+
+    /**
+     * Test transformer.
+     */
+    static class TestExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
+
+        /** {@inheritDoc} */
+        @Override public Map.Entry<String, String> extract(SinkRecord msg) {
+            String[] parts = ((String)msg.value()).split("_");
+            return new AbstractMap.SimpleEntry<String, String>(parts[0], parts[1]);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbada596/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
new file mode 100644
index 0000000..58a59b5
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTaskMock.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink task mock for tests. It avoids closing the grid from test to test.
+ */
+public class IgniteSinkTaskMock extends IgniteSinkTask {
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // Don't stop the grid for tests.
+        setStopped(true);
+    }
+}