You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/07/01 19:45:46 UTC

[camel] branch master updated: CAMEL-15228: add end signal for polls (#3960)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new de74371  CAMEL-15228: add end signal for polls (#3960)
de74371 is described below

commit de74371cfc20b2149a8d65233b8538e6d30354f0
Author: 7thbit-com <67...@users.noreply.github.com>
AuthorDate: Wed Jul 1 21:45:17 2020 +0200

    CAMEL-15228: add end signal for polls (#3960)
    
    * CAMEL-15228: add end signal for polls
    
    * CAMEL-15228: add end signal for polls
    
    Co-authored-by: Jörg Schuberth <gi...@7thbit.com>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc   |  1 +
 .../apache/camel/component/kafka/KafkaConstants.java |  2 ++
 .../apache/camel/component/kafka/KafkaConsumer.java  | 20 +++++++++++++-------
 .../kafka/KafkaConsumerLastRecordHeaderTest.java     |  4 ++++
 4 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index ea8c994..40d86e6 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -293,6 +293,7 @@ The following headers are available when consuming messages from Kafka.
 | KafkaConstants.KEY                       | "kafka.KEY"                       | Object  | The key of the message if configured
 | KafkaConstants.HEADERS                   | "kafka.HEADERS"                   | org.apache.kafka.common.header.Headers  | The record headers
 | KafkaConstants.LAST_RECORD_BEFORE_COMMIT | "kafka.LAST_RECORD_BEFORE_COMMIT" | Boolean | Whether or not it's the last record before commit (only available if `autoCommitEnable` endpoint parameter is `false`)
+| KafkaConstants.LAST_POLL_RECORD | "kafka.LAST_POLL_RECORD" | Boolean | Indicates the last record within the current poll request (only available if `autoCommitEnable` endpoint parameter is `false` or `allowManualCommit` is `true`)
 | KafkaConstants.MANUAL_COMMIT             | "CamelKafkaManualCommit"          | KafkaManualCommit | Can be used for forcing manual offset commit when using Kafka consumer. |
 |===
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 35bde9b..d909d52 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -26,6 +26,7 @@ public final class KafkaConstants {
     public static final String OFFSET = "kafka.OFFSET";
     public static final String HEADERS = "kafka.HEADERS";
     public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT";
+    public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD";
     public static final String TIMESTAMP = "kafka.TIMESTAMP";
 
     @Deprecated
@@ -45,3 +46,4 @@ public final class KafkaConstants {
         // Utility class
     }
 }
+
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 30f33fa..e7cffa4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.regex.Pattern;
 import java.util.stream.StreamSupport;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -113,7 +112,7 @@ public class KafkaConsumer extends DefaultConsumer {
         super.doStart();
 
         // is the offset repository already started?
-        StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
+        StateRepository<String, String> repo = endpoint.getConfiguration().getOffsetRepository();
         if (repo instanceof ServiceSupport) {
             boolean started = ((ServiceSupport)repo).isStarted();
             // if not already started then we would do that and also stop it
@@ -163,7 +162,7 @@ public class KafkaConsumer extends DefaultConsumer {
         executor = null;
 
         if (stopOffsetRepo) {
-            StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
+            StateRepository<String, String> repo = endpoint.getConfiguration().getOffsetRepository();
             LOG.debug("Stopping OffsetRepository: {}", repo);
             ServiceHelper.stopAndShutdownService(repo);
         }
@@ -309,16 +308,17 @@ public class KafkaConsumer extends DefaultConsumer {
                     boolean breakOnErrorHit = false;
                     LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs);
                     ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
-
-                    for (TopicPartition partition : allRecords.partitions()) {
-
+                    
+                    Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator();
+                    while (partitionIterator.hasNext()) {
+                        TopicPartition partition = partitionIterator.next();
                         long partitionLastOffset = -1;
 
                         Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
                         LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(), partition);
                         if (!breakOnErrorHit && recordIterator.hasNext()) {
                             ConsumerRecord<Object, Object> record;
-
+ 
                             while (!breakOnErrorHit && recordIterator.hasNext()) {
                                 record = recordIterator.next();
                                 if (LOG.isTraceEnabled()) {
@@ -341,6 +341,11 @@ public class KafkaConsumer extends DefaultConsumer {
                                                                                                                                  offsetRepository, partition, record.offset());
                                     exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
                                 }
+                                // if commit management is on user side give additional info for the end of poll loop
+                                if (!isAutoCommitEnabled() || endpoint.getConfiguration().isAllowManualCommit()) {
+                                    exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD,
+                                                               !recordIterator.hasNext() && !partitionIterator.hasNext());
+                                }
 
                                 try {
                                     processor.process(exchange);
@@ -519,3 +524,4 @@ public class KafkaConsumer extends DefaultConsumer {
         return Long.parseLong(offset);
     }
 }
+
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
index 1cb8673..a4eb580 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
@@ -73,6 +73,10 @@ public class KafkaConsumerLastRecordHeaderTest extends BaseEmbeddedKafkaTest {
             Boolean header = exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
             assertNotNull("Header not set for #" + i, header);
             assertEquals("Header invalid for #" + i, header, i == exchanges.size() - 1);
+            // as long as the partitions count is 1 on topic:
+            header = exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_POLL_RECORD, Boolean.class);
+            assertNotNull("Last record header not set for #" + i, header);
+            assertEquals("Last record header invalid for #" + i, header, i == exchanges.size() - 1);
         }
     }