You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/26 18:43:34 UTC

[20/20] incubator-ignite git commit: ignite-428 Retry message consuming in case of error

ignite-428 Retry message consuming in case of error


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ad5e99eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ad5e99eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ad5e99eb

Branch: refs/heads/ignite-428
Commit: ad5e99eb7678134ccae8af68db82f05a8b495b42
Parents: 6ca6a0c
Author: agura <ag...@gridgain.com>
Authored: Fri Jun 26 19:39:41 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jun 26 19:39:41 2015 +0300

----------------------------------------------------------------------
 .../ignite/stream/kafka/KafkaStreamer.java      | 37 ++++++++++++++++++--
 1 file changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad5e99eb/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index 5761209..bc618e3 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -39,6 +39,9 @@ import java.util.concurrent.*;
  * Example</a>
  */
 public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+    /** Retry timeout. */
+    private static final int RETRY_TIMEOUT = 10000;
+
     /** Logger. */
     private IgniteLogger log;
 
@@ -63,6 +66,9 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
     /** Kafka consumer connector. */
     private ConsumerConnector consumer;
 
+    /** Stopped. */
+    private volatile boolean stopped;
+
     /**
      * Sets the topic name.
      *
@@ -138,12 +144,37 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
         // Now launch all the consumer threads.
         executor = Executors.newFixedThreadPool(threads);
 
+        stopped = false;
+
         // Now create an object to consume the messages.
         for (final KafkaStream<K, V> stream : streams) {
             executor.submit(new Runnable() {
                 @Override public void run() {
-                    for (MessageAndMetadata<K, V> messageAndMetadata : stream)
-                        getStreamer().addData(messageAndMetadata.key(), messageAndMetadata.message());
+                    while (!stopped) {
+                        try {
+                            for (ConsumerIterator<K, V> it = stream.iterator(); it.hasNext() && !stopped; ) {
+                                MessageAndMetadata<K, V> msg = it.next();
+
+                                try {
+                                    getStreamer().addData(msg.key(), msg.message());
+                                }
+                                catch (Exception e) {
+                                    log.warning("Message is ignored due to an error, msg = [" + msg + ']', e);
+                                }
+                            }
+                        }
+                        catch (Exception e) {
+                            log.warning("Message can't be consumed from stream. Retry after " +
+                                RETRY_TIMEOUT + " ms.", e);
+
+                            try {
+                                Thread.sleep(RETRY_TIMEOUT);
+                            }
+                            catch (InterruptedException ie) {
+                                // No-op.
+                            }
+                        }
+                    }
                 }
             });
         }
@@ -153,6 +184,8 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * Stops streamer.
      */
     public void stop() {
+        stopped = true;
+
         if (consumer != null)
             consumer.shutdown();