You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/14 12:08:17 UTC

[2/2] camel git commit: CAMEL-10594: Improve shutdown of kafka consumer to be graceful and break out of the while loop during stopping.

CAMEL-10594: Improve shutdown of kafka consumer to be graceful and break out of the while loop during stopping.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f91d100b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f91d100b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f91d100b

Branch: refs/heads/camel-2.18.x
Commit: f91d100bd03cadeefeab27664e750f6505505ff5
Parents: 09b072f
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Dec 14 13:07:34 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 14 13:07:59 2016 +0100

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaConsumer.java  | 15 +++++++++------
 .../camel/component/kafka/KafkaConsumerFullTest.java |  4 ++--
 .../kafka/clients/consumer/KafkaConsumerTest.java    |  1 +
 3 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 6cd5108..66c4335 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -68,8 +68,9 @@ public class KafkaConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws Exception {
-        super.doStart();
         LOG.info("Starting Kafka consumer");
+        super.doStart();
+
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
             executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps()));
@@ -78,17 +79,18 @@ public class KafkaConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
         LOG.info("Stopping Kafka consumer");
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
-                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
             } else {
                 executor.shutdownNow();
             }
         }
         executor = null;
+
+        super.doStop();
     }
 
     class KafkaFetchRecords implements Runnable {
@@ -117,7 +119,7 @@ public class KafkaConsumer extends DefaultConsumer {
         @SuppressWarnings("unchecked")
         public void run() {
             try {
-                LOG.debug("Subscribing {} to topic {}", threadId, topicName);
+                LOG.info("Subscribing {} to topic {}", threadId, topicName);
                 consumer.subscribe(Arrays.asList(topicName.split(",")));
 
                 if (endpoint.getConfiguration().isSeekToBeginning()) {
@@ -126,7 +128,7 @@ public class KafkaConsumer extends DefaultConsumer {
                     consumer.poll(100);
                     consumer.seekToBeginning(consumer.assignment());
                 }
-                while (isRunAllowed() && !isSuspendingOrSuspended()) {
+                while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
                     ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
                     for (TopicPartition partition : allRecords.partitions()) {
                         List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
@@ -151,10 +153,11 @@ public class KafkaConsumer extends DefaultConsumer {
                         }
                     }
                 }
-                LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
+                LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
             } catch (InterruptException e) {
                 getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
+                LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 01c0bd1..e8e6c9e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -76,7 +76,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
     }
 
     @Test
-    public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException {
+    public void kafkaMessageIsConsumedByCamel() throws InterruptedException, IOException {
         to.expectedMessageCount(5);
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
         for (int k = 0; k < 5; k++) {
@@ -89,7 +89,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
 
     @Test
     @Ignore("Currently there is a bug in kafka which leads to an uninterruptable thread so a resub take too long (works manually)")
-    public void kaftMessageIsConsumedByCamelSeekedToBeginning() throws Exception {
+    public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws Exception {
         to.expectedMessageCount(5);
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
         for (int k = 0; k < 5; k++) {

http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 1c4f0ee..d74d5e0 100644
--- a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -37,6 +37,7 @@ public class KafkaConsumerTest {
     public void init() {
         when(kafkaConsumer.poll(1000)).thenReturn(ConsumerRecords.empty());
     }
+
     @Test
     public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() {
         ConsumerRecords<Object, Object> consumerRecords = kafkaConsumer.poll(1000);