You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/04/28 15:55:55 UTC

[camel] branch main updated (b1288fbfb0f -> aa306b7100b)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from b1288fbfb0f camel-yaml-dsl - Fix mistake in schema about error handler should not end in .class
     new bd0d9cd7b2a CAMEL-18024: rework the last processed offset tracking
     new aa306b7100b CAMEL-18024: use the commit manager for manual commits

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/component/kafka/KafkaFetchRecords.java   |  24 ++--
 .../kafka/consumer/AbstractCommitManager.java      |  10 +-
 .../kafka/consumer/AsyncCommitManager.java         |  38 +++---
 .../component/kafka/consumer/CommitManager.java    |  37 ++++--
 .../kafka/consumer/CommitToOffsetManager.java      |  24 ++--
 .../consumer/DefaultKafkaManualAsyncCommit.java    |  45 +-------
 .../DefaultKafkaManualAsyncCommitFactory.java      |  27 +----
 .../kafka/consumer/DefaultKafkaManualCommit.java   |   9 +-
 .../consumer/DefaultKafkaManualCommitFactory.java  |  26 +----
 .../consumer/DefaultKafkaManualSyncCommit.java     |  32 +-----
 .../kafka/consumer/KafkaAsyncManualCommit.java     |   5 +-
 .../kafka/consumer/KafkaManualCommitFactory.java   |  17 +--
 .../kafka/consumer/NoopCommitManager.java          |  10 +-
 .../component/kafka/consumer/OffsetCache.java      |  64 +++++++++++
 .../kafka/consumer/SyncCommitManager.java          |  31 ++---
 .../consumer/support/KafkaRecordProcessor.java     |   2 +-
 .../support/KafkaRecordProcessorFacade.java        |  16 +--
 .../support/PartitionAssignmentListener.java       |  40 +------
 .../component/kafka/consumer/OffsetCacheTest.java  | 128 +++++++++++++++++++++
 .../integration/pause/KafkaPausableConsumerIT.java |  12 +-
 .../ROOT/pages/camel-3x-upgrade-guide-3_17.adoc    |   7 ++
 21 files changed, 327 insertions(+), 277 deletions(-)
 create mode 100644 components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/OffsetCache.java
 create mode 100644 components/camel-kafka/src/test/java/org/apache/camel/component/kafka/consumer/OffsetCacheTest.java


[camel] 01/02: CAMEL-18024: rework the last processed offset tracking

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bd0d9cd7b2a6dd3f9cb4444e5481e39ed780a76f
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Apr 28 08:25:01 2022 +0200

    CAMEL-18024: rework the last processed offset tracking
    
    Move the last processed offset tracking code into the commit manager so
    it can be handled according to the commit management strategy (sync,
    async, noop, etc)
---
 .../camel/component/kafka/KafkaFetchRecords.java   |  15 +--
 .../kafka/consumer/AbstractCommitManager.java      |   2 +-
 .../kafka/consumer/AsyncCommitManager.java         |  24 ++--
 .../component/kafka/consumer/CommitManager.java    |  27 ++++-
 .../kafka/consumer/CommitToOffsetManager.java      |  24 ++--
 .../kafka/consumer/NoopCommitManager.java          |  10 +-
 .../component/kafka/consumer/OffsetCache.java      |  64 +++++++++++
 .../kafka/consumer/SyncCommitManager.java          |  31 ++---
 .../consumer/support/KafkaRecordProcessor.java     |   2 +-
 .../support/KafkaRecordProcessorFacade.java        |  13 +--
 .../support/PartitionAssignmentListener.java       |  35 +-----
 .../component/kafka/consumer/OffsetCacheTest.java  | 128 +++++++++++++++++++++
 12 files changed, 282 insertions(+), 93 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 01f9efa0596..51553f0fa55 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.kafka;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
@@ -75,7 +73,6 @@ public class KafkaFetchRecords implements Runnable {
     private final Pattern topicPattern;
     private final String threadId;
     private final Properties kafkaProps;
-    private final Map<String, Long> lastProcessedOffset = new HashMap<>();
     private final PollExceptionStrategy pollExceptionStrategy;
     private final BridgeExceptionHandlerToErrorHandler bridge;
     private final ReentrantLock lock = new ReentrantLock();
@@ -279,7 +276,7 @@ public class KafkaFetchRecords implements Runnable {
         resumeStrategy.setConsumer(consumer);
 
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
-                threadId, kafkaConsumer.getEndpoint().getConfiguration(), lastProcessedOffset,
+                threadId, kafkaConsumer.getEndpoint().getConfiguration(),
                 this::isRunnable, commitManager, resumeStrategy);
 
         if (LOG.isInfoEnabled()) {
@@ -310,7 +307,7 @@ public class KafkaFetchRecords implements Runnable {
             }
 
             KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(
-                    kafkaConsumer, lastProcessedOffset, threadId, commitManager, consumerListener);
+                    kafkaConsumer, threadId, commitManager, consumerListener);
 
             Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
             while (isKafkaConsumerRunnable() && isConnected() && pollExceptionStrategy.canContinue()) {
@@ -359,11 +356,11 @@ public class KafkaFetchRecords implements Runnable {
             safeUnsubscribe();
         } catch (Exception e) {
             if (LOG.isDebugEnabled()) {
-                LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}",
-                        e.getClass().getName(), threadId, getPrintableTopic(), lastProcessedOffset, e.getMessage(), e);
+                LOG.warn("Exception {} caught by thread {} while polling {} from kafka: {}",
+                        e.getClass().getName(), threadId, getPrintableTopic(), e.getMessage(), e);
             } else {
-                LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}",
-                        e.getClass().getName(), threadId, getPrintableTopic(), lastProcessedOffset, e.getMessage());
+                LOG.warn("Exception {} caught by thread {} while polling {} from kafka: {}",
+                        e.getClass().getName(), threadId, getPrintableTopic(), e.getMessage());
             }
 
             pollExceptionStrategy.handle(partitionLastOffset, e);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
