Posted to commits@ignite.apache.org by av...@apache.org on 2016/12/07 09:06:50 UTC

ignite git commit: IGNITE-4140 KafkaStreamer should use tuple extractor instead of decoders

Repository: ignite
Updated Branches:
  refs/heads/master ca8ab2d55 -> dfb44ba2d


IGNITE-4140 KafkaStreamer should use tuple extractor instead of decoders


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfb44ba2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfb44ba2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfb44ba2

Branch: refs/heads/master
Commit: dfb44ba2dca0cec44568239e318cf6863ed0c16e
Parents: ca8ab2d
Author: Anil <an...@anilkd-t450.jnpr.net>
Authored: Wed Dec 7 12:06:38 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Dec 7 12:06:38 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/stream/StreamAdapter.java |  4 +-
 .../ignite/stream/kafka/KafkaStreamer.java      | 48 +++++---------------
 .../kafka/KafkaIgniteStreamerSelfTest.java      | 36 +++++++++++----
 3 files changed, 40 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
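
For context on what this change means for client code: the key/value decoder setters are removed and a tuple extractor becomes mandatory, so a streamer is now configured roughly as below. This is a minimal usage sketch, not part of the commit; the cache name, topic, group id and ZooKeeper address are illustrative placeholders, and StreamSingleTupleExtractor/setSingleTupleExtractor are assumed to mirror the multiple-tuple variants used in the test diff further down.

import java.util.AbstractMap;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.message.MessageAndMetadata;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.kafka.KafkaStreamer;

public class KafkaStreamerUsageSketch {
    public static void main(String[] args) {
        Ignite ignite = Ignition.start();

        IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");

        // KafkaStreamer is now KafkaStreamer<K, V>; the message type is fixed by the
        // class itself to MessageAndMetadata<byte[], byte[]>.
        KafkaStreamer<String, String> kafkaStmr = new KafkaStreamer<>();

        kafkaStmr.setIgnite(ignite);
        kafkaStmr.setStreamer(stmr);
        kafkaStmr.setTopic("myTopic");
        kafkaStmr.setThreads(4);

        // Old high-level consumer config; zookeeper.connect and group.id are required.
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "myGroup");
        kafkaStmr.setConsumerConfig(new ConsumerConfig(props));

        // Replaces the removed setKeyDecoder()/setValueDecoder() calls: raw Kafka
        // bytes are turned into a cache entry by the extractor.
        kafkaStmr.setSingleTupleExtractor(
            new StreamSingleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String>() {
                @Override public Map.Entry<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) {
                    return new AbstractMap.SimpleEntry<>(new String(msg.key()), new String(msg.message()));
                }
            });

        kafkaStmr.start();
        // ... later: kafkaStmr.stop(); stmr.close(); ignite.close();
    }
}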


http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index cb9566b..3f1dfad 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -179,8 +179,8 @@ public abstract class StreamAdapter<T, K, V> {
 
         } else {
             Map<K, V> m = multipleTupleExtractor.extract(msg);
-
-            if (m != null)
+            
+            if (m != null && !m.isEmpty())
                 stmr.addData(m);
 
         }
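
The new isEmpty() check matters because a multiple-tuple extractor may legitimately produce no entries for a given message, for example when it filters records out; such messages are now skipped instead of being handed to addData(). A hedged illustration, not part of the commit (the extractor type matches the one used in the test diff below; the filtering rule is made up):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import kafka.message.MessageAndMetadata;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;

// Drops key-less records: for those it returns an empty map, which the adapter
// above no longer passes to the data streamer.
class FilteringExtractor implements StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String> {
    @Override public Map<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) {
        if (msg.key() == null)
            return Collections.emptyMap();

        Map<String, String> entries = new HashMap<>();

        entries.put(new String(msg.key()), new String(msg.message()));

        return entries;
    }
}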

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index f46ee93..5767790 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -28,7 +28,6 @@ import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
-import kafka.serializer.Decoder;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -45,7 +44,7 @@ import org.apache.ignite.stream.StreamAdapter;
  * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Group
  * Example</a>
  */
-public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[], byte[]>, K, V> {
     /** Retry timeout. */
     private static final long DFLT_RETRY_TIMEOUT = 10000;
 
@@ -64,12 +63,6 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
     /** Kafka consumer config. */
     private ConsumerConfig consumerCfg;
 
-    /** Key decoder. */
-    private Decoder<K> keyDecoder;
-
-    /** Value decoder. */
-    private Decoder<V> valDecoder;
-
     /** Kafka consumer connector. */
     private ConsumerConnector consumer;
 
@@ -107,24 +100,6 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
     }
 
     /**
-     * Sets the key decoder.
-     *
-     * @param keyDecoder Key decoder.
-     */
-    public void setKeyDecoder(Decoder<K> keyDecoder) {
-        this.keyDecoder = keyDecoder;
-    }
-
-    /**
-     * Sets the value decoder.
-     *
-     * @param valDecoder Value decoder.
-     */
-    public void setValueDecoder(Decoder<V> valDecoder) {
-        this.valDecoder = valDecoder;
-    }
-
-    /**
      * Sets the retry timeout.
      *
      * @param retryTimeout Retry timeout.
@@ -144,10 +119,10 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
         A.notNull(getStreamer(), "streamer");
         A.notNull(getIgnite(), "ignite");
         A.notNull(topic, "topic");
-        A.notNull(keyDecoder, "key decoder");
-        A.notNull(valDecoder, "value decoder");
         A.notNull(consumerCfg, "kafka consumer config");
         A.ensure(threads > 0, "threads > 0");
+        A.ensure(null != getSingleTupleExtractor() || null != getMultipleTupleExtractor(),
+            "Extractor must be configured");
 
         log = getIgnite().log();
 
@@ -157,10 +132,9 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
 
         topicCntMap.put(topic, threads);
 
-        Map<String, List<KafkaStream<K, V>>> consumerMap =
-            consumer.createMessageStreams(topicCntMap, keyDecoder, valDecoder);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCntMap);
 
-        List<KafkaStream<K, V>> streams = consumerMap.get(topic);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
         // Now launch all the consumer threads.
         executor = Executors.newFixedThreadPool(threads);
@@ -168,16 +142,18 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
         stopped = false;
 
         // Now create an object to consume the messages.
-        for (final KafkaStream<K, V> stream : streams) {
+        for (final KafkaStream<byte[], byte[]> stream : streams) {
             executor.submit(new Runnable() {
                 @Override public void run() {
                     while (!stopped) {
                         try {
-                            for (ConsumerIterator<K, V> it = stream.iterator(); it.hasNext() && !stopped; ) {
-                                MessageAndMetadata<K, V> msg = it.next();
+                            MessageAndMetadata<byte[], byte[]> msg;
+
+                            for (ConsumerIterator<byte[], byte[]> it = stream.iterator(); it.hasNext() && !stopped; ) {
+                                msg = it.next();
 
                                 try {
-                                    getStreamer().addData(msg.key(), msg.message());
+                                    addMessage(msg);
                                 }
                                 catch (Exception e) {
                                     U.error(log, "Message is ignored due to an error [msg=" + msg + ']', e);
@@ -224,4 +200,4 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
             }
         }
     }
-}
+}
\ No newline at end of file
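
The worker threads above now hand each record to StreamAdapter.addMessage(msg) instead of calling getStreamer().addData(key, value) directly, and the streams are typed KafkaStream<byte[], byte[]> because createMessageStreams() is called without decoders. Roughly, addMessage() dispatches to whichever extractor is configured; the sketch below is inferred from the StreamAdapter hunk at the top of this commit, and the single-extractor branch is an assumption rather than verbatim source:

// Approximate shape of StreamAdapter.addMessage(T) after this change.
protected void addMessage(T msg) {
    if (getMultipleTupleExtractor() == null) {
        // Single-extractor path (inferred): one cache entry per Kafka record.
        Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);

        if (e != null)
            getStreamer().addData(e);
    }
    else {
        // Multiple-extractor path, as shown in the hunk above.
        Map<K, V> m = getMultipleTupleExtractor().extract(msg);

        if (m != null && !m.isEmpty())
            getStreamer().addData(m);
    }
}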

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 4918f87..102b647 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -28,14 +28,14 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import kafka.consumer.ConsumerConfig;
-import kafka.serializer.StringDecoder;
-import kafka.utils.VerifiableProperties;
+import kafka.message.MessageAndMetadata;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
@@ -146,7 +146,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
      */
     private void consumerStream(String topic, Map<String, String> keyValMap)
         throws TimeoutException, InterruptedException {
-        KafkaStreamer<String, String, String> kafkaStmr = null;
+        KafkaStreamer<String, String> kafkaStmr = null;
 
         Ignite ignite = grid();
 
@@ -173,13 +173,29 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
             kafkaStmr.setThreads(4);
 
             // Set the consumer configuration.
-            kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
-
-            // Set the decoders.
-            StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
-
-            kafkaStmr.setKeyDecoder(strDecoder);
-            kafkaStmr.setValueDecoder(strDecoder);
+            kafkaStmr.setConsumerConfig(
+                createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
+
+            kafkaStmr.setMultipleTupleExtractor(
+                new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String>() {
+                @Override public Map<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) {
+                    Map<String, String> entries = new HashMap<>();
+
+                    try {
+                        String key = new String(msg.key());
+                        String val = new String(msg.message());
+
+                        // Convert the message into any number of cache entries, keyed either by the Kafka key or by a key derived from the message itself.
+                        // For test purposes the Kafka key is used as the cache entry key and the Kafka value as the cache entry value.
+                        entries.put(key, val);
+                    }
+                    catch (Exception ex) {
+                        fail("Unexpected error." + ex);
+                    }
+
+                    return entries;
+                }
+            });
 
             // Start kafka streamer.
             kafkaStmr.start();