Posted to dev@eventmesh.apache.org by GitBox <gi...@apache.org> on 2022/09/16 23:46:54 UTC

[GitHub] [incubator-eventmesh] githublaohu commented on a diff in pull request #1264: [ISSUE #1262] Kafka Support for manual acknowledgement

githublaohu commented on code in PR #1264:
URL: https://github.com/apache/incubator-eventmesh/pull/1264#discussion_r973506693


##########
eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/consumer/KafkaConsumerRunner.java:
##########
@@ -37,22 +39,28 @@ public class KafkaConsumerRunner implements Runnable {
     private final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunner.class);
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer<String, CloudEvent> consumer;
+    private Map<CloudEvent, Long> cloudEventToOffset;
     private EventListener listener;
     private AtomicInteger offset;
 
     public KafkaConsumerRunner(KafkaConsumer<String, CloudEvent> kafkaConsumer) {
         this.consumer = kafkaConsumer;
+        cloudEventToOffset = new HashMap<>();
     }
 
-    public void setListener(EventListener listener) {
+    public synchronized void setListener(EventListener listener) {
         this.listener = listener;
     }
 
+    public long getOffset(CloudEvent cloudEvent) {
+        return cloudEventToOffset.getOrDefault(cloudEvent, 0L);
+    }
+
     @Override
     public void run() {

Review Comment:
   If this thread is meant to run continuously, please make sure it does not exit when an exception occurs.



##########
eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/consumer/KafkaConsumerRunner.java:
##########
@@ -37,22 +39,28 @@ public class KafkaConsumerRunner implements Runnable {
     private final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunner.class);
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer<String, CloudEvent> consumer;
+    private Map<CloudEvent, Long> cloudEventToOffset;
     private EventListener listener;
     private AtomicInteger offset;
 
     public KafkaConsumerRunner(KafkaConsumer<String, CloudEvent> kafkaConsumer) {
         this.consumer = kafkaConsumer;
+        cloudEventToOffset = new HashMap<>();
     }
 
-    public void setListener(EventListener listener) {
+    public synchronized void setListener(EventListener listener) {
         this.listener = listener;
     }
 
+    public long getOffset(CloudEvent cloudEvent) {
+        return cloudEventToOffset.getOrDefault(cloudEvent, 0L);
+    }
+
     @Override
     public void run() {
-        try {
+        try { // TODO: change it to manual ack
             while (!closed.get()) {

Review Comment:
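   1. Add a try/catch inside the while loop; logging the exception in the catch block is enough.
   2. Also add a try/catch inside the forEach. As written, if one message throws, the rest of the list is never processed, which does not seem acceptable.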
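
A minimal sketch of the two suggestions above, not the actual KafkaConsumerRunner code. It assumes hypothetical stand-ins: `ResilientPollLoop` for the runner, a `Supplier<List<String>>` in place of `consumer.poll(...)`, a `Consumer<String>` in place of `listener.consume(...)`, and an `errors` list in place of `logger.error(...)`. No Kafka types are used, so it runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for KafkaConsumerRunner (illustration only).
class ResilientPollLoop implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Supplier<List<String>> poller; // assumed stand-in for consumer.poll(...)
    private final Consumer<String> handler;      // assumed stand-in for listener.consume(...)
    final List<String> errors = new ArrayList<>(); // assumed stand-in for logger.error(...)

    ResilientPollLoop(Supplier<List<String>> poller, Consumer<String> handler) {
        this.poller = poller;
        this.handler = handler;
    }

    @Override
    public void run() {
        while (!closed.get()) {
            try {
                // Suggestion 1: a failure while polling is recorded, and the loop continues.
                List<String> batch = poller.get();
                for (String record : batch) {
                    try {
                        // Suggestion 2: one failing record does not skip the rest of the batch.
                        handler.accept(record);
                    } catch (Exception e) {
                        errors.add("record " + record + ": " + e.getMessage());
                    }
                }
            } catch (Exception e) {
                errors.add("poll: " + e.getMessage());
            }
            // The loop condition re-checks `closed` after every iteration, failed or not.
        }
    }

    void close() {
        closed.set(true);
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        AtomicInteger calls = new AtomicInteger();
        ResilientPollLoop[] ref = new ResilientPollLoop[1];
        ref[0] = new ResilientPollLoop(
            () -> {
                int n = calls.incrementAndGet();
                if (n == 1) {
                    throw new IllegalStateException("poll failed");
                }
                if (n == 2) {
                    return Arrays.asList("a", "bad", "c");
                }
                ref[0].close(); // third poll: request shutdown
                return Collections.emptyList();
            },
            record -> {
                if (record.equals("bad")) {
                    throw new IllegalArgumentException("cannot process");
                }
                handled.add(record);
            });
        ref[0].run();
        System.out.println("handled = " + handled);
        System.out.println("errors  = " + ref[0].errors);
    }
}
```

With this shape the loop ends only once `closed` is set, and whether a failed record should be retried or acknowledged is left open here.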



##########
eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/consumer/KafkaConsumerRunner.java:
##########
@@ -64,29 +72,33 @@ public void commit(EventMeshAction action) {
                                 case CommitMessage:
                                     // update offset
                                     logger.info("message commit, topic: {}, current offset:{}", topicName,
-                                        offset.get());
+                                        rec.offset());
                                     break;
                                 case ReconsumeLater:
                                     // don't update offset
                                     break;
                                 case ManualAck:
                                     // update offset
-                                    offset.incrementAndGet();
                                     logger
-                                        .info("message ack, topic: {}, current offset:{}", topicName, offset.get());
+                                        .info("message ack, topic: {}, current offset:{}", topicName, rec.offset());
                                     break;
                                 default:
                             }
                         }
                     };
+                    cloudEventToOffset.put(cloudEvent, rec.offset());
                     if (listener != null) {
                         listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
                     }
                 });
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
-            if (!closed.get()) throw e;
+            if (!closed.get()) {

Review Comment:
   This check seems unnecessary: the while loop condition already tests closed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

