You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/03/18 11:55:10 UTC

[camel] branch master updated: CAMEL-13140: Fixed camel-kafka consumer may do sync commit when auto commit enabled, when there are no messages received from the partitions.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aa5f61  CAMEL-13140: Fixed camel-kafka consumer may do sync commit when auto commit enabled, when there are no messages received from the partitions.
3aa5f61 is described below

commit 3aa5f61ca9f6f718c9ddbf96ef75a36ef8384119
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 18 12:53:12 2019 +0100

    CAMEL-13140: Fixed camel-kafka consumer may do sync commit when auto commit enabled, when there are no messages received from the partitions.
---
 .../src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java | 3 ---
 .../component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java      | 4 ++--
 components/camel-kafka/src/test/resources/log4j2.properties           | 3 +++
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 29e8501..73b5bff 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -404,9 +404,6 @@ public class KafkaConsumer extends DefaultConsumer {
                 } else if (forceCommit) {
                     log.debug("Forcing commitSync {} from topic {} with offset: {}", threadId, topicName, partitionLastOffset);
                     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
-                } else if (endpoint.getConfiguration().isAutoCommitEnable()) {
-                    log.debug("Auto commitSync {} from topic {} with offset: {}", threadId, topicName, partitionLastOffset);
-                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
                 }
             }
         }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
index 3b08dea..71be2df 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
@@ -69,10 +69,9 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
      */
     @Test
     public void shouldResumeFromAnyParticularOffset() throws InterruptedException {
-        result.expectedMessageCount(3);
         result.expectedBodiesReceivedInAnyOrder("message-6", "message-8", "message-9");
 
-        result.assertIsSatisfied(3000);
+        result.assertIsSatisfied(5000);
 
         assertEquals("partition-0", "4", stateRepository.getState(TOPIC + "/0"));
         assertEquals("partition-1", "4", stateRepository.getState(TOPIC + "/1"));
@@ -92,6 +91,7 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
             public void configure() throws Exception {
                 from("kafka:" + TOPIC
                              + "?groupId=A"
+                             + "&autoCommitIntervalMs=1000"
                              + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
                              + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
                              + "&offsetRepository=#offset")            // Keep the offset in our repository
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties
index 2e08696..1c24e24 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -37,3 +37,6 @@ logger.camelKafka.level=INFO
 logger.idem.name=org.apache.camel.processor.idempotent
 logger.idem.level=INFO
 
+logger.kafka.name=org.apache.kafka
+logger.kafka.level=INFO
+