You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/25 09:27:49 UTC
camel git commit: CAMEL-8180 Fixed the issue of Incorrect handling of
ConsumerTimeoutException with thanks to Ivan
Repository: camel
Updated Branches:
refs/heads/master b2b17cd8c -> 2e9fda50f
CAMEL-8180 Fixed the issue of Incorrect handling of ConsumerTimeoutException with thanks to Ivan
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e9fda50
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e9fda50
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e9fda50
Branch: refs/heads/master
Commit: 2e9fda50fb1dff6ca18473b724e5ae513d3e503c
Parents: b2b17cd
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Dec 25 16:27:12 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Dec 25 16:27:12 2014 +0800
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 45 +++++++++++++++-----
.../camel/component/kafka/KafkaEndpoint.java | 2 +-
.../kafka/KafkaConsumerBatchSizeTest.java | 30 ++++++++-----
3 files changed, 55 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2e9fda50/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 0165310..d6b49d2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
@@ -73,14 +75,17 @@ public class KafkaConsumer extends DefaultConsumer {
super.doStart();
log.info("Starting Kafka consumer");
executor = endpoint.createExecutor();
-
for (int i = 0; i < endpoint.getConsumersCount(); i++) {
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
- if (endpoint.isAutoCommitEnable() != null && Boolean.FALSE == endpoint.isAutoCommitEnable().booleanValue()) {
+ if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
+ if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs().intValue() < 0)
+ && endpoint.getConsumerStreams() > 1) {
+ LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
+ }
CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new BatchingConsumerTask(stream, barrier));
@@ -126,19 +131,39 @@ public class KafkaConsumer extends DefaultConsumer {
}
public void run() {
+
int processed = 0;
- for (MessageAndMetadata<byte[], byte[]> mm : stream) {
- Exchange exchange = endpoint.createKafkaExchange(mm);
+ boolean consumerTimeout;
+ MessageAndMetadata<byte[], byte[]> mm = null;
+ ConsumerIterator<byte[], byte[]> it = stream.iterator();
+
+ while (true) {
+
try {
- processor.process(exchange);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ consumerTimeout = false;
+ if (it.hasNext()) {
+ mm = it.next();
+ } else {
+ break;
+ }
+ Exchange exchange = endpoint.createKafkaExchange(mm);
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ processed++;
+ } catch (ConsumerTimeoutException e) {
+ LOG.debug(e.getMessage(), e);
+ consumerTimeout = true;
}
- processed++;
- if (processed >= endpoint.getBatchSize()) {
+
+ if (processed >= endpoint.getBatchSize() || consumerTimeout) {
try {
berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
- processed = 0;
+ if (!consumerTimeout) {
+ processed = 0;
+ }
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
break;
http://git-wip-us.apache.org/repos/asf/camel/blob/2e9fda50/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index d32cb83..ce68b47 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -286,7 +286,7 @@ public class KafkaEndpoint extends DefaultEndpoint {
configuration.setZookeeperSessionTimeoutMs(zookeeperSessionTimeoutMs);
}
- public int getConsumerTimeoutMs() {
+ public Integer getConsumerTimeoutMs() {
return configuration.getConsumerTimeoutMs();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2e9fda50/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index 198994d..16f9a07 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -33,9 +33,18 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
public static final String TOPIC = "test";
- @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}&"
- + "groupId=group1&autoOffsetReset=smallest&"
- + "autoCommitEnable=false&batchSize=3&consumerStreams=1")
+ @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
+ + "&zookeeperHost=localhost"
+ + "&zookeeperPort={{zookeeperPort}}"
+ + "&groupId=group1"
+ + "&autoOffsetReset=smallest"
+ + "&autoCommitEnable=false"
+ + "&batchSize=3"
+ + "&consumerStreams=10"
+ // If the consumerTimeout is set too small the test will fail on JDK7
+ + "&consumerTimeoutMs=300"
+ + "&barrierAwaitTimeoutMs=1000"
+ )
private Endpoint from;
@EndpointInject(uri = "mock:result")
@@ -72,8 +81,7 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
@Test
public void kafkaMessagesIsConsumedByCamel() throws Exception {
- //First 5 must not be committed since batch size is 3
- to.expectedMessageCount(2);
+ //First 2 must not be committed since batch size is 3
to.expectedBodiesReceivedInAnyOrder("m1", "m2");
for (int k = 1; k <= 2; k++) {
String msg = "m" + k;
@@ -81,13 +89,12 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
producer.send(data);
}
to.assertIsSatisfied(3000);
-
+
+ to.reset();
//Restart endpoint,
from.getCamelContext().stop();
from.getCamelContext().start();
-
- to.reset();
- to.expectedMessageCount(10);
+
to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
//Second route must wake up and consume all from scratch and commit 9 consumed
@@ -99,14 +106,15 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
to.assertIsSatisfied(3000);
+ to.reset();
//Restart endpoint,
from.getCamelContext().stop();
from.getCamelContext().start();
- to.reset();
-
+
//Only one message should left to consume by this consumer group
to.expectedMessageCount(1);
+ to.assertIsSatisfied(3000);
}
}