You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/12 09:52:00 UTC

[2/2] camel git commit: CAMEL-10697 - Workaround KAFKA-1894 by calling the wakeup method

CAMEL-10697 - Workaround KAFKA-1894 by calling the wakeup method


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5a4f641b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5a4f641b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5a4f641b

Branch: refs/heads/camel-2.18.x
Commit: 5a4f641b6c0145e30c104059ff3228cf0bcb3c6d
Parents: 70d4750
Author: Antoine DESSAIGNE <an...@gmail.com>
Authored: Wed Jan 11 17:25:56 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 12 10:51:51 2017 +0100

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java       | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5a4f641b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 66c4335..1614f5e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -43,6 +44,8 @@ public class KafkaConsumer extends DefaultConsumer {
     private final KafkaEndpoint endpoint;
     private final Processor processor;
     private final Long pollTimeoutMs;
+    // This list helps working around the infinite loop of KAFKA-1894
+    private final List<KafkaFetchRecords> tasks = new ArrayList<>();
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -73,7 +76,9 @@ public class KafkaConsumer extends DefaultConsumer {
 
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
-            executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps()));
+            KafkaFetchRecords task = new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps());
+            executor.submit(task);
+            tasks.add(task);
         }
     }
 
@@ -87,7 +92,12 @@ public class KafkaConsumer extends DefaultConsumer {
             } else {
                 executor.shutdownNow();
             }
+            if (!executor.isTerminated()) {
+                tasks.forEach(KafkaFetchRecords::shutdown);
+                executor.shutdownNow();
+            }
         }
+        tasks.clear();
         executor = null;
 
         super.doStop();
@@ -167,6 +177,11 @@ public class KafkaConsumer extends DefaultConsumer {
                 IOHelper.close(consumer);
             }
         }
+
+        private void shutdown() {
+            // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop
+            consumer.wakeup();
+        }
     }
 
 }