You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/17 00:06:22 UTC

[servicecomb-pack] 01/05: SCB-1442 add partition by globalTxId

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 98ec9c14acee7fdb589f951dc63023b67c622d4f
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:43:03 2019 +0800

    SCB-1442 add partition by globalTxId
---
 .../pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java       | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
index 068f782..b6a9051 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 
 import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
@@ -43,7 +44,12 @@ public class KafkaMessagePublisher implements MessagePublisher {
         }
 
         try {
-            kafkaTemplate.send(topic, data).get();
+            if(data instanceof BaseEvent) {
+                BaseEvent event = (BaseEvent) data;
+                kafkaTemplate.send(topic, event.getGlobalTxId(), event).get();
+            }else{
+                kafkaTemplate.send(topic, data).get();
+            }
         } catch (InterruptedException | ExecutionException e) {
             logger.error("publish Exception = [{}]", e.getMessage(), e);
             throw new RuntimeException(e);