You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/04/05 11:05:06 UTC
camel git commit: CAMEL-9812: Camel leaves Kafka consumers running
after shutdown
Repository: camel
Updated Branches:
refs/heads/master 75b424d0d -> d90a3f9d8
CAMEL-9812: Camel leaves Kafka consumers running after shutdown
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d90a3f9d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d90a3f9d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d90a3f9d
Branch: refs/heads/master
Commit: d90a3f9d89f7ee4e12efad65a099eef3ef2e532e
Parents: 75b424d
Author: Andrea Cosentino <an...@gmail.com>
Authored: Tue Apr 5 11:02:03 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Tue Apr 5 11:04:15 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/component/kafka/KafkaConsumer.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d90a3f9d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index a2f2d5b..ad02258 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -135,12 +135,17 @@ public class KafkaConsumer extends DefaultConsumer {
}
LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
consumer.unsubscribe();
+ LOG.debug("Closing {} ", threadId);
+ consumer.close();
} catch (InterruptException e) {
getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
consumer.unsubscribe();
Thread.currentThread().interrupt();
} catch (Exception e) {
getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
+ } finally {
+ LOG.debug("Closing {} ", threadId);
+ consumer.close();
}
}