You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/12 13:55:24 UTC

[camel] branch camel-3.18.x updated (da5f70c22bf -> f519f84b7df)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


    from da5f70c22bf camel-jbang - Upgrade to quarkus 2.13.1
     new d407362d057 [camel-18588] Added condition to only commit offset if it is not -1
     new 8aebc9d7ad6 [camel-18588] Added condition to only commit offset if it is not already paused
     new f519f84b7df [camel-18327] resuming from last committed offset

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/kafka/KafkaFetchRecords.java   | 15 ++++++++++++---
 .../kafka/consumer/support/KafkaRecordProcessor.java      |  8 ++++++--
 2 files changed, 18 insertions(+), 5 deletions(-)


[camel] 02/03: [camel-18588] Added condition to only commit offset if it is not already paused

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8aebc9d7ad659b4238aefa215c63d6b438bb9af5
Author: geekr <ge...@gmail.com>
AuthorDate: Tue Oct 11 15:01:11 2022 -0400

    [camel-18588] Added condition to only commit offset if it is not already paused
---
 .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java     | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 5684375bbdd..edf39671436 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -317,8 +317,8 @@ public class KafkaFetchRecords implements Runnable {
                 }
 
                 ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult);
-
-                if (result.isBreakOnErrorHit()) {
+                updateTaskState();
+                if (result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force reconnect");
                     // force re-connect
                     setReconnect(true);
@@ -327,7 +327,6 @@ public class KafkaFetchRecords implements Runnable {
                     lastResult = result;
                 }
 
-                updateTaskState();
             }
 
             if (!isConnected()) {


[camel] 03/03: [camel-18327] resuming from last committed offset

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f519f84b7df11ebc12bf475c70bd35a126f480c0
Author: geekr <ge...@gmail.com>
AuthorDate: Fri Oct 7 13:54:42 2022 -0400

    [camel-18327] resuming from last committed offset
---
 .../org/apache/camel/component/kafka/KafkaFetchRecords.java    | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index edf39671436..079a14615ab 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -41,7 +41,9 @@ import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
@@ -380,6 +382,14 @@ public class KafkaFetchRecords implements Runnable {
                 break;
             case RESUME_REQUESTED:
                 LOG.info("Resuming the consumer as a response to a resume request");
+                if (consumer.committed(this.consumer.assignment()) != null) {
+                    consumer.committed(this.consumer.assignment()).forEach((k, v) -> {
+                        final TopicPartition tp = (TopicPartition) k;
+                        LOG.info("Resuming from the offset {} for the topic {} with partition {}",
+                                ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
+                        consumer.seek(tp, ((OffsetAndMetadata) v).offset());
+                    });
+                }
                 consumer.resume(consumer.assignment());
                 state = State.RUNNING;
                 break;


[camel] 01/03: [camel-18588] Added condition to only commit offset if it is not -1

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d407362d0571cedcb2dfd4d04d83263b8da729d3
Author: geekr <ge...@gmail.com>
AuthorDate: Mon Oct 10 15:38:32 2022 -0400

    [camel-18588] Added condition to only commit offset if it is not -1
---
 .../component/kafka/consumer/support/KafkaRecordProcessor.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 38763385b78..1afe53cbe2b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -23,6 +23,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -131,8 +132,11 @@ public class KafkaRecordProcessor {
                 LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset);
             }
 
-            // force commit, so we resume on next poll where we failed
-            commitManager.forceCommit(partition, partitionLastOffset);
+            // force commit, so we resume on next poll where we failed except when the failure happened
+            // at the first message in a poll
+            if (partitionLastOffset != AbstractCommitManager.START_OFFSET) {
+                commitManager.forceCommit(partition, partitionLastOffset);
+            }
 
             // continue to next partition
             return true;