You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/11 14:51:01 UTC

[camel] branch main updated (6da9ba16648 -> 23812e3b710)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 6da9ba16648 CAMEL-18601: replace assertMockEndpointsSatisfied() in doc (#8518)
     new 04a704c7d50 CAMEL-18148: allow deserializing the key and value separately
     new 23812e3b710 CAMEL-18148: allow updating using the key and value separately

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java    |  4 ++--
 .../file/consumer/adapters/FileResumeAdapterDelegate.java     |  4 ++--
 .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java | 11 ++++++++---
 .../src/main/java/org/apache/camel/resume/Deserializable.java |  8 ++++++++
 .../src/main/java/org/apache/camel/resume/ResumeStrategy.java |  9 +++++++++
 .../camel/processor/resume/TransientResumeStrategy.java       |  7 +++++++
 .../apache/camel/support/resume/ResumeActionAwareAdapter.java |  4 ++--
 7 files changed, 38 insertions(+), 9 deletions(-)


[camel] 02/02: CAMEL-18148: allow updating using the key and value separately

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 23812e3b710ff249775a091c9615287ea4d695b4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 11 13:47:06 2022 +0200

    CAMEL-18148: allow updating using the key and value separately
---
 .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java | 11 ++++++++---
 .../src/main/java/org/apache/camel/resume/ResumeStrategy.java |  9 +++++++++
 .../camel/processor/resume/TransientResumeStrategy.java       |  7 +++++++
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 8689eeed96e..995d4103740 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -132,8 +132,13 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             LOG.debug("Updating offset on Kafka with key {} to {}", key.getValue(), offsetValue.getValue());
         }
 
-        ByteBuffer keyBuffer = key.serialize();
-        ByteBuffer valueBuffer = offsetValue.serialize();
+        updateLastOffset(key, offsetValue);
+    }
+
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception {
+        ByteBuffer keyBuffer = offsetKey.serialize();
+        ByteBuffer valueBuffer = offsetKey.serialize();
 
         try {
             lock.lock();
@@ -142,7 +147,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             lock.unlock();
         }
 
-        doAdd(key, offsetValue);
+        doAdd(offsetKey, offset);
     }
 
     /**
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index a9325b829b4..145fdc7145d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -65,4 +65,13 @@ public interface ResumeStrategy extends Service {
      * @throws Exception if unable to update the offset
      */
     <T extends Resumable> void updateLastOffset(T offset) throws Exception;
+
+    /**
+     * Updates the last processed offset
+     *
+     * @param  offset    the offset key to update
+     * @param  offset    the offset value to update
+     * @throws Exception if unable to update the offset
+     */
+    void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception;
 }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 0e926152536..55147474ea2 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.processor.resume;
 
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeStrategy;
@@ -52,6 +54,11 @@ public class TransientResumeStrategy implements ResumeStrategy {
         // this is NO-OP
     }
 
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) {
+        // this is NO-OP
+    }
+
     @Override
     public void start() {
         // this is NO-OP


[camel] 01/02: CAMEL-18148: allow deserializing the key and value separately

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 04a704c7d509ffd588917e52248ef0eabba6f6a0
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 11 13:35:11 2022 +0200

    CAMEL-18148: allow deserializing the key and value separately
---
 .../aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java        | 4 ++--
 .../file/consumer/adapters/FileResumeAdapterDelegate.java         | 4 ++--
 .../src/main/java/org/apache/camel/resume/Deserializable.java     | 8 ++++++++
 .../org/apache/camel/support/resume/ResumeActionAwareAdapter.java | 4 ++--
 4 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
index 72172ef4bd5..0c472acc8cd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
@@ -88,8 +88,8 @@ public class KinesisDefaultResumeAdapter implements KinesisResumeAdapter, Cachea
 
     @Override
     public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
-        Object keyObj = deserializeObject(keyBuffer);
-        Object valueObj = deserializeObject(valueBuffer);
+        Object keyObj = deserializeKey(keyBuffer);
+        Object valueObj = deserializeValue(valueBuffer);
 
         add(keyObj, valueObj);
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
index c5f0ffaa2c9..79def33b4e3 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
@@ -74,8 +74,8 @@ public class FileResumeAdapterDelegate
 
     @Override
     public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
-        Object keyObj = deserializeObject(keyBuffer);
-        Object valueObj = deserializeObject(valueBuffer);
+        Object keyObj = deserializeKey(keyBuffer);
+        Object valueObj = deserializeValue(valueBuffer);
 
         if (valueObj instanceof File) {
             directoryEntriesResumeAdapter.deserializeFileEntry((File) keyObj, (File) valueObj);
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java b/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java
index a40a05ad180..73a3d36b2f7 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java
@@ -53,5 +53,13 @@ public interface Deserializable {
         }
     }
 
+    default Object deserializeKey(ByteBuffer keyBuffer) {
+        return deserializeObject(keyBuffer);
+    }
+
+    default Object deserializeValue(ByteBuffer valueBuffer) {
+        return deserializeObject(valueBuffer);
+    }
+
     boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer);
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
index 03faf8f267d..563f64c8889 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
@@ -73,8 +73,8 @@ public class ResumeActionAwareAdapter implements ResumeActionAware, Cacheable, D
 
     @Override
     public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
-        Object key = deserializeObject(keyBuffer);
-        Object value = deserializeObject(valueBuffer);
+        Object key = deserializeKey(keyBuffer);
+        Object value = deserializeValue(valueBuffer);
 
         return add(key, value);
     }