index 8333fa1687f..bce04599f74 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
@@ -81,7 +81,7 @@ public abstract class AbstractCommitManager implements CommitManager {
     }
 
     @Override
-    public void commitOffsetForce(TopicPartition partition, long partitionLastOffset) {
+    public void forceCommit(TopicPartition partition, long partitionLastOffset) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
                     partition.partition(), partitionLastOffset);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
index a364b19ce3a..822fe1ac24f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
@@ -18,6 +18,7 @@
 package org.apache.camel.component.kafka.consumer;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.camel.Exchange;
@@ -34,6 +35,7 @@ public class AsyncCommitManager extends AbstractCommitManager {
     private final Consumer<?, ?> consumer;
 
     private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits = new ConcurrentLinkedQueue<>();
+    private final OffsetCache offsetCache = new OffsetCache();
 
     public AsyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
         super(consumer, kafkaConsumer, threadId, printableTopic);
@@ -60,13 +62,13 @@ public class AsyncCommitManager extends AbstractCommitManager {
     }
 
     @Override
-    public void commitOffsetOnStop(TopicPartition partition, long partitionLastOffset) {
-        commitAsync(consumer, partition, partitionLastOffset);
-    }
+    public void commit(TopicPartition partition) {
+        Long offset = offsetCache.getOffset(partition);
+        if (offset == null) {
+            return;
+        }
 
-    @Override
-    public void commitOffset(TopicPartition partition, long partitionLastOffset) {
-        // NO-OP runs async
+        commitAsync(consumer, partition, offset);
     }
 
     private void commitAsync(Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset) {
@@ -74,8 +76,9 @@ public class AsyncCommitManager extends AbstractCommitManager {
             LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic());
         }
 
-        consumer.commitAsync(
-                Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)), null);
+        final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap
+                = Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1));
+        consumer.commitAsync(topicPartitionOffsetAndMetadataMap, offsetCache::removeCommittedEntries);
     }
 
     @Override
@@ -89,4 +92,9 @@ public class AsyncCommitManager extends AbstractCommitManager {
 
         return getManualCommit(exchange, partition, record, asyncCommits, manualCommitFactory);
     }
