You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/12/10 02:33:36 UTC

[servicecomb-pack] 03/13: SCB-1627 CallbackContext sends the compensation result event to Alpha

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit ca52f12cff38375ec18b0fd573ea02472bce3085
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Sat Nov 30 18:32:18 2019 +0800

    SCB-1627 CallbackContext sends the compensation result event to Alpha
---
 .../apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java   | 8 ++++----
 .../servicecomb/pack/omega/transaction/spring/MessageConfig.java  | 8 ++++----
 .../servicecomb/pack/omega/transaction/CallbackContext.java       | 6 +++++-
 .../main/java/org/apache/servicecomb/pack/common/EventType.java   | 4 +++-
 4 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
index e1c2ad5..1fd3648 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
@@ -72,13 +72,13 @@ class OmegaSpringConfig {
   }
 
   @Bean(name = {"compensationContext"})
-  CallbackContext compensationContext(OmegaContext omegaContext) {
-    return new CallbackContext(omegaContext);
+  CallbackContext compensationContext(OmegaContext omegaContext, SagaMessageSender sender) {
+    return new CallbackContext(omegaContext, sender);
   }
 
   @Bean(name = {"coordinateContext"})
-  CallbackContext coordinateContext(OmegaContext omegaContext) {
-    return new CallbackContext(omegaContext);
+  CallbackContext coordinateContext(OmegaContext omegaContext, SagaMessageSender sender) {
+    return new CallbackContext(omegaContext, sender);
   }
 
   @Bean
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/MessageConfig.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/MessageConfig.java
index 6fd2d91..240b441 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/MessageConfig.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/MessageConfig.java
@@ -44,13 +44,13 @@ public class MessageConfig {
   }
 
   @Bean(name = "compensationContext")
-  CallbackContext recoveryCompensationContext(OmegaContext omegaContext) {
-    return new CallbackContext(omegaContext);
+  CallbackContext recoveryCompensationContext(OmegaContext omegaContext, SagaMessageSender sender) {
+    return new CallbackContext(omegaContext, sender);
   }
 
   @Bean(name = "coordinateContext")
-  CallbackContext coordinateContext(OmegaContext omegaContext) {
-    return new CallbackContext(omegaContext);
+  CallbackContext coordinateContext(OmegaContext omegaContext, SagaMessageSender sender) {
+    return new CallbackContext(omegaContext, sender);
   }
 
   @Bean
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
index c483480..c069ac9 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
@@ -31,9 +31,11 @@ public class CallbackContext {
 
   private final Map<String, CallbackContextInternal> contexts = new ConcurrentHashMap<>();
   private final OmegaContext omegaContext;
+  private final SagaMessageSender sender;
 
-  public CallbackContext(OmegaContext omegaContext) {
+  public CallbackContext(OmegaContext omegaContext, SagaMessageSender sender) {
     this.omegaContext = omegaContext;
+    this.sender = sender;
   }
 
   public void addCallbackContext(String key, Method compensationMethod, Object target) {
@@ -50,7 +52,9 @@ public class CallbackContext {
       omegaContext.setLocalTxId(localTxId);
       contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
       LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+      sender.send(new TxCompensateAckSucceedEvent(omegaContext.globalTxId(),omegaContext.localTxId(),omegaContext.globalTxId()));
     } catch (IllegalAccessException | InvocationTargetException e) {
+      sender.send(new TxCompensateAckFailedEvent(omegaContext.globalTxId(),omegaContext.localTxId(),omegaContext.globalTxId()));
       LOG.error(
           "Pre-checking for callback method " + contextInternal.callbackMethod.toString()
               + " was somehow skipped, did you forget to configure callback method checking on service startup?",
diff --git a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
index e8c88a8..e585d0c 100644
--- a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
+++ b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
@@ -26,5 +26,7 @@ public enum EventType {
   SagaEndedEvent,
   SagaAbortedEvent,
   SagaTimeoutEvent,
-  TxCompensateEvent
+  TxCompensateEvent,
+  TxCompensateAckFailedEvent,
+  TxCompensateAckSucceedEvent
 }