You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/02/07 07:20:47 UTC

[incubator-eventmesh] branch master updated: add callback for kafka producer

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b6bd15f1 add callback for kafka producer
     new e62b600e1 Merge pull request #3041 from hgaol/3040
6b6bd15f1 is described below

commit 6b6bd15f1e5a3bef6ae15edd746d0013898fc4d6
Author: hgaol <dh...@hotmail.com>
AuthorDate: Fri Feb 3 23:09:15 2023 +0800

    add callback for kafka producer
---
 .../connector/kafka/producer/ProducerImpl.java          | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/producer/ProducerImpl.java
index a37a30352..2b0e7fb15 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/producer/ProducerImpl.java
@@ -19,7 +19,9 @@ package org.apache.eventmesh.connector.kafka.producer;
 
 import org.apache.eventmesh.api.RequestReplyCallback;
 import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.api.SendResult;
 import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
+import org.apache.eventmesh.api.exception.OnExceptionContext;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -110,7 +112,20 @@ public class ProducerImpl {
 
     public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
         try {
-            this.producer.send(new ProducerRecord<>(cloudEvent.getSubject(), cloudEvent));
+            this.producer.send(new ProducerRecord<>(cloudEvent.getSubject(), cloudEvent), (metadata, exception) -> {
+                if (exception != null) {
+                    ConnectorRuntimeException onsEx = new ConnectorRuntimeException(exception.getMessage(), exception);
+                    OnExceptionContext context = new OnExceptionContext();
+                    context.setTopic(cloudEvent.getSubject());
+                    context.setException(onsEx);
+                    sendCallback.onException(context);
+                } else {
+                    SendResult sendResult = new SendResult();
+                    sendResult.setTopic(cloudEvent.getSubject());
+                    sendResult.setMessageId(cloudEvent.getId());
+                    sendCallback.onSuccess(sendResult);
+                }
+            });
         } catch (Exception e) {
             log.error(String.format("Send message oneway Exception, %s", cloudEvent), e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org