You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/02/07 07:20:47 UTC
[incubator-eventmesh] branch master updated: add callback for kafka producer
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 6b6bd15f1 add callback for kafka producer
new e62b600e1 Merge pull request #3041 from hgaol/3040
6b6bd15f1 is described below
commit 6b6bd15f1e5a3bef6ae15edd746d0013898fc4d6
Author: hgaol <dh...@hotmail.com>
AuthorDate: Fri Feb 3 23:09:15 2023 +0800
add callback for kafka producer
---
.../connector/kafka/producer/ProducerImpl.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/producer/ProducerImpl.java
index a37a30352..2b0e7fb15 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/producer/ProducerImpl.java
@@ -19,7 +19,9 @@ package org.apache.eventmesh.connector.kafka.producer;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
+import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -110,7 +112,20 @@ public class ProducerImpl {
public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
try {
- this.producer.send(new ProducerRecord<>(cloudEvent.getSubject(), cloudEvent));
+ this.producer.send(new ProducerRecord<>(cloudEvent.getSubject(), cloudEvent), (metadata, exception) -> {
+ if (exception != null) {
+ ConnectorRuntimeException onsEx = new ConnectorRuntimeException(exception.getMessage(), exception);
+ OnExceptionContext context = new OnExceptionContext();
+ context.setTopic(cloudEvent.getSubject());
+ context.setException(onsEx);
+ sendCallback.onException(context);
+ } else {
+ SendResult sendResult = new SendResult();
+ sendResult.setTopic(cloudEvent.getSubject());
+ sendResult.setMessageId(cloudEvent.getId());
+ sendCallback.onSuccess(sendResult);
+ }
+ });
} catch (Exception e) {
log.error(String.format("Send message oneway Exception, %s", cloudEvent), e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org