You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/04/05 11:05:06 UTC

camel git commit: CAMEL-9812: Camel leaves Kafka consumers running after shutdown

Repository: camel
Updated Branches:
  refs/heads/master 75b424d0d -> d90a3f9d8


CAMEL-9812: Camel leaves Kafka consumers running after shutdown


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d90a3f9d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d90a3f9d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d90a3f9d

Branch: refs/heads/master
Commit: d90a3f9d89f7ee4e12efad65a099eef3ef2e532e
Parents: 75b424d
Author: Andrea Cosentino <an...@gmail.com>
Authored: Tue Apr 5 11:02:03 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Tue Apr 5 11:04:15 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java    | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d90a3f9d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index a2f2d5b..ad02258 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -135,12 +135,17 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
+                LOG.debug("Closing {} ", threadId);
+                consumer.close();
             } catch (InterruptException e) {
                 getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
+            } finally {
+                LOG.debug("Closing {} ", threadId);
+                consumer.close();
             }
         }