You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/11 14:51:03 UTC

[camel] 02/02: CAMEL-18148: allow updating using the key and value separately

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 23812e3b710ff249775a091c9615287ea4d695b4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 11 13:47:06 2022 +0200

    CAMEL-18148: allow updating using the key and value separately
---
 .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java | 11 ++++++++---
 .../src/main/java/org/apache/camel/resume/ResumeStrategy.java |  9 +++++++++
 .../camel/processor/resume/TransientResumeStrategy.java       |  7 +++++++
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 8689eeed96e..995d4103740 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -132,8 +132,13 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             LOG.debug("Updating offset on Kafka with key {} to {}", key.getValue(), offsetValue.getValue());
         }
 
-        ByteBuffer keyBuffer = key.serialize();
-        ByteBuffer valueBuffer = offsetValue.serialize();
+        updateLastOffset(key, offsetValue);
+    }
+
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception {
+        ByteBuffer keyBuffer = offsetKey.serialize();
+        ByteBuffer valueBuffer = offset.serialize();
 
         try {
             lock.lock();
@@ -142,7 +147,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             lock.unlock();
         }
 
-        doAdd(key, offsetValue);
+        doAdd(offsetKey, offset);
     }
 
     /**
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index a9325b829b4..145fdc7145d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -65,4 +65,13 @@ public interface ResumeStrategy extends Service {
      * @throws Exception if unable to update the offset
      */
     <T extends Resumable> void updateLastOffset(T offset) throws Exception;
+
+    /**
+     * Updates the last processed offset
+     *
+     * @param  offsetKey the offset key to update
+     * @param  offset    the offset value to update
+     * @throws Exception if unable to update the offset
+     */
+    void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception;
 }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 0e926152536..55147474ea2 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.processor.resume;
 
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeStrategy;
@@ -52,6 +54,11 @@ public class TransientResumeStrategy implements ResumeStrategy {
         // this is NO-OP
     }
 
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) {
+        // this is NO-OP
+    }
+
     @Override
     public void start() {
         // this is NO-OP