You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/14 11:41:14 UTC

[servicecomb-pack] 18/21: SCB-1418 add Acknowledgment to KafkaMessageListener

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 208e294bcfe90107f8dbb10dbefb2be2bce5e668
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:57:37 2019 +0800

    SCB-1418 add Acknowledgment to KafkaMessageListener
---
 .../pack/alpha/fsm/channel/kafka/KafkaMessageListener.java          | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
index fe6d535..8d1f880 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
@@ -21,6 +21,7 @@ import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
 
 public class KafkaMessageListener {
 
@@ -33,17 +34,16 @@ public class KafkaMessageListener {
     }
 
     @KafkaListener(topics = "${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}")
-    public void listener(BaseEvent baseEvent){
+    public void listener(BaseEvent baseEvent, Acknowledgment acknowledgment){
         if(logger.isDebugEnabled()){
             logger.debug("listener event = [{}]", baseEvent);
         }
 
         try {
             actorEventSink.send(baseEvent);
+            acknowledgment.acknowledge();
         }catch (Exception e){
             logger.error("subscriber Exception = [{}]", e.getMessage(), e);
         }
-
-
     }
 }
\ No newline at end of file