+
+    @Override
+    public void recordOffset(TopicPartition partition, long partitionLastOffset) {
+        offsetCache.recordOffset(partition, partitionLastOffset);
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
index 13608a7dbe7..842a3364243 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
@@ -25,11 +25,28 @@ public interface CommitManager {
 
     KafkaManualCommit getManualCommit(Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record);
 
-    void commitOffset(TopicPartition partition, long partitionLastOffset);
-
-    void commitOffsetForce(TopicPartition partition, long partitionLastOffset);
-
-    void commitOffsetOnStop(TopicPartition partition, long partitionLastOffset);
+    /**
+     * Commits the offsets of the given partition
+     * 
+     * @param partition the partition to commit the offsets
+     */
+    void commit(TopicPartition partition);
+
+    /**
+     * Forcefully commits the offset of the given partition
+     * 
+     * @param partition           the partition to commit the offsets
+     * @param partitionLastOffset the last offset to commit
+     */
+    void forceCommit(TopicPartition partition, long partitionLastOffset);
+
+    /**
+     * Record the last processed offset for future commit
+     * 
+     * @param partition           the partition to commit the offsets
+     * @param partitionLastOffset the last offset to commit
+     */
+    void recordOffset(TopicPartition partition, long partitionLastOffset);
 
     @Deprecated
     default void processAsyncCommits() {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
index 849bad30c6f..1a073dce22b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 public class CommitToOffsetManager extends AbstractCommitManager {
     private static final Logger LOG = LoggerFactory.getLogger(CommitToOffsetManager.class);
+    private final OffsetCache offsetCache = new OffsetCache();
     private final StateRepository<String, String> offsetRepository;
 
     public CommitToOffsetManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
@@ -35,21 +36,17 @@ public class CommitToOffsetManager extends AbstractCommitManager {
     }
 
     @Override
-    public void commitOffsetOnStop(TopicPartition partition, long partitionLastOffset) {
-        saveStateToOffsetRepository(partition, partitionLastOffset, offsetRepository);
-    }
-
-    @Override
-    public void commitOffset(TopicPartition partition, long partitionLastOffset) {
-        if (partitionLastOffset == START_OFFSET) {
+    public void commit(TopicPartition partition) {
+        Long offset = offsetCache.getOffset(partition);
+        if (offset == null) {
             return;
         }
 
-        saveStateToOffsetRepository(partition, partitionLastOffset, offsetRepository);
+        saveStateToOffsetRepository(partition, offset, offsetRepository);
     }
 
     @Override
-    public void commitOffsetForce(TopicPartition partition, long partitionLastOffset) {
+    public void forceCommit(TopicPartition partition, long partitionLastOffset) {
         saveStateToOffsetRepository(partition, partitionLastOffset, offsetRepository);
     }
 
@@ -77,4 +74,13 @@ public class CommitToOffsetManager extends AbstractCommitManager {
     private static String serializeOffsetValue(long offset) {
         return String.valueOf(offset);
     }
+
+    @Override
+    public void recordOffset(TopicPartition partition, long partitionLastOffset) {
+        if (partitionLastOffset == START_OFFSET) {
+            return;
+        }
+
+        offsetCache.recordOffset(partition, partitionLastOffset);
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/NoopCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/NoopCommitManager.java
index 05f48300329..750104f1ad2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/NoopCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/NoopCommitManager.java
@@ -37,16 +37,14 @@ public class NoopCommitManager extends AbstractCommitManager {
     }
 
     @Override
-    public void commitOffsetOnStop(TopicPartition partition, long partitionLastOffset) {
+    public void commit(TopicPartition partition) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commit on stop on {} from {} is enabled via Kafka consumer (NO-OP)", threadId, partition.topic());
+            LOG.debug("Auto commit to offset {} from topic {} is disabled (NO-OP)", threadId, partition.topic());
         }
     }
 
     @Override
-    public void commitOffset(TopicPartition partition, long partitionLastOffset) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commit to offset {} from topic {} is disabled (NO-OP)", threadId, partition.topic());
-        }
+    public void recordOffset(TopicPartition partition, long partitionLastOffset) {
+        // NO-OP
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/OffsetCache.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/OffsetCache.java
new file mode 100644
index 00000000000..af77bc2cae5
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/OffsetCache.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OffsetCache {
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetCache.class);
+    private final Map<TopicPartition, Long> lastProcessedOffset = new ConcurrentHashMap<>();
+
+    public void recordOffset(TopicPartition partition, long partitionLastOffset) {
+        lastProcessedOffset.put(partition, partitionLastOffset);
+    }
+
+    public void removeCommittedEntries(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
+        if (exception == null) {
+            committed.forEach(this::removeCommittedEntry);
+        } else {
+            LOG.error("Failed to commit offset: {}", exception.getMessage(), exception);
+        }
+    }
+
+    private void removeCommittedEntry(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
+        LOG.debug(
+                "Offset {} from topic {} from partition {} has been successfully committed and is being removed from tracking",
+                offsetAndMetadata.offset(),
+                topicPartition.topic(), topicPartition.partition());
+
+        lastProcessedOffset.remove(topicPartition);
+    }
+
+    public Long getOffset(TopicPartition partition) {
+        return lastProcessedOffset.get(partition);
+    }
+
+    public long cacheSize() {
+        return lastProcessedOffset.size();
+    }
+
+    public boolean contains(TopicPartition topicPartition) {
+        return lastProcessedOffset.containsKey(topicPartition);
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
index 9cda2f821e9..45b779392e7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.consumer;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Map;
 
 import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 public class SyncCommitManager extends AbstractCommitManager {
     private static final Logger LOG = LoggerFactory.getLogger(SyncCommitManager.class);
 
+    private final OffsetCache offsetCache = new OffsetCache();
     private final Consumer<?, ?> consumer;
 
     public SyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
@@ -45,27 +47,30 @@ public class SyncCommitManager extends AbstractCommitManager {
     }
 
     @Override
-    public void commitOffsetOnStop(TopicPartition partition, long partitionLastOffset) {
+    public void commit(TopicPartition partition) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commitSync on stop {} from topic {}", threadId, partition.topic());
+            LOG.debug("Auto commitSync from thread {} from topic {}", threadId, partition.topic());
         }
 
-        commitSync(partition, partitionLastOffset);
+        commitSync(partition);
     }
 
-    @Override
-    public void commitOffset(TopicPartition partition, long partitionLastOffset) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commitSync from thread {} from topic {}", threadId, partition.topic());
+    private void commitSync(TopicPartition partition) {
+        Long offset = offsetCache.getOffset(partition);
+        if (offset == null) {
+            return;
         }
 
-        commitSync(partition, partitionLastOffset);
+        final Map<TopicPartition, OffsetAndMetadata> offsets
+                = Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
+        long timeout = configuration.getCommitTimeoutMs();
+        consumer.commitSync(offsets, Duration.ofMillis(timeout));
+
+        offsetCache.removeCommittedEntries(offsets, null);
     }
 
-    private void commitSync(TopicPartition partition, long partitionLastOffset) {
-        long timeout = configuration.getCommitTimeoutMs();
-        consumer.commitSync(
-                Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)),
-                Duration.ofMillis(timeout));
+    @Override
+    public void recordOffset(TopicPartition partition, long partitionLastOffset) {
+        offsetCache.recordOffset(partition, partitionLastOffset);
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 723cd0f4e96..8e446d194b3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -132,7 +132,7 @@ public class KafkaRecordProcessor {
             }
 
             // force commit, so we resume on next poll where we failed
-            commitManager.commitOffsetForce(partition, partitionLastOffset);
+            commitManager.forceCommit(partition, partitionLastOffset);
 
             // continue to next partition
             return true;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index ee641acc08e..03d5e5b1e62 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.kafka.consumer.support;
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.camel.Exchange;
@@ -33,22 +32,18 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
-
 public class KafkaRecordProcessorFacade {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessorFacade.class);
 
     private final KafkaConsumer camelKafkaConsumer;
-    private final Map<String, Long> lastProcessedOffset;
     private final String threadId;
     private final KafkaRecordProcessor kafkaRecordProcessor;
     private final CommitManager commitManager;
     private final KafkaConsumerListener consumerListener;
 
-    public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, Map<String, Long> lastProcessedOffset, String threadId,
+    public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId,
                                       CommitManager commitManager, KafkaConsumerListener consumerListener) {
         this.camelKafkaConsumer = camelKafkaConsumer;
-        this.lastProcessedOffset = lastProcessedOffset;
         this.threadId = threadId;
         this.commitManager = commitManager;
 
@@ -85,7 +80,7 @@ public class KafkaRecordProcessorFacade {
 
                 if (consumerListener != null) {
                     if (!consumerListener.afterProcess(lastResult)) {
-                        commitManager.commitOffset(partition, lastResult.getPartitionLastOffset());
+                        commitManager.commit(partition);
                         return lastResult;
                     }
                 }
@@ -94,7 +89,7 @@ public class KafkaRecordProcessorFacade {
             if (!lastResult.isBreakOnErrorHit()) {
                 LOG.debug("Committing offset on successful execution");
                 // all records processed from partition so commit them
-                commitManager.commitOffset(partition, lastResult.getPartitionLastOffset());
+                commitManager.commit(partition);
             }
         }
 
@@ -131,7 +126,7 @@ public class KafkaRecordProcessorFacade {
                         recordHasNext, record, lastResult, camelKafkaConsumer.getExceptionHandler());
 
         if (!currentResult.isBreakOnErrorHit()) {
-            lastProcessedOffset.put(serializeOffsetKey(partition), currentResult.getPartitionLastOffset());
+            commitManager.recordOffset(partition, currentResult.getPartitionLastOffset());
         }
 
         // success so release the exchange
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index dcfafa03f99..81dc2adcd5f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.kafka.consumer.support;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -30,61 +29,33 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
-
 public class PartitionAssignmentListener implements ConsumerRebalanceListener {
     private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class);
 
     private final String threadId;
     private final KafkaConfiguration configuration;
-    private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private final CommitManager commitManager;
     private final Supplier<Boolean> stopStateSupplier;
 
     public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
-                                       Map<String, Long> lastProcessedOffset,
                                        Supplier<Boolean> stopStateSupplier, CommitManager commitManager,
                                        KafkaConsumerResumeStrategy resumeStrategy) {
         this.threadId = threadId;
         this.configuration = configuration;
-        this.lastProcessedOffset = lastProcessedOffset;
         this.commitManager = commitManager;
         this.stopStateSupplier = stopStateSupplier;
         this.resumeStrategy = resumeStrategy;
-
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-
-        // if camel is stopping, or we are not running
-        boolean stopping = stopStateSupplier.get();
-
         for (TopicPartition partition : partitions) {
             LOG.debug("onPartitionsRevoked: {} from {}", threadId, partition.topic());
 
-            String offsetKey = serializeOffsetKey(partition);
-            Long offset = lastProcessedOffset.get(offsetKey);
-            if (offset == null) {
-                offset = -1L;
-            }
-            try {
-                // only commit offsets if the component has control
-                if (!configuration.getAutoCommitEnable() && offset != -1L) {
-                    if (stopping) {
-                        commitManager.commitOffsetOnStop(partition, offset);
-                    } else {
-                        commitManager.commitOffset(partition, offset);
-                    }
-
-                }
-            } catch (Exception e) {
-                LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey,
-                        offset);
-                throw e;
-            } finally {
-                lastProcessedOffset.remove(offsetKey);
+            // only commit offsets if the component has control
+            if (!configuration.getAutoCommitEnable()) {
+                commitManager.commit(partition);
             }
         }
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/consumer/OffsetCacheTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/consumer/OffsetCacheTest.java
new file mode 100644
index 00000000000..0b515280869
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/consumer/OffsetCacheTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class OffsetCacheTest {
+    private OffsetCache offsetCache = new OffsetCache();
+
+    @Order(1)
+    @Test
+    @DisplayName("Tests whether the cache can record a single offset")
+    void updateOffsetsSinglePartition() {
+        final TopicPartition topic1 = new TopicPartition("topic1", 1);
+
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic1, 1));
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic1, 2));
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic1, 2),
+                "The cache should not throw exceptions for duplicate records");
+    }
+
+    @Order(2)
+    @Test
+    @DisplayName("Tests whether the cache can retrieve offset information")
+    void getOffset() {
+        final TopicPartition topic1 = new TopicPartition("topic1", 1);
+
+        assertTrue(offsetCache.contains(topic1));
+        assertEquals(2, offsetCache.getOffset(topic1));
+        assertEquals(1, offsetCache.cacheSize());
+    }
+
+    @Order(3)
+    @Test
+    @DisplayName("Tests whether the cache records and updates multiple offsets to be committed")
+    void updateOffsetsMultiplePartitionsSameTopic() {
+        final TopicPartition topic11 = new TopicPartition("topic1", 1);
+        final TopicPartition topic12 = new TopicPartition("topic1", 2);
+        final TopicPartition topic13 = new TopicPartition("topic1", 3);
+
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic11, 1));
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic11, 2));
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic11, 2),
+                "The cache should not throw exceptions for duplicate records");
+
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic12, 1));
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic12, 2));
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic12, 2),
+                "The cache should not throw exceptions for duplicate records");
+
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic13, 3));
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic13, 4));
+        assertDoesNotThrow(() -> offsetCache.recordOffset(topic13, 5),
+                "The cache should not throw exceptions when recording consecutive offsets");
+
+        assertTrue(offsetCache.contains(topic11), "The cache should contain an entry for the topic1 on partition 1");
+        assertTrue(offsetCache.contains(topic12), "The cache should contain an entry for the topic1 on partition 2");
+        assertTrue(offsetCache.contains(topic13), "The cache should contain an entry for the topic1 on partition 3");
+
+        assertEquals(2, offsetCache.getOffset(topic11));
+        assertEquals(2, offsetCache.getOffset(topic12));
+        assertEquals(5, offsetCache.getOffset(topic13));
+
+        assertEquals(3, offsetCache.cacheSize());
+    }
+
+    @Order(4)
+    @Test
+    @DisplayName("Tests whether the cache removes committed offsets")
+    void removeCommittedEntries() {
+        final TopicPartition topic11 = new TopicPartition("topic1", 1);
+        final TopicPartition topic12 = new TopicPartition("topic1", 2);
+        final TopicPartition topic13 = new TopicPartition("topic1", 3);
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(topic12, new OffsetAndMetadata(3));
+
+        offsetCache.removeCommittedEntries(offsets, null);
+
+        assertEquals(2, offsetCache.getOffset(topic11));
+        assertNull(offsetCache.getOffset(topic12));
+        assertEquals(5, offsetCache.getOffset(topic13));
+
+        assertEquals(2, offsetCache.cacheSize());
+    }
+
+    @Order(5)
+    @Test
+    @DisplayName("Tests whether the cache retains offsets if the consumer fails to commit")
+    void removeRetainCommittedEntries() {
+        final TopicPartition topic13 = new TopicPartition("topic1", 3);
+        final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(topic13, new OffsetAndMetadata(3));
+
+        assertDoesNotThrow(() -> offsetCache.removeCommittedEntries(offsets, new Exception("Fake exception")));
+        assertEquals(2, offsetCache.cacheSize());
+    }
+}


