You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/08 13:16:54 UTC
[09/28] ignite git commit: IGNITE-4140 KafkaStreamer should use tuple extractor instead of decoders
IGNITE-4140 KafkaStreamer should use tuple extractor instead of decoders
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfb44ba2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfb44ba2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfb44ba2
Branch: refs/heads/ignite-4371
Commit: dfb44ba2dca0cec44568239e318cf6863ed0c16e
Parents: ca8ab2d
Author: Anil <an...@anilkd-t450.jnpr.net>
Authored: Wed Dec 7 12:06:38 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Dec 7 12:06:38 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/stream/StreamAdapter.java | 4 +-
.../ignite/stream/kafka/KafkaStreamer.java | 48 +++++---------------
.../kafka/KafkaIgniteStreamerSelfTest.java | 36 +++++++++++----
3 files changed, 40 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index cb9566b..3f1dfad 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -179,8 +179,8 @@ public abstract class StreamAdapter<T, K, V> {
} else {
Map<K, V> m = multipleTupleExtractor.extract(msg);
-
- if (m != null)
+
+ if (m != null && !m.isEmpty())
stmr.addData(m);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index f46ee93..5767790 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -28,7 +28,6 @@ import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
-import kafka.serializer.Decoder;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -45,7 +44,7 @@ import org.apache.ignite.stream.StreamAdapter;
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Group
* Example</a>
*/
-public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[], byte[]>, K, V> {
/** Retry timeout. */
private static final long DFLT_RETRY_TIMEOUT = 10000;
@@ -64,12 +63,6 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
/** Kafka consumer config. */
private ConsumerConfig consumerCfg;
- /** Key decoder. */
- private Decoder<K> keyDecoder;
-
- /** Value decoder. */
- private Decoder<V> valDecoder;
-
/** Kafka consumer connector. */
private ConsumerConnector consumer;
@@ -107,24 +100,6 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
}
/**
- * Sets the key decoder.
- *
- * @param keyDecoder Key decoder.
- */
- public void setKeyDecoder(Decoder<K> keyDecoder) {
- this.keyDecoder = keyDecoder;
- }
-
- /**
- * Sets the value decoder.
- *
- * @param valDecoder Value decoder.
- */
- public void setValueDecoder(Decoder<V> valDecoder) {
- this.valDecoder = valDecoder;
- }
-
- /**
* Sets the retry timeout.
*
* @param retryTimeout Retry timeout.
@@ -144,10 +119,10 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
A.notNull(topic, "topic");
- A.notNull(keyDecoder, "key decoder");
- A.notNull(valDecoder, "value decoder");
A.notNull(consumerCfg, "kafka consumer config");
A.ensure(threads > 0, "threads > 0");
+ A.ensure(null != getSingleTupleExtractor() || null != getMultipleTupleExtractor(),
+ "Extractor must be configured");
log = getIgnite().log();
@@ -157,10 +132,9 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
topicCntMap.put(topic, threads);
- Map<String, List<KafkaStream<K, V>>> consumerMap =
- consumer.createMessageStreams(topicCntMap, keyDecoder, valDecoder);
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCntMap);
- List<KafkaStream<K, V>> streams = consumerMap.get(topic);
+ List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// Now launch all the consumer threads.
executor = Executors.newFixedThreadPool(threads);
@@ -168,16 +142,18 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
stopped = false;
// Now create an object to consume the messages.
- for (final KafkaStream<K, V> stream : streams) {
+ for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new Runnable() {
@Override public void run() {
while (!stopped) {
try {
- for (ConsumerIterator<K, V> it = stream.iterator(); it.hasNext() && !stopped; ) {
- MessageAndMetadata<K, V> msg = it.next();
+ MessageAndMetadata<byte[], byte[]> msg;
+
+ for (ConsumerIterator<byte[], byte[]> it = stream.iterator(); it.hasNext() && !stopped; ) {
+ msg = it.next();
try {
- getStreamer().addData(msg.key(), msg.message());
+ addMessage(msg);
}
catch (Exception e) {
U.error(log, "Message is ignored due to an error [msg=" + msg + ']', e);
@@ -224,4 +200,4 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
}
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 4918f87..102b647 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -28,14 +28,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.ConsumerConfig;
-import kafka.serializer.StringDecoder;
-import kafka.utils.VerifiableProperties;
+import kafka.message.MessageAndMetadata;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -146,7 +146,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
*/
private void consumerStream(String topic, Map<String, String> keyValMap)
throws TimeoutException, InterruptedException {
- KafkaStreamer<String, String, String> kafkaStmr = null;
+ KafkaStreamer<String, String> kafkaStmr = null;
Ignite ignite = grid();
@@ -173,13 +173,29 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
kafkaStmr.setThreads(4);
// Set the consumer configuration.
- kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
-
- // Set the decoders.
- StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
-
- kafkaStmr.setKeyDecoder(strDecoder);
- kafkaStmr.setValueDecoder(strDecoder);
+ kafkaStmr.setConsumerConfig(
+ createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
+
+ kafkaStmr.setMultipleTupleExtractor(
+ new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String>() {
+ @Override public Map<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) {
+ Map<String, String> entries = new HashMap<>();
+
+ try {
+ String key = new String(msg.key());
+ String val = new String(msg.message());
+
+ // Convert the message into a number of cache entries, using either the same key or a dynamic key taken from the actual message.
+ // For now, use the message key as the cache entry key and the message value as the cache entry value - for testing purposes.
+ entries.put(key, val);
+ }
+ catch (Exception ex) {
+ fail("Unexpected error." + ex);
+ }
+
+ return entries;
+ }
+ });
// Start kafka streamer.
kafkaStmr.start();