You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/07/01 19:45:46 UTC
[camel] branch master updated: CAMEL-15228: add end signal for
polls (#3960)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new de74371 CAMEL-15228: add end signal for polls (#3960)
de74371 is described below
commit de74371cfc20b2149a8d65233b8538e6d30354f0
Author: 7thbit-com <67...@users.noreply.github.com>
AuthorDate: Wed Jul 1 21:45:17 2020 +0200
CAMEL-15228: add end signal for polls (#3960)
* CAMEL-15228: add end signal for polls
* CAMEL-15228: add end signal for polls
Co-authored-by: Jörg Schuberth <gi...@7thbit.com>
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 1 +
.../apache/camel/component/kafka/KafkaConstants.java | 2 ++
.../apache/camel/component/kafka/KafkaConsumer.java | 20 +++++++++++++-------
.../kafka/KafkaConsumerLastRecordHeaderTest.java | 4 ++++
4 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index ea8c994..40d86e6 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -293,6 +293,7 @@ The following headers are available when consuming messages from Kafka.
| KafkaConstants.KEY | "kafka.KEY" | Object | The key of the message if configured
| KafkaConstants.HEADERS | "kafka.HEADERS" | org.apache.kafka.common.header.Headers | The record headers
| KafkaConstants.LAST_RECORD_BEFORE_COMMIT | "kafka.LAST_RECORD_BEFORE_COMMIT" | Boolean | Whether or not it's the last record before commit (only available if `autoCommitEnable` endpoint parameter is `false`)
+| KafkaConstants.LAST_POLL_RECORD | "kafka.LAST_POLL_RECORD" | Boolean | Indicates the last record within the current poll request (only available if `autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is `true`)
| KafkaConstants.MANUAL_COMMIT | "CamelKafkaManualCommit" | KafkaManualCommit | Can be used for forcing manual offset commit when using Kafka consumer. |
|===
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 35bde9b..d909d52 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -26,6 +26,7 @@ public final class KafkaConstants {
public static final String OFFSET = "kafka.OFFSET";
public static final String HEADERS = "kafka.HEADERS";
public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT";
+ public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD";
public static final String TIMESTAMP = "kafka.TIMESTAMP";
@Deprecated
@@ -45,3 +46,4 @@ public final class KafkaConstants {
// Utility class
}
}
+
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 30f33fa..e7cffa4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -113,7 +112,7 @@ public class KafkaConsumer extends DefaultConsumer {
super.doStart();
// is the offset repository already started?
- StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
+ StateRepository<String, String> repo = endpoint.getConfiguration().getOffsetRepository();
if (repo instanceof ServiceSupport) {
boolean started = ((ServiceSupport)repo).isStarted();
// if not already started then we would do that and also stop it
@@ -163,7 +162,7 @@ public class KafkaConsumer extends DefaultConsumer {
executor = null;
if (stopOffsetRepo) {
- StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
+ StateRepository<String, String> repo = endpoint.getConfiguration().getOffsetRepository();
LOG.debug("Stopping OffsetRepository: {}", repo);
ServiceHelper.stopAndShutdownService(repo);
}
@@ -309,16 +308,17 @@ public class KafkaConsumer extends DefaultConsumer {
boolean breakOnErrorHit = false;
LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs);
ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
-
- for (TopicPartition partition : allRecords.partitions()) {
-
+
+ Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator();
+ while (partitionIterator.hasNext()) {
+ TopicPartition partition = partitionIterator.next();
long partitionLastOffset = -1;
Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(), partition);
if (!breakOnErrorHit && recordIterator.hasNext()) {
ConsumerRecord<Object, Object> record;
-
+
while (!breakOnErrorHit && recordIterator.hasNext()) {
record = recordIterator.next();
if (LOG.isTraceEnabled()) {
@@ -341,6 +341,11 @@ public class KafkaConsumer extends DefaultConsumer {
offsetRepository, partition, record.offset());
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
}
+ // if commit management is on user side give additional info for the end of poll loop
+ if (!isAutoCommitEnabled() || endpoint.getConfiguration().isAllowManualCommit()) {
+ exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD,
+ !recordIterator.hasNext() && !partitionIterator.hasNext());
+ }
try {
processor.process(exchange);
@@ -519,3 +524,4 @@ public class KafkaConsumer extends DefaultConsumer {
return Long.parseLong(offset);
}
}
+
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
index 1cb8673..a4eb580 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
@@ -73,6 +73,10 @@ public class KafkaConsumerLastRecordHeaderTest extends BaseEmbeddedKafkaTest {
Boolean header = exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
assertNotNull("Header not set for #" + i, header);
assertEquals("Header invalid for #" + i, header, i == exchanges.size() - 1);
+ // as long as the partitions count is 1 on topic:
+ header = exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_POLL_RECORD, Boolean.class);
+ assertNotNull("Last record header not set for #" + i, header);
+ assertEquals("Last record header invalid for #" + i, header, i == exchanges.size() - 1);
}
}