You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/14 12:08:17 UTC
[2/2] camel git commit: CAMEL-10594: Improve shutdown of kafka consumer to be graceful and break out the while loop during stopping.
CAMEL-10594: Improve shutdown of kafka consumer to be graceful and break out the while loop during stopping.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f91d100b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f91d100b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f91d100b
Branch: refs/heads/camel-2.18.x
Commit: f91d100bd03cadeefeab27664e750f6505505ff5
Parents: 09b072f
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Dec 14 13:07:34 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 14 13:07:59 2016 +0100
----------------------------------------------------------------------
.../apache/camel/component/kafka/KafkaConsumer.java | 15 +++++++++------
.../camel/component/kafka/KafkaConsumerFullTest.java | 4 ++--
.../kafka/clients/consumer/KafkaConsumerTest.java | 1 +
3 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 6cd5108..66c4335 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -68,8 +68,9 @@ public class KafkaConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
- super.doStart();
LOG.info("Starting Kafka consumer");
+ super.doStart();
+
executor = endpoint.createExecutor();
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps()));
@@ -78,17 +79,18 @@ public class KafkaConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
- super.doStop();
LOG.info("Stopping Kafka consumer");
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
} else {
executor.shutdownNow();
}
}
executor = null;
+
+ super.doStop();
}
class KafkaFetchRecords implements Runnable {
@@ -117,7 +119,7 @@ public class KafkaConsumer extends DefaultConsumer {
@SuppressWarnings("unchecked")
public void run() {
try {
- LOG.debug("Subscribing {} to topic {}", threadId, topicName);
+ LOG.info("Subscribing {} to topic {}", threadId, topicName);
consumer.subscribe(Arrays.asList(topicName.split(",")));
if (endpoint.getConfiguration().isSeekToBeginning()) {
@@ -126,7 +128,7 @@ public class KafkaConsumer extends DefaultConsumer {
consumer.poll(100);
consumer.seekToBeginning(consumer.assignment());
}
- while (isRunAllowed() && !isSuspendingOrSuspended()) {
+ while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
for (TopicPartition partition : allRecords.partitions()) {
List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
@@ -151,10 +153,11 @@ public class KafkaConsumer extends DefaultConsumer {
}
}
}
- LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
+ LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
consumer.unsubscribe();
} catch (InterruptException e) {
getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
+ LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
consumer.unsubscribe();
Thread.currentThread().interrupt();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 01c0bd1..e8e6c9e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -76,7 +76,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
}
@Test
- public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException {
+ public void kafkaMessageIsConsumedByCamel() throws InterruptedException, IOException {
to.expectedMessageCount(5);
to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
for (int k = 0; k < 5; k++) {
@@ -89,7 +89,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
@Test
@Ignore("Currently there is a bug in kafka which leads to an uninterruptable thread so a resub take too long (works manually)")
- public void kaftMessageIsConsumedByCamelSeekedToBeginning() throws Exception {
+ public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws Exception {
to.expectedMessageCount(5);
to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
for (int k = 0; k < 5; k++) {
http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 1c4f0ee..d74d5e0 100644
--- a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -37,6 +37,7 @@ public class KafkaConsumerTest {
public void init() {
when(kafkaConsumer.poll(1000)).thenReturn(ConsumerRecords.empty());
}
+
@Test
public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() {
ConsumerRecords<Object, Object> consumerRecords = kafkaConsumer.poll(1000);