You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/13 08:53:24 UTC

[camel] branch main updated: CAMEL-18148: Added support for running callbacks on offset updates

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b4887d5c821 CAMEL-18148: Added support for running callbacks on offset updates
b4887d5c821 is described below

commit b4887d5c821f03dcecbf9c429678235fd86f9e04
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Oct 12 10:27:43 2022 +0200

    CAMEL-18148: Added support for running callbacks on offset updates
---
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 46 ++++++++++------------
 .../org/apache/camel/resume/ResumeStrategy.java    | 36 ++++++++++++++++-
 .../processor/resume/TransientResumeStrategy.java  | 12 +++++-
 3 files changed, 65 insertions(+), 29 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 995d4103740..fe7459f9d09 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -24,8 +24,6 @@ import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -66,8 +64,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     private Producer<byte[], byte[]> producer;
     private Duration pollDuration = Duration.ofSeconds(1);
 
-    private final Queue<RecordError> producerErrors = new ConcurrentLinkedQueue<>();
-
     private boolean subscribed;
     private ResumeAdapter adapter;
     private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
@@ -102,15 +98,19 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * @param  message              the message to send
      * @throws ExecutionException
      * @throws InterruptedException
-     * @see                         SingleNodeKafkaResumeStrategy#getProducerErrors()
+     *
      */
-    protected void produce(byte[] key, byte[] message) throws ExecutionException, InterruptedException {
+    protected void produce(byte[] key, byte[] message, UpdateCallBack updateCallBack)
+            throws ExecutionException, InterruptedException {
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(resumeStrategyConfiguration.getTopic(), key, message);
 
         producer.send(record, (recordMetadata, e) -> {
-            if (e != null) {
-                LOG.error("Failed to send message {}", e.getMessage(), e);
-                producerErrors.add(new RecordError(recordMetadata, e));
+            LOG.error("Failed to send message {}", e.getMessage(), e);
+
+            if (updateCallBack != null) {
+                updateCallBack.onUpdate(e);
+            } else {
+                LOG.warn("The is no callback installed for handling errors when producing records to Kafka");
             }
         });
     }
@@ -125,6 +125,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     @Override
     public <T extends Resumable> void updateLastOffset(T offset) throws Exception {
+        updateLastOffset(offset, null);
+    }
+
+    @Override
+    public <T extends Resumable> void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception {
         OffsetKey<?> key = offset.getOffsetKey();
         Offset<?> offsetValue = offset.getLastOffset();
 
@@ -137,12 +142,17 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     @Override
     public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception {
+        updateLastOffset(offsetKey, offset, null);
+    }
+
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception {
         ByteBuffer keyBuffer = offsetKey.serialize();
         ByteBuffer valueBuffer = offsetKey.serialize();
 
         try {
             lock.lock();
-            produce(keyBuffer.array(), valueBuffer.array());
+            produce(keyBuffer.array(), valueBuffer.array(), updateCallBack);
         } finally {
             lock.unlock();
         }
@@ -337,15 +347,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         this.adapter = adapter;
     }
 
-    /**
-     * Gets the set record of sent items
-     *
-     * @return A collection with all the record errors
-     */
-    protected Collection<RecordError> getProducerErrors() {
-        return Collections.unmodifiableCollection(producerErrors);
-    }
-
     @Override
     public void build() {
         // NO-OP
@@ -419,13 +420,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         return producer;
     }
 
-    /**
-     * Clear the producer errors
-     */
-    public void resetProducerErrors() {
-        producerErrors.clear();
-    }
-
     protected KafkaResumeStrategyConfiguration getResumeStrategyConfiguration() {
         return resumeStrategyConfiguration;
     }
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 145fdc7145d..f565961f028 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -24,6 +24,18 @@ import org.apache.camel.Service;
  * processing records.
  */
 public interface ResumeStrategy extends Service {
+    /**
+     * A callback that can be executed after the last offset is updated
+     */
+    @FunctionalInterface
+    interface UpdateCallBack {
+        /**
+         * The method to execute after the last offset is updated
+         * @param throwable an instance of a Throwable if an exception was thrown during the update process
+         */
+        void onUpdate(Throwable throwable);
+    }
+
     String DEFAULT_NAME = "resumeStrategy";
 
     /**
@@ -66,12 +78,32 @@ public interface ResumeStrategy extends Service {
      */
     <T extends Resumable> void updateLastOffset(T offset) throws Exception;
 
+
+    /**
+     * Updates the last processed offset
+     *
+     * @param  offset    the offset to update
+     * @param updateCallBack a callback to be executed after the updated has occurred (null if not available)
+     * @throws Exception if unable to update the offset
+     */
+    <T extends Resumable> void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception;
+
+    /**
+     * Updates the last processed offset
+     *
+     * @param  offsetKey    the offset key to update
+     * @param  offsetValue    the offset value to update
+     * @throws Exception if unable to update the offset
+     */
+    void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offsetValue) throws Exception;
+
     /**
      * Updates the last processed offset
      *
-     * @param  offset    the offset key to update
+     * @param  offsetKey    the offset key to update
      * @param  offset    the offset value to update
+     * @param updateCallBack a callback to be executed after the updated has occurred (null if not available)
      * @throws Exception if unable to update the offset
      */
-    void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception;
+    void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception;
 }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 55147474ea2..78245e3816e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -36,7 +36,7 @@ public class TransientResumeStrategy implements ResumeStrategy {
 
     @Override
     public void setAdapter(ResumeAdapter adapter) {
-
+        // this is NO-OP
     }
 
     @Override
@@ -54,6 +54,16 @@ public class TransientResumeStrategy implements ResumeStrategy {
         // this is NO-OP
     }
 
+    @Override
+    public <T extends Resumable> void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception {
+        // this is NO-OP
+    }
+
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception {
+        // this is NO-OP
+    }
+
     @Override
     public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) {
         // this is NO-OP