You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/13 08:53:24 UTC
[camel] branch main updated: CAMEL-18148: Added support for running callbacks on offset updates
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new b4887d5c821 CAMEL-18148: Added support for running callbacks on offset updates
b4887d5c821 is described below
commit b4887d5c821f03dcecbf9c429678235fd86f9e04
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Oct 12 10:27:43 2022 +0200
CAMEL-18148: Added support for running callbacks on offset updates
---
.../kafka/SingleNodeKafkaResumeStrategy.java | 46 ++++++++++------------
.../org/apache/camel/resume/ResumeStrategy.java | 36 ++++++++++++++++-
.../processor/resume/TransientResumeStrategy.java | 12 +++++-
3 files changed, 65 insertions(+), 29 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 995d4103740..fe7459f9d09 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -24,8 +24,6 @@ import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -66,8 +64,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
private Producer<byte[], byte[]> producer;
private Duration pollDuration = Duration.ofSeconds(1);
- private final Queue<RecordError> producerErrors = new ConcurrentLinkedQueue<>();
-
private boolean subscribed;
private ResumeAdapter adapter;
private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
@@ -102,15 +98,22 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
* @param message the message to send
+ * @param updateCallBack an optional callback run from the producer's send callback with the send error, if any (may be null)
* @throws ExecutionException
* @throws InterruptedException
- * @see SingleNodeKafkaResumeStrategy#getProducerErrors()
*/
- protected void produce(byte[] key, byte[] message) throws ExecutionException, InterruptedException {
+ protected void produce(byte[] key, byte[] message, UpdateCallBack updateCallBack)
+ throws ExecutionException, InterruptedException {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(resumeStrategyConfiguration.getTopic(), key, message);
producer.send(record, (recordMetadata, e) -> {
- if (e != null) {
- LOG.error("Failed to send message {}", e.getMessage(), e);
- producerErrors.add(new RecordError(recordMetadata, e));
+ // The exception is null when the record was sent successfully: only log (and dereference) real failures
+ if (e != null) {
+ LOG.error("Failed to send message {}", e.getMessage(), e);
+ }
+
+ if (updateCallBack != null) {
+ updateCallBack.onUpdate(e);
+ } else {
+ LOG.warn("The is no callback installed for handling errors when producing records to Kafka");
}
});
}
@@ -125,6 +125,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
@Override
public <T extends Resumable> void updateLastOffset(T offset) throws Exception {
+ updateLastOffset(offset, null);
+ }
+
+ @Override
+ public <T extends Resumable> void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception {
OffsetKey<?> key = offset.getOffsetKey();
Offset<?> offsetValue = offset.getLastOffset();
@@ -137,12 +142,17 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
@Override
public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception {
+ updateLastOffset(offsetKey, offset, null);
+ }
+
+ @Override
+ public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception {
ByteBuffer keyBuffer = offsetKey.serialize();
- ByteBuffer valueBuffer = offsetKey.serialize();
+ ByteBuffer valueBuffer = offset.serialize();
try {
lock.lock();
- produce(keyBuffer.array(), valueBuffer.array());
+ produce(keyBuffer.array(), valueBuffer.array(), updateCallBack);
} finally {
lock.unlock();
}
@@ -337,15 +347,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
this.adapter = adapter;
}
- /**
- * Gets the set record of sent items
- *
- * @return A collection with all the record errors
- */
- protected Collection<RecordError> getProducerErrors() {
- return Collections.unmodifiableCollection(producerErrors);
- }
-
@Override
public void build() {
// NO-OP
@@ -419,13 +420,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
return producer;
}
- /**
- * Clear the producer errors
- */
- public void resetProducerErrors() {
- producerErrors.clear();
- }
-
protected KafkaResumeStrategyConfiguration getResumeStrategyConfiguration() {
return resumeStrategyConfiguration;
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 145fdc7145d..f565961f028 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -24,6 +24,18 @@ import org.apache.camel.Service;
* processing records.
*/
public interface ResumeStrategy extends Service {
+ /**
+ * A callback that can be executed after the last offset is updated
+ */
+ @FunctionalInterface
+ interface UpdateCallBack {
+ /**
+ * The method to execute after the last offset is updated
+ * @param throwable an instance of a Throwable if an exception was thrown during the update process
+ */
+ void onUpdate(Throwable throwable);
+ }
+
String DEFAULT_NAME = "resumeStrategy";
/**
@@ -66,12 +78,32 @@ public interface ResumeStrategy extends Service {
*/
<T extends Resumable> void updateLastOffset(T offset) throws Exception;
+
+ /**
+ * Updates the last processed offset
+ *
+ * @param offset the offset to update
+ * @param updateCallBack a callback to be executed after the update has occurred (null if not available)
+ * @throws Exception if unable to update the offset
+ */
+ <T extends Resumable> void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception;
+
+ /**
+ * Updates the last processed offset
+ *
+ * @param offsetKey the offset key to update
+ * @param offsetValue the offset value to update
+ * @throws Exception if unable to update the offset
+ */
+ void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offsetValue) throws Exception;
+
/**
* Updates the last processed offset
*
- * @param offset the offset key to update
+ * @param offsetKey the offset key to update
* @param offset the offset value to update
+ * @param updateCallBack a callback to be executed after the update has occurred (null if not available)
* @throws Exception if unable to update the offset
*/
- void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception;
+ void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception;
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 55147474ea2..78245e3816e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -36,7 +36,7 @@ public class TransientResumeStrategy implements ResumeStrategy {
@Override
public void setAdapter(ResumeAdapter adapter) {
-
+ // this is NO-OP
}
@Override
@@ -54,6 +54,16 @@ public class TransientResumeStrategy implements ResumeStrategy {
// this is NO-OP
}
+ @Override
+ public <T extends Resumable> void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception {
+ // this is NO-OP
+ }
+
+ @Override
+ public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception {
+ // this is NO-OP
+ }
+
@Override
public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) {
// this is NO-OP