You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/11 14:51:03 UTC
[camel] 02/02: CAMEL-18148: allow updating using the key and value separately
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 23812e3b710ff249775a091c9615287ea4d695b4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 11 13:47:06 2022 +0200
CAMEL-18148: allow updating using the key and value separately
---
.../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java | 11 ++++++++---
.../src/main/java/org/apache/camel/resume/ResumeStrategy.java | 9 +++++++++
.../camel/processor/resume/TransientResumeStrategy.java | 7 +++++++
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 8689eeed96e..995d4103740 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -132,8 +132,13 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
LOG.debug("Updating offset on Kafka with key {} to {}", key.getValue(), offsetValue.getValue());
}
- ByteBuffer keyBuffer = key.serialize();
- ByteBuffer valueBuffer = offsetValue.serialize();
+ updateLastOffset(key, offsetValue);
+ }
+
+ @Override
+ public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception {
+ ByteBuffer keyBuffer = offsetKey.serialize();
+ ByteBuffer valueBuffer = offset.serialize(); // value bytes come from the offset, not the key
try {
lock.lock();
@@ -142,7 +147,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
lock.unlock();
}
- doAdd(key, offsetValue);
+ doAdd(offsetKey, offset);
}
/**
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index a9325b829b4..145fdc7145d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -65,4 +65,13 @@ public interface ResumeStrategy extends Service {
* @throws Exception if unable to update the offset
*/
<T extends Resumable> void updateLastOffset(T offset) throws Exception;
+
+ /**
+ * Updates the last processed offset
+ *
+ * @param offsetKey the offset key to update
+ * @param offset the offset value to update
+ * @throws Exception if unable to update the offset
+ */
+ void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception;
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 0e926152536..55147474ea2 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -17,6 +17,8 @@
package org.apache.camel.processor.resume;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeStrategy;
@@ -52,6 +54,11 @@ public class TransientResumeStrategy implements ResumeStrategy {
// this is NO-OP
}
+ @Override
+ public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) {
+ // this is NO-OP: both the offset key and the offset value are ignored
+ }
+
@Override
public void start() {
// this is NO-OP