You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/03/18 11:55:10 UTC
[camel] branch master updated: CAMEL-13140: Fixed camel-kafka
consumer may do sync commit when auto commit enabled,
when there are no messages recieved from the partitions.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3aa5f61 CAMEL-13140: Fixed camel-kafka consumer may do sync commit when auto commit enabled, when there are no messages recieved from the partitions.
3aa5f61 is described below
commit 3aa5f61ca9f6f718c9ddbf96ef75a36ef8384119
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 18 12:53:12 2019 +0100
CAMEL-13140: Fixed camel-kafka consumer may do sync commit when auto commit enabled, when there are no messages recieved from the partitions.
---
.../src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java | 3 ---
.../component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java | 4 ++--
components/camel-kafka/src/test/resources/log4j2.properties | 3 +++
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 29e8501..73b5bff 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -404,9 +404,6 @@ public class KafkaConsumer extends DefaultConsumer {
} else if (forceCommit) {
log.debug("Forcing commitSync {} from topic {} with offset: {}", threadId, topicName, partitionLastOffset);
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
- } else if (endpoint.getConfiguration().isAutoCommitEnable()) {
- log.debug("Auto commitSync {} from topic {} with offset: {}", threadId, topicName, partitionLastOffset);
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
}
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
index 3b08dea..71be2df 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
@@ -69,10 +69,9 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
*/
@Test
public void shouldResumeFromAnyParticularOffset() throws InterruptedException {
- result.expectedMessageCount(3);
result.expectedBodiesReceivedInAnyOrder("message-6", "message-8", "message-9");
- result.assertIsSatisfied(3000);
+ result.assertIsSatisfied(5000);
assertEquals("partition-0", "4", stateRepository.getState(TOPIC + "/0"));
assertEquals("partition-1", "4", stateRepository.getState(TOPIC + "/1"));
@@ -92,6 +91,7 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
public void configure() throws Exception {
from("kafka:" + TOPIC
+ "?groupId=A"
+ + "&autoCommitIntervalMs=1000"
+ "&autoOffsetReset=earliest" // Ask to start from the beginning if we have unknown offset
+ "&consumersCount=2" // We have 2 partitions, we want 1 consumer per partition
+ "&offsetRepository=#offset") // Keep the offset in our repository
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties
index 2e08696..1c24e24 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -37,3 +37,6 @@ logger.camelKafka.level=INFO
logger.idem.name=org.apache.camel.processor.idempotent
logger.idem.level=INFO
+logger.kafka.name=org.apache.kafka
+logger.kafka.level=INFO
+