[camel] 02/02: CAMEL-18024: use the commit manager for manual commits

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit aa306b7100b2d9b4ef7f1556b26b2a37cc63915d
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Apr 28 13:39:26 2022 +0200

    CAMEL-18024: use the commit manager for manual commits
    
    This avoids duplication of commit code, simplifies the manual commit factories, and removes the need for queueing the async commits
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 11 +-----
 .../kafka/consumer/AbstractCommitManager.java      |  8 ++--
 .../kafka/consumer/AsyncCommitManager.java         | 14 +------
 .../component/kafka/consumer/CommitManager.java    | 12 +++---
 .../consumer/DefaultKafkaManualAsyncCommit.java    | 45 +++-------------------
 .../DefaultKafkaManualAsyncCommitFactory.java      | 27 +------------
 .../kafka/consumer/DefaultKafkaManualCommit.java   |  9 +----
 .../consumer/DefaultKafkaManualCommitFactory.java  | 26 +------------
 .../consumer/DefaultKafkaManualSyncCommit.java     | 32 +++------------
 .../kafka/consumer/KafkaAsyncManualCommit.java     |  5 +--
 .../kafka/consumer/KafkaManualCommitFactory.java   | 17 +-------
 .../support/KafkaRecordProcessorFacade.java        |  3 +-
 .../support/PartitionAssignmentListener.java       |  5 +--
 .../integration/pause/KafkaPausableConsumerIT.java | 12 ++++--
 .../ROOT/pages/camel-3x-upgrade-guide-3_17.adoc    |  7 ++++
 15 files changed, 47 insertions(+), 186 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 51553f0fa55..c2002826d7c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -276,8 +276,7 @@ public class KafkaFetchRecords implements Runnable {
         resumeStrategy.setConsumer(consumer);
 
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
-                threadId, kafkaConsumer.getEndpoint().getConfiguration(),
-                this::isRunnable, commitManager, resumeStrategy);
+                threadId, kafkaConsumer.getEndpoint().getConfiguration(), commitManager, resumeStrategy);
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
@@ -318,9 +317,7 @@ public class KafkaFetchRecords implements Runnable {
                     }
                 }
 
