You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/12 09:51:59 UTC
[1/2] camel git commit: CAMEL-10697 - Workaround KAFKA-1894 by
calling the wakeup method
Repository: camel
Updated Branches:
refs/heads/camel-2.18.x 70d4750fb -> 5a4f641b6
refs/heads/master 312b57d14 -> d22d0ca06
CAMEL-10697 - Workaround KAFKA-1894 by calling the wakeup method
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d22d0ca0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d22d0ca0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d22d0ca0
Branch: refs/heads/master
Commit: d22d0ca0607e0013d4be1dd3385959ac208ae1b8
Parents: 312b57d
Author: Antoine DESSAIGNE <an...@gmail.com>
Authored: Wed Jan 11 17:25:56 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 12 10:47:54 2017 +0100
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d22d0ca0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 31a4180..4362390 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.kafka;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -45,6 +46,8 @@ public class KafkaConsumer extends DefaultConsumer {
private final KafkaEndpoint endpoint;
private final Processor processor;
private final Long pollTimeoutMs;
+ // This list helps working around the infinite loop of KAFKA-1894
+ private final List<KafkaFetchRecords> tasks = new ArrayList<>();
public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -75,7 +78,9 @@ public class KafkaConsumer extends DefaultConsumer {
executor = endpoint.createExecutor();
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
- executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps()));
+ KafkaFetchRecords task = new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps());
+ executor.submit(task);
+ tasks.add(task);
}
}
@@ -89,7 +94,12 @@ public class KafkaConsumer extends DefaultConsumer {
} else {
executor.shutdownNow();
}
+ if (!executor.isTerminated()) {
+ tasks.forEach(KafkaFetchRecords::shutdown);
+ executor.shutdownNow();
+ }
}
+ tasks.clear();
executor = null;
super.doStop();
@@ -195,6 +205,11 @@ public class KafkaConsumer extends DefaultConsumer {
IOHelper.close(consumer);
}
}
+
+ private void shutdown() {
+ // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop
+ consumer.wakeup();
+ }
}
protected String serializeOffsetKey(TopicPartition topicPartition) {
[2/2] camel git commit: CAMEL-10697 - Workaround KAFKA-1894 by
calling the wakeup method
Posted by da...@apache.org.
CAMEL-10697 - Workaround KAFKA-1894 by calling the wakeup method
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5a4f641b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5a4f641b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5a4f641b
Branch: refs/heads/camel-2.18.x
Commit: 5a4f641b6c0145e30c104059ff3228cf0bcb3c6d
Parents: 70d4750
Author: Antoine DESSAIGNE <an...@gmail.com>
Authored: Wed Jan 11 17:25:56 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 12 10:51:51 2017 +0100
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5a4f641b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 66c4335..1614f5e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.kafka;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -43,6 +44,8 @@ public class KafkaConsumer extends DefaultConsumer {
private final KafkaEndpoint endpoint;
private final Processor processor;
private final Long pollTimeoutMs;
+ // This list helps working around the infinite loop of KAFKA-1894
+ private final List<KafkaFetchRecords> tasks = new ArrayList<>();
public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -73,7 +76,9 @@ public class KafkaConsumer extends DefaultConsumer {
executor = endpoint.createExecutor();
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
- executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps()));
+ KafkaFetchRecords task = new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps());
+ executor.submit(task);
+ tasks.add(task);
}
}
@@ -87,7 +92,12 @@ public class KafkaConsumer extends DefaultConsumer {
} else {
executor.shutdownNow();
}
+ if (!executor.isTerminated()) {
+ tasks.forEach(KafkaFetchRecords::shutdown);
+ executor.shutdownNow();
+ }
}
+ tasks.clear();
executor = null;
super.doStop();
@@ -167,6 +177,11 @@ public class KafkaConsumer extends DefaultConsumer {
IOHelper.close(consumer);
}
}
+
+ private void shutdown() {
+ // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop
+ consumer.wakeup();
+ }
}
}