You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/25 09:27:49 UTC

camel git commit: CAMEL-8180 Fixed the incorrect handling of ConsumerTimeoutException, with thanks to Ivan

Repository: camel
Updated Branches:
  refs/heads/master b2b17cd8c -> 2e9fda50f


CAMEL-8180 Fixed the incorrect handling of ConsumerTimeoutException, with thanks to Ivan


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e9fda50
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e9fda50
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e9fda50

Branch: refs/heads/master
Commit: 2e9fda50fb1dff6ca18473b724e5ae513d3e503c
Parents: b2b17cd
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Dec 25 16:27:12 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Dec 25 16:27:12 2014 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 45 +++++++++++++++-----
 .../camel/component/kafka/KafkaEndpoint.java    |  2 +-
 .../kafka/KafkaConsumerBatchSizeTest.java       | 30 ++++++++-----
 3 files changed, 55 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2e9fda50/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 0165310..d6b49d2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
@@ -73,14 +75,17 @@ public class KafkaConsumer extends DefaultConsumer {
         super.doStart();
         log.info("Starting Kafka consumer");
         executor = endpoint.createExecutor();
-
         for (int i = 0; i < endpoint.getConsumersCount(); i++) {
             ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
             Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
             topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
             List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
-            if (endpoint.isAutoCommitEnable() != null && Boolean.FALSE == endpoint.isAutoCommitEnable().booleanValue()) {
+            if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
+                if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs().intValue() < 0)
+                        && endpoint.getConsumerStreams() > 1) {
+                    LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
+                }
                 CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
                 for (final KafkaStream<byte[], byte[]> stream : streams) {
                     executor.submit(new BatchingConsumerTask(stream, barrier));
@@ -126,19 +131,39 @@ public class KafkaConsumer extends DefaultConsumer {
         }
 
         public void run() {
+
             int processed = 0;
-            for (MessageAndMetadata<byte[], byte[]> mm : stream) {
-                Exchange exchange = endpoint.createKafkaExchange(mm);
+            boolean consumerTimeout;
+            MessageAndMetadata<byte[], byte[]> mm = null;
+            ConsumerIterator<byte[], byte[]> it = stream.iterator();
+
+            while (true) {
+
                 try {
-                    processor.process(exchange);
-                } catch (Exception e) {
-                    LOG.error(e.getMessage(), e);
+                    consumerTimeout = false;
+                    if (it.hasNext()) {
+                        mm = it.next();
+                    } else {
+                        break;
+                    }
+                    Exchange exchange = endpoint.createKafkaExchange(mm);
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        LOG.error(e.getMessage(), e);
+                    }
+                    processed++;
+                } catch (ConsumerTimeoutException e) {
+                    LOG.debug(e.getMessage(), e);
+                    consumerTimeout = true;
                 }
-                processed++;
-                if (processed >= endpoint.getBatchSize()) {
+
+                if (processed >= endpoint.getBatchSize() || consumerTimeout) {
                     try {
                         berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
-                        processed = 0;
+                        if (!consumerTimeout) {
+                            processed = 0;
+                        }
                     } catch (InterruptedException e) {
                         LOG.error(e.getMessage(), e);
                         break;

http://git-wip-us.apache.org/repos/asf/camel/blob/2e9fda50/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index d32cb83..ce68b47 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -286,7 +286,7 @@ public class KafkaEndpoint extends DefaultEndpoint {
         configuration.setZookeeperSessionTimeoutMs(zookeeperSessionTimeoutMs);
     }
 
-    public int getConsumerTimeoutMs() {
+    public Integer getConsumerTimeoutMs() {
         return configuration.getConsumerTimeoutMs();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2e9fda50/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index 198994d..16f9a07 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -33,9 +33,18 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}&"
-        + "groupId=group1&autoOffsetReset=smallest&"
-        + "autoCommitEnable=false&batchSize=3&consumerStreams=1")
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
+            + "&zookeeperHost=localhost"
+            + "&zookeeperPort={{zookeeperPort}}"
+            + "&groupId=group1"
+            + "&autoOffsetReset=smallest"
+            + "&autoCommitEnable=false"
+            + "&batchSize=3"
+            + "&consumerStreams=10"
+            // If consumerTimeoutMs is set too small, the test will fail on JDK7
+            + "&consumerTimeoutMs=300"
+            + "&barrierAwaitTimeoutMs=1000"
+    )
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
@@ -72,8 +81,7 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
 
     @Test
     public void kafkaMessagesIsConsumedByCamel() throws Exception {
-        //First 5 must not be committed since batch size is 3
-        to.expectedMessageCount(2);
+        //First 2 must not be committed since batch size is 3
         to.expectedBodiesReceivedInAnyOrder("m1", "m2");
         for (int k = 1; k <= 2; k++) {
             String msg = "m" + k;
@@ -81,13 +89,12 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
             producer.send(data);
         }
         to.assertIsSatisfied(3000);
-
+        
+        to.reset();
         //Restart endpoint,
         from.getCamelContext().stop();
         from.getCamelContext().start();
-
-        to.reset();
-        to.expectedMessageCount(10);
+        
         to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
 
         //Second route must wake up and consume all from scratch and commit 9 consumed
@@ -99,14 +106,15 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
 
         to.assertIsSatisfied(3000);
 
+        to.reset();
         //Restart endpoint,
         from.getCamelContext().stop();
         from.getCamelContext().start();
 
-        to.reset();
-
+        
         //Only one message should be left to consume by this consumer group
         to.expectedMessageCount(1);
+        to.assertIsSatisfied(3000);
     }
 }