-                commitManager.processAsyncCommits();
-
-                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, consumer);
+                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
 
                 if (result.isBreakOnErrorHit()) {
                     LOG.debug("We hit an error ... setting flags to force reconnect");
@@ -427,10 +424,6 @@ public class KafkaFetchRecords implements Runnable {
                 && !kafkaConsumer.isSuspendingOrSuspended();
     }
 
-    private boolean isRunnable() {
-        return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed();
-    }
-
     private boolean isReconnect() {
         return reconnect;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
index bce04599f74..6b1b3dd4d5d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
@@ -18,7 +18,6 @@
 package org.apache.camel.component.kafka.consumer;
 
 import java.time.Duration;
-import java.util.Collection;
 import java.util.Collections;
 
 import org.apache.camel.Exchange;
@@ -53,19 +52,18 @@ public abstract class AbstractCommitManager implements CommitManager {
 
     protected KafkaManualCommit getManualCommit(
             Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record,
-            Collection<KafkaAsyncManualCommit> asyncCommits,
             KafkaManualCommitFactory manualCommitFactory) {
 
         StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
         long commitTimeoutMs = configuration.getCommitTimeoutMs();
 
         KafkaManualCommitFactory.CamelExchangePayload camelExchangePayload = new KafkaManualCommitFactory.CamelExchangePayload(
-                exchange, consumer, threadId, offsetRepository, asyncCommits);
+                exchange, consumer, threadId, offsetRepository);
         KafkaManualCommitFactory.KafkaRecordPayload kafkaRecordPayload = new KafkaManualCommitFactory.KafkaRecordPayload(
                 partition,
                 record.offset(), commitTimeoutMs);
 
-        return manualCommitFactory.newInstance(camelExchangePayload, kafkaRecordPayload);
+        return manualCommitFactory.newInstance(camelExchangePayload, kafkaRecordPayload, this);
     }
 
     @Override
@@ -77,7 +75,7 @@ public abstract class AbstractCommitManager implements CommitManager {
             manualCommitFactory = new DefaultKafkaManualCommitFactory();
         }
 
-        return getManualCommit(exchange, partition, record, null, manualCommitFactory);
+        return getManualCommit(exchange, partition, record, manualCommitFactory);
     }
 
     @Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
index 822fe1ac24f..d05778bdaa9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.kafka.consumer;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.KafkaConsumer;
@@ -34,7 +33,6 @@ public class AsyncCommitManager extends AbstractCommitManager {
     private static final Logger LOG = LoggerFactory.getLogger(AsyncCommitManager.class);
     private final Consumer<?, ?> consumer;
 
-    private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits = new ConcurrentLinkedQueue<>();
     private final OffsetCache offsetCache = new OffsetCache();
 
     public AsyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
@@ -43,18 +41,8 @@ public class AsyncCommitManager extends AbstractCommitManager {
         this.consumer = consumer;
     }
 
-    @Override
-    @Deprecated
-    public void processAsyncCommits() {
-        while (!asyncCommits.isEmpty()) {
-            asyncCommits.poll().processAsyncCommit();
-        }
-    }
-
     @Override
     public void commit() {
-        processAsyncCommits();
-
         if (kafkaConsumer.getEndpoint().getConfiguration().isAutoCommitEnable()) {
             LOG.info("Auto commitAsync {} from {}", threadId, printableTopic);
             consumer.commitAsync();
@@ -90,7 +78,7 @@ public class AsyncCommitManager extends AbstractCommitManager {
             manualCommitFactory = new DefaultKafkaManualAsyncCommitFactory();
         }
 
-        return getManualCommit(exchange, partition, record, asyncCommits, manualCommitFactory);
+        return getManualCommit(exchange, partition, record, manualCommitFactory);
     }
 
     @Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
index 842a3364243..df0a2546605 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
@@ -25,6 +25,11 @@ public interface CommitManager {
 
     KafkaManualCommit getManualCommit(Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record);
 
+    /**
+     * Commits everything that has been cached
+     */
+    void commit();
+
     /**
      * Commits the offsets of the given partition
      * 
@@ -47,11 +52,4 @@ public interface CommitManager {
      * @param partitionLastOffset the last offset to commit
      */
     void recordOffset(TopicPartition partition, long partitionLastOffset);
-
-    @Deprecated
-    default void processAsyncCommits() {
-
-    }
-
-    void commit();
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
index ddc3eb19478..fbc4d0610a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
@@ -16,52 +16,19 @@
  */
 package org.apache.camel.component.kafka.consumer;
 
-import java.util.Collections;
-
-import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit implements KafkaAsyncManualCommit {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaManualAsyncCommit.class);
+    private final CommitManager commitManager;
 
     public DefaultKafkaManualAsyncCommit(KafkaManualCommitFactory.CamelExchangePayload camelExchangePayload,
-                                         KafkaManualCommitFactory.KafkaRecordPayload recordPayload) {
+                                         KafkaManualCommitFactory.KafkaRecordPayload recordPayload,
+                                         CommitManager commitManager) {
         super(camelExchangePayload, recordPayload);
-    }
 
-    @Override
-    public void commit() {
-        camelExchangePayload.asyncCommits.add(this);
+        this.commitManager = commitManager;
     }
 
     @Override
-    public void processAsyncCommit() {
-        commitAsyncOffset(getOffsetRepository(), getPartition(), getRecordOffset());
-    }
-
-    protected void commitAsyncOffset(
-            StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) {
-        if (recordOffset != AbstractCommitManager.START_OFFSET) {
-            if (offsetRepository != null) {
-                offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(recordOffset));
-            } else {
-                LOG.debug("Commit async {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
-                camelExchangePayload.consumer.commitAsync(
-                        Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1)),
-                        (offsets, exception) -> {
-                            if (exception != null) {
-                                LOG.error("Error during async commit for {} from topic {} with offset {}: ",
-                                        getThreadId(), getTopicName(), recordOffset, exception);
-                            } else {
-                                LOG.debug("CommitAsync done for {} from topic {} with offset: {}", getThreadId(),
-                                        getTopicName(), recordOffset);
-                            }
-                        });
-            }
-        }
+    public void commit() {
+        commitManager.forceCommit(getPartition(), getRecordOffset());
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java
index a92b78004b2..bc48e91f3de 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java
@@ -16,34 +16,11 @@
  */
 package org.apache.camel.component.kafka.consumer;
 
-import java.util.Collection;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-
 public class DefaultKafkaManualAsyncCommitFactory implements KafkaManualCommitFactory {
 
-    @Override
-    public KafkaManualCommit newInstance(CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload) {
-        return new DefaultKafkaManualAsyncCommit(camelExchangePayload, kafkaRecordPayload);
-    }
-
-    /**
-     * @deprecated Use DefaultKafkaManualAsyncCommitFactory#newInstance(CamelExchangePayload, KafkaRecordPayload)
-     */
-    @Deprecated(since = "3.15.0")
     @Override
     public KafkaManualCommit newInstance(
-            Exchange exchange, Consumer consumer, String topicName, String threadId,
-            StateRepository<String, String> offsetRepository,
-            TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
-
-        CamelExchangePayload camelExchangePayload
-                = new CamelExchangePayload(exchange, consumer, threadId, offsetRepository, asyncCommits);
-        KafkaRecordPayload kafkaRecordPayload = new KafkaRecordPayload(partition, recordOffset, commitTimeout);
-
-        return newInstance(camelExchangePayload, kafkaRecordPayload);
+            CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload, CommitManager commitManager) {
+        return new DefaultKafkaManualAsyncCommit(camelExchangePayload, kafkaRecordPayload, commitManager);
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
index 546f72cd2aa..5d35c7b604a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
@@ -31,14 +31,6 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
         this.kafkaRecordPayload = kafkaRecordPayload;
     }
 
-    protected String serializeOffsetKey(TopicPartition topicPartition) {
-        return topicPartition.topic() + '/' + topicPartition.partition();
-    }
-
-    protected String serializeOffsetValue(long offset) {
-        return String.valueOf(offset);
-    }
-
     /**
      * @deprecated Use {@link #getCamelExchangePayload()}
      */
@@ -55,6 +47,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
         return camelExchangePayload.threadId;
     }
 
+    @Deprecated
     public StateRepository<String, String> getOffsetRepository() {
         return camelExchangePayload.offsetRepository;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java
index 9643ee5f888..92599e739af 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java
@@ -16,33 +16,11 @@
  */
 package org.apache.camel.component.kafka.consumer;
 
-import java.util.Collection;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-
 public class DefaultKafkaManualCommitFactory implements KafkaManualCommitFactory {
 
-    @Override
-    public KafkaManualCommit newInstance(CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload) {
-        return new DefaultKafkaManualSyncCommit(camelExchangePayload, kafkaRecordPayload);
-    }
-
-    /**
-     * @deprecated Use DefaultKafkaManualCommitFactory#newInstance(CamelExchangePayload, KafkaRecordPayload)
-     */
-    @Deprecated(since = "3.15.0")
     @Override
     public KafkaManualCommit newInstance(
-            Exchange exchange, Consumer consumer, String topicName, String threadId,
-            StateRepository<String, String> offsetRepository,
-            TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
-        CamelExchangePayload camelExchangePayload
-                = new CamelExchangePayload(exchange, consumer, threadId, offsetRepository, asyncCommits);
-        KafkaRecordPayload kafkaRecordPayload = new KafkaRecordPayload(partition, recordOffset, commitTimeout);
-
-        return newInstance(camelExchangePayload, kafkaRecordPayload);
+            CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload, CommitManager commitManager) {
+        return new DefaultKafkaManualSyncCommit(camelExchangePayload, kafkaRecordPayload, commitManager);
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
index 1d5aed92526..29932cb810e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
@@ -16,40 +16,18 @@
  */
 package org.apache.camel.component.kafka.consumer;
 
-import java.time.Duration;
-import java.util.Collections;
-
-import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class DefaultKafkaManualSyncCommit extends DefaultKafkaManualCommit implements KafkaManualCommit {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaManualSyncCommit.class);
+    private final CommitManager commitManager;
 
     public DefaultKafkaManualSyncCommit(KafkaManualCommitFactory.CamelExchangePayload camelExchangePayload,
-                                        KafkaManualCommitFactory.KafkaRecordPayload kafkaRecordPayload) {
+                                        KafkaManualCommitFactory.KafkaRecordPayload kafkaRecordPayload,
+                                        CommitManager commitManager) {
         super(camelExchangePayload, kafkaRecordPayload);
+        this.commitManager = commitManager;
     }
 
     @Override
     public void commit() {
-        commitOffset(getOffsetRepository(), getPartition(), getRecordOffset());
-    }
-
-    protected void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) {
-        if (recordOffset != AbstractCommitManager.START_OFFSET) {
-            if (offsetRepository != null) {
-                offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(recordOffset));
-            } else {
-                LOG.debug("Commit sync {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
-                camelExchangePayload.consumer.commitSync(
-                        Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1)),
-                        Duration.ofMillis(getCommitTimeout()));
-                LOG.debug("Commit sync done for {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
-            }
-        }
+        commitManager.forceCommit(getPartition(), getRecordOffset());
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
index eff9eea3763..e6ebcf134b1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
@@ -20,8 +20,5 @@ package org.apache.camel.component.kafka.consumer;
  * Can be used for forcing async manual offset commit when using Kafka consumer.
  */
 public interface KafkaAsyncManualCommit extends KafkaManualCommit {
-    /**
-     * Used in the consumer loop to effectively call org.apache.kafka.clients.consumer.KafkaConsumer#commitAsync()
-     */
-    void processAsyncCommit();
+
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java
index 162448a5445..bfab9b356be 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.kafka.consumer;
 
-import java.util.Collection;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -35,16 +33,13 @@ public interface KafkaManualCommitFactory {
         public final Consumer<?, ?> consumer;
         public final String threadId;
         public final StateRepository<String, String> offsetRepository;
-        public final Collection<KafkaAsyncManualCommit> asyncCommits;
 
         public CamelExchangePayload(Exchange exchange, Consumer<?, ?> consumer, String threadId,
-                                    StateRepository<String, String> offsetRepository,
-                                    Collection<KafkaAsyncManualCommit> asyncCommits) {
+                                    StateRepository<String, String> offsetRepository) {
             this.exchange = exchange;
             this.consumer = consumer;
             this.threadId = threadId;
             this.offsetRepository = offsetRepository;
-            this.asyncCommits = asyncCommits;
         }
     }
 
@@ -69,14 +64,6 @@ public interface KafkaManualCommitFactory {
      * @param camelExchangePayload the exchange-related payload from Camel
      * @param kafkaRecordPayload   the record-related payload from Kafka
      */
-    KafkaManualCommit newInstance(CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload);
-
-    /**
-     * @deprecated Use KafkaManualCommitFactory#newInstance(CamelExchangePayload, KafkaRecordPayload)
-     */
-    @Deprecated(since = "3.15.0")
     KafkaManualCommit newInstance(
-            Exchange exchange, Consumer consumer, String topicName, String threadId,
-            StateRepository<String, String> offsetRepository,
-            TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits);
+            CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload, CommitManager commitManager);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 03d5e5b1e62..53e519523e7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -25,7 +25,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -55,7 +54,7 @@ public class KafkaRecordProcessorFacade {
         return camelKafkaConsumer.isStopping();
     }
 
-    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords, Consumer<?, ?> consumer) {
+    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
         logRecords(allRecords);
 
         Set<TopicPartition> partitions = allRecords.partitions();
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 81dc2adcd5f..e471d1e2ea6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.kafka.consumer.support;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
@@ -36,15 +35,13 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
     private final KafkaConfiguration configuration;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private final CommitManager commitManager;
-    private final Supplier<Boolean> stopStateSupplier;
 
     public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
-                                       Supplier<Boolean> stopStateSupplier, CommitManager commitManager,
+                                       CommitManager commitManager,
                                        KafkaConsumerResumeStrategy resumeStrategy) {
         this.threadId = threadId;
         this.configuration = configuration;
         this.commitManager = commitManager;
-        this.stopStateSupplier = stopStateSupplier;
         this.resumeStrategy = resumeStrategy;
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
index 7ed6fdeb57f..369bd194e9f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
@@ -55,8 +55,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
     // Just a wrapper for us to check if the expected methods are being called
     private static class TestListener extends KafkaConsumerListener {
-        boolean afterConsumeCalled;
-        boolean afterProcessCalled;
+        volatile boolean afterConsumeCalled;
+        volatile boolean afterProcessCalled;
 
         @Override
         public boolean afterConsume(Object ignored) {
@@ -190,8 +190,12 @@ public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
 
         await().atMost(30, TimeUnit.SECONDS).untilAdder(count, greaterThan(10L));
 
-        assertTrue(testConsumerListener.afterConsumeCalled, "The afterConsume method should have been called");
-        assertTrue(testConsumerListener.afterProcessCalled, "The afterProcess method should have been called");
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(testConsumerListener.afterConsumeCalled,
+                        "The afterConsume method should have been called"));
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(testConsumerListener.afterProcessCalled,
+                        "The afterProcess method should have been called"));
 
         to.assertIsSatisfied();
         assertEquals(5, to.getExchanges().size(), "Did not receive the expected amount of messages");
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_17.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_17.adoc
index 61e118df0d2..865b7c7550f 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_17.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_17.adoc
@@ -96,6 +96,13 @@ Asynchronous, Synchronous or NO-OP commit policies from the former `autoCommitOn
 * Async can be set using `kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory`
 * Sync can be set using `kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory`
 
+The deprecated constructors for the `KafkaManualCommit` implementations (the objects created by the `kafkaManualCommitFactory`) have been removed. These constructors should now receive the following parameters:
+
+```
+CamelExchangePayload camelExchangePayload, KafkaRecordPayload kafkaRecordPayload, CommitManager commitManager
+```
+
+
 === camel-platform-http-vertx
 
 The configuration for body handler file uploads has changed from `true` to `false`.