You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2020/06/23 23:39:56 UTC

[servicecomb-pack] branch master updated (b476b8b -> fd07b17)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git.


    from b476b8b  Merge pull request #661 from apache/SCB-1928
     new c2d52e9  SCB-2004 In Omega, if CallbackContenxt does not contain callbackMethod, send TxCompensateAckFailedEvent to Alpha instead of throwing NPE
     new 8e0afb7  SCB-2004 Use message-driven compensation retry instead of recursive compensation
     new 41b27fa  SCB-2004 Fix await for Travis CI
     new fd07b17  SCB-2004 Rename preState to preComponsitedState

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../fsm/event/internal/ComponsitedCheckEvent.java  |  37 +++++
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 154 +++++++++++++--------
 .../pack/alpha/fsm/domain/UpdateTxEventDomain.java |   7 +
 .../pack/alpha/fsm/model/TxEntities.java           |   8 ++
 .../pack/alpha/server/fsm/GrpcOmegaCallback.java   |   2 +-
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  |  29 +++-
 .../alpha/server/fsm/OmegaEventSagaSimulator.java  |  52 +++++--
 .../pack/omega/transaction/CallbackContext.java    |  29 ++--
 8 files changed, 231 insertions(+), 87 deletions(-)


[servicecomb-pack] 01/04: SCB-2004 In Omega, if CallbackContenxt does not contain callbackMethod, send TxCompensateAckFailedEvent to Alpha instead of throwing NPE

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit c2d52e9c3cd2203e193bb5154d637fc6f7cb5203
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jun 22 19:35:54 2020 +0800

    SCB-2004 In Omega, if CallbackContenxt does not contain callbackMethod, send TxCompensateAckFailedEvent to Alpha instead of throwing NPE
---
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  | 29 +++++++++---
 .../alpha/server/fsm/OmegaEventSagaSimulator.java  | 52 +++++++++++++++-------
 .../pack/omega/transaction/CallbackContext.java    | 29 ++++++++----
 3 files changed, 80 insertions(+), 30 deletions(-)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index aede177..d010728 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -30,14 +30,14 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
 import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
-import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.common.AlphaMetaKeys;
+import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -480,7 +480,7 @@ public class AlphaIntegrationFsmTest {
     final String localTxId_3 = UUID.randomUUID().toString();
     final Map<String, OmegaCallback>[] omegaInstance = new Map[]{null};
     final String[] serviceName = new String[1];
-    omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+    omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEventsAndReverseRetries(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
       if(event.getType().equals(EventType.TxAbortedEvent.name())){
         //simulate omega disconnect
         serviceName[0] = event.getServiceName();
@@ -489,17 +489,32 @@ public class AlphaIntegrationFsmTest {
       }
       omegaEventSender.getBlockingStub().onTxEvent(event);
     });
-    //simulate omega connected
     await().atMost(5, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
       return sagaData !=null && sagaData.getTxEntities().size()==3;
     });
+
+    //simulate omega connected
     omegaEventSender.getOmegaCallbacks().put(serviceName[0], omegaInstance[0]);
-    waitAlphaCallCompensate(omegaEventSender, globalTxId, localTxId_1, localTxId_2);
-    await().atMost(2, SECONDS).until(() -> {
+
+    await().atMost(10, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
-      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      if(sagaData != null){
+        if(sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED){
+          return true;
+        }else{
+          sagaData.getTxEntities().forEachReverse((k,v) -> {
+            if(v.getState() == TxState.COMPENSATION_SENT || v.getState() == TxState.COMPENSATED_FAILED){
+              omegaEventSender.getBlockingStub().onTxEvent(omegaEventSender.getOmegaEventSagaSimulator().getTxCompensateAckSucceedEvent(v.getGlobalTxId(),v.getLocalTxId()));
+            }
+          });
+          return false;
+        }
+      } else {
+        return false;
+      }
     });
+
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
     assertEquals(sagaData.getTxEntities().size(),3);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 13eb3f1..454f6df 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -65,7 +65,7 @@ public class OmegaEventSagaSimulator {
     final int localTxId_1_ReverseTimeoutSecond = 2;
     List<GrpcTxEvent> sagaEvents = new ArrayList<>();
     sagaEvents.add(sagaStartedEvent(globalTxId));
-    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a", 0, localTxId_1_ReverseTimeoutSecond));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a", 0, localTxId_1_ReverseTimeoutSecond, 5));
     sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
     sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, NullPointerException.class.getName().getBytes(), "method b"));
@@ -87,6 +87,19 @@ public class OmegaEventSagaSimulator {
     return sagaEvents;
   }
 
+  public List<GrpcTxEvent> lastTxAbortedEventsAndReverseRetries(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b", 15, 0, 1000));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, NullPointerException.class.getName().getBytes(), "method c"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
   public List<GrpcTxEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
     List<GrpcTxEvent> sagaEvents = new ArrayList<>();
     sagaEvents.add(sagaStartedEvent(globalTxId));
@@ -198,80 +211,87 @@ public class OmegaEventSagaSimulator {
   private GrpcTxEvent sagaStartedEvent(String globalTxId) {
     return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent sagaStartedEvent(String globalTxId, int timeout) {
     return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
         null, new byte[0], "", timeout, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent sagaEndedEvent(String globalTxId) {
     return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent sagaAbortedEvent(String globalTxId) {
     return eventOf(EventType.SagaAbortedEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent sagaTimeoutEvent(String globalTxId) {
     return eventOf(EventType.SagaTimeoutEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent txStartedEvent(String globalTxId,
       String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
     return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
         parentTxId, payloads, compensationMethod, 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent txStartedEvent(String globalTxId,
-      String localTxId, String parentTxId, byte[] payloads, String compensationMethod, int reverseRetries, int reverseTimeout) {
+      String localTxId, String parentTxId, byte[] payloads, String compensationMethod, int reverseRetries, int reverseTimeout, int retryDelayInMilliseconds) {
     return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
         parentTxId, payloads, compensationMethod, 0, "",
-        0, 0, reverseRetries, reverseTimeout);
+        0, 0, reverseRetries, reverseTimeout, retryDelayInMilliseconds);
   }
 
   private GrpcTxEvent txEndedEvent(String globalTxId,
       String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
     return eventOf(EventType.TxEndedEvent, globalTxId, localTxId,
         parentTxId, payloads, compensationMethod, 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent txAbortedEvent(String globalTxId,
       String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
     return eventOf(EventType.TxAbortedEvent, globalTxId, localTxId,
         parentTxId, payloads, compensationMethod, 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   public GrpcTxEvent txCompensatedEvent(String globalTxId,
       String localTxId, String parentTxId) {
     return eventOf(EventType.TxCompensatedEvent, globalTxId, localTxId,
         parentTxId,  new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   public GrpcTxEvent txCompensateAckSucceedEvent(String globalTxId,
       String localTxId, String parentTxId) {
     return eventOf(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId,
         parentTxId, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   public GrpcTxEvent txCompensateAckTimeoutEvent(String globalTxId,
       String localTxId, String parentTxId) {
     return eventOf(EventType.CompensateAckTimeoutEvent, globalTxId, localTxId,
         parentTxId, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0,5);
+  }
+
+  public GrpcTxEvent txCompensateAckFailedEvent(String globalTxId,
+      String localTxId, String parentTxId) {
+    return eventOf(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId,
+        parentTxId, new byte[0], "", 0, "",
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent eventOf(EventType eventType,
@@ -285,7 +305,8 @@ public class OmegaEventSagaSimulator {
       int forwardRetries,
       int forwardTimeout,
       int reverseRetries,
-      int reverseTimeout) {
+      int reverseTimeout,
+      int retryDelayInMilliseconds) {
 
     return GrpcTxEvent.newBuilder()
         .setServiceName(serviceName)
@@ -302,6 +323,7 @@ public class OmegaEventSagaSimulator {
         .setRetryMethod(retryMethod)
         .setForwardRetries(forwardRetries)
         .setReverseRetries(reverseRetries)
+        .setRetryDelayInMilliseconds(retryDelayInMilliseconds)
         .setPayloads(ByteString.copyFrom(payloads))
         .build();
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
index 6a3bcb1..4402cd2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
@@ -44,19 +44,32 @@ public class CallbackContext {
   }
 
   public void apply(String globalTxId, String localTxId, String parentTxId, String callbackMethod, Object... payloads) {
-    CallbackContextInternal contextInternal = contexts.get(callbackMethod);
     String oldGlobalTxId = omegaContext.globalTxId();
     String oldLocalTxId = omegaContext.localTxId();
     try {
       omegaContext.setGlobalTxId(globalTxId);
       omegaContext.setLocalTxId(localTxId);
-      contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
-      if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
-        sender.send(
-            new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
-                parentTxId, callbackMethod));
+      if (contexts.containsKey(callbackMethod)) {
+        CallbackContextInternal contextInternal = contexts.get(callbackMethod);
+        contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
+        if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
+          sender.send(
+              new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+                  parentTxId, callbackMethod));
+        }
+        LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+      } else {
+        if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
+          String msg = "callback method " + callbackMethod
+              + " not found on CallbackContext, If it is starting, please try again later";
+          sender.send(
+              new TxCompensateAckFailedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+                  parentTxId, callbackMethod, new Exception(msg)));
+          LOG.error(msg);
+        }else{
+          throw new NullPointerException();
+        }
       }
-      LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
     } catch (IllegalAccessException | InvocationTargetException e) {
       if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
         sender.send(
@@ -64,7 +77,7 @@ public class CallbackContext {
                 parentTxId, callbackMethod, e));
       }
       LOG.error(
-          "Pre-checking for callback method " + contextInternal.callbackMethod.toString()
+          "Pre-checking for callback method " + callbackMethod
               + " was somehow skipped, did you forget to configure callback method checking on service startup?",
           e);
     } finally {


[servicecomb-pack] 03/04: SCB-2004 Fix await for Travis CI

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 41b27fa6a5baded07c6d437fa59da95dfb4902b5
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Tue Jun 23 12:04:48 2020 +0800

    SCB-2004 Fix await for Travis CI
---
 .../servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index d010728..872e23e 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -497,7 +497,7 @@ public class AlphaIntegrationFsmTest {
     //simulate omega connected
     omegaEventSender.getOmegaCallbacks().put(serviceName[0], omegaInstance[0]);
 
-    await().atMost(10, SECONDS).until(() -> {
+    await().atMost(15, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
       if(sagaData != null){
         if(sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED){


[servicecomb-pack] 02/04: SCB-2004 Use message-driven compensation retry instead of recursive compensation

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 8e0afb719be61701f358a31bd841a5a1e4c09da6
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jun 22 23:55:21 2020 +0800

    SCB-2004 Use message-driven compensation retry instead of recursive compensation
---
 .../fsm/event/internal/ComponsitedCheckEvent.java  |  37 +++++
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 154 +++++++++++++--------
 .../pack/alpha/fsm/domain/UpdateTxEventDomain.java |   7 +
 .../pack/alpha/fsm/model/TxEntities.java           |   8 ++
 .../pack/alpha/server/fsm/GrpcOmegaCallback.java   |   2 +-
 5 files changed, 151 insertions(+), 57 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
index 467952f..63ab314 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
@@ -17,10 +17,17 @@
 
 package org.apache.servicecomb.pack.alpha.core.fsm.event.internal;
 
+import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
 
 public class ComponsitedCheckEvent extends TxEvent {
 
+  private TxState preState;
+
+  public TxState getPreState() {
+    return preState;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -32,6 +39,36 @@ public class ComponsitedCheckEvent extends TxEvent {
     private Builder() {
       txComponsitedEvent = new ComponsitedCheckEvent();
     }
+    
+    public Builder serviceName(String serviceName) {
+      txComponsitedEvent.setServiceName(serviceName);
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      txComponsitedEvent.setInstanceId(instanceId);
+      return this;
+    }
+
+    public Builder parentTxId(String parentTxId) {
+      txComponsitedEvent.setParentTxId(parentTxId);
+      return this;
+    }
+
+    public Builder localTxId(String localTxId) {
+      txComponsitedEvent.setLocalTxId(localTxId);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      txComponsitedEvent.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public Builder preState(TxState txState) {
+      txComponsitedEvent.preState = txState;
+      return this;
+    }
 
     public ComponsitedCheckEvent build() {
       return txComponsitedEvent;
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 017ee0e..9033683 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -32,23 +32,23 @@ import java.util.concurrent.TimeoutException;
 import org.apache.servicecomb.pack.alpha.core.AlphaException;
 import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
@@ -239,27 +239,50 @@ public class SagaActor extends
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
-                self().tell(ComponsitedCheckEvent.builder().build(), self());
+                self().tell(ComponsitedCheckEvent.builder()
+                    .serviceName(event.getServiceName())
+                    .instanceId(event.getInstanceId())
+                    .globalTxId(event.getGlobalTxId())
+                    .localTxId(event.getLocalTxId())
+                    .parentTxId(event.getParentTxId())
+                    .preState(TxState.COMPENSATED_SUCCEED)
+                    .build(), self());
               }));
             }
         ).event(TxCompensateAckFailedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
-                self().tell(ComponsitedCheckEvent.builder().build(), self());
+                self().tell(ComponsitedCheckEvent.builder()
+                    .serviceName(event.getServiceName())
+                    .instanceId(event.getInstanceId())
+                    .globalTxId(event.getGlobalTxId())
+                    .localTxId(event.getLocalTxId())
+                    .parentTxId(event.getParentTxId())
+                    .preState(TxState.COMPENSATED_FAILED)
+                    .build(), self());
               }));
             }
         ).event(CompensateAckTimeoutEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
-                self().tell(ComponsitedCheckEvent.builder().build(), self());
+                self().tell(ComponsitedCheckEvent.builder()
+                    .serviceName(event.getServiceName())
+                    .instanceId(event.getInstanceId())
+                    .globalTxId(event.getGlobalTxId())
+                    .localTxId(event.getLocalTxId())
+                    .parentTxId(event.getParentTxId())
+                    .preState(TxState.COMPENSATED_FAILED)
+                    .build(), self());
               }));
             }
         ).event(ComponsitedCheckEvent.class, SagaData.class,
             (event, data) -> {
-              if (data.getTxEntities().hasCompensationSentTx() || !data.isTerminated()) {
-                return stay();
+              if (data.getTxEntities().hasCompensationSentTx() ||
+                  data.getTxEntities().hasCompensationFailedTx()) {
+                UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
+                return stay().applying(domainEvent);
               } else {
                 if(data.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
                   SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED);
@@ -272,7 +295,6 @@ public class SagaActor extends
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
             (event, data) -> {
-              data.setTerminated(true);
               if (data.getTxEntities().hasCommittedTx()) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
                 return stay()
@@ -299,7 +321,7 @@ public class SagaActor extends
               return stay().applying(domainEvent).andThen(exec(_data -> {
                 TxEntity txEntity = _data.getTxEntities().get(event.getLocalTxId());
                 // call compensate
-                compensation(txEntity, _data);
+                compensation(domainEvent, txEntity, _data);
               }));
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
@@ -340,7 +362,7 @@ public class SagaActor extends
     whenUnhandled(
         matchAnyEvent((event, data) -> {
           if (event instanceof BaseEvent){
-            LOG.error("Unhandled event {}", event);
+            LOG.debug("Unhandled event {}", event);
           }
           return stay();
         })
@@ -443,6 +465,7 @@ public class SagaActor extends
 
   @Override
   public SagaData applyEvent(DomainEvent event, SagaData data) {
+    LOG.debug("apply domain event {}", event.getEvent());
     try{
       if (this.recoveryRunning()) {
         LOG.info("recovery {}",event.getEvent());
@@ -493,40 +516,58 @@ public class SagaActor extends
           data.getTxEntities().forEachReverse((k, v) -> {
             if (v.getState() == TxState.COMMITTED) {
               // call compensate
-              compensation(v, data);
+              if (!compensation(domainEvent, v, data)) {
+                return;
+              }
             }
           });
         } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
-          // decrement the compensation running counter by one
           data.getCompensationRunningCounter().decrementAndGet();
-          txEntity.setState(domainEvent.getState());
-          LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
+          txEntity.setState(TxState.COMPENSATED_SUCCEED);
+          LOG.info("compensate is succeed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId());
         } else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) {
           data.getCompensationRunningCounter().decrementAndGet();
-          txEntity.setState(domainEvent.getState());
+          txEntity.setState(TxState.COMPENSATED_FAILED);
           txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
-          LOG.info("compensation is failed {}", txEntity.getLocalTxId());
+          if (txEntity.getReverseRetries() > 0 &&
+              txEntity.getRetriesCounter().get() < txEntity.getReverseRetries()) {
+            data.getTxEntities().forEachReverse((k, v) -> {
+              if (v.getState() == TxState.COMMITTED || v.getState() == TxState.COMPENSATED_FAILED) {
+                // call compensate
+                if (!compensation(domainEvent, v, data)){
+                  return;
+                }
+              }
+            });
+          } else {
+            data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
+            self().tell(ComponsitedCheckEvent.builder()
+                .serviceName(txEntity.getServiceName())
+                .instanceId(txEntity.getInstanceId())
+                .globalTxId(txEntity.getGlobalTxId())
+                .localTxId(txEntity.getLocalTxId())
+                .preState(TxState.COMPENSATED_FAILED)
+                .parentTxId(txEntity.getParentTxId()).build(), self());
+          }
         }
       } else if (event instanceof SagaEndedDomain) {
         SagaEndedDomain domainEvent = (SagaEndedDomain) event;
         if (domainEvent.getState() == SagaActorState.FAILED) {
-          data.setTerminated(true);
           data.getTxEntities().forEachReverse((k, v) -> {
             if (v.getState() == TxState.COMMITTED) {
               // call compensate
-              compensation(v, data);
+              if (!compensation(domainEvent, v, data)){
+                return;
+              }
             }
           });
         } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
           data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
-          data.setTerminated(true);
           data.setSuspendedType(domainEvent.getSuspendedType());
         } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
           data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
-          data.setTerminated(true);
         } else if (domainEvent.getState() == SagaActorState.COMMITTED) {
           data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
-          data.setTerminated(true);
         }
       }
     }catch (Exception ex){
@@ -558,22 +599,29 @@ public class SagaActor extends
   }
 
   //call omega compensate method
-  private void compensation(TxEntity txEntity, SagaData data) {
+  private boolean compensation(DomainEvent event, TxEntity txEntity, SagaData data) {
     // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
     txEntity.setState(TxState.COMPENSATION_SENT);
     try {
+      LOG.info("compensate {} {} [{}] {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getGlobalTxId(), txEntity.getLocalTxId());
       SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
-      LOG.info("compensate {} {} {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getLocalTxId());
-    } catch (AlphaException ex) {
-      LOG.error(ex.getMessage(), ex);
-      try {
-        Thread.sleep(txEntity.getRetryDelayInMilliseconds());
-      } catch (InterruptedException e) {
-        LOG.error(e.getMessage(), e);
-      }
-      compensation(txEntity, data);
     } catch (Exception ex) {
+      LOG.error("compensate failed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId(), ex);
+      if (txEntity.getReverseRetries() > 0 &&
+          txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
+        LOG.info("Retry compensate {}/{} [{}] {} after {} ms",
+            txEntity.getRetriesCounter().get() + 1,
+            txEntity.getReverseRetries(),
+            txEntity.getGlobalTxId(),
+            txEntity.getLocalTxId(),
+            txEntity.getRetryDelayInMilliseconds());
+        try {
+          Thread.sleep(txEntity.getRetryDelayInMilliseconds());
+        } catch (InterruptedException e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
       if (ex instanceof TimeoutException) {
         StringWriter writer = new StringWriter();
         ex.printStackTrace(new PrintWriter(writer));
@@ -581,7 +629,7 @@ public class SagaActor extends
         if (stackTrace.length() > Environment.getInstance().getPayloadsMaxLength()) {
           stackTrace = stackTrace.substring(0, Environment.getInstance().getPayloadsMaxLength());
         }
-        CompensateAckTimeoutEvent event = CompensateAckTimeoutEvent.builder()
+        CompensateAckTimeoutEvent compensateAckTimeoutEvent = CompensateAckTimeoutEvent.builder()
             .createTime(new Date(System.currentTimeMillis()))
             .globalTxId(txEntity.getGlobalTxId())
             .parentTxId(txEntity.getParentTxId())
@@ -590,26 +638,20 @@ public class SagaActor extends
             .instanceId(txEntity.getInstanceId())
             .payloads(stackTrace.getBytes())
             .build();
-        self().tell(event, self());
+        self().tell(compensateAckTimeoutEvent, self());
       }
-      LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
-      if (txEntity.getReverseRetries() > 0) {
-        // which means the retry number
-        if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
-          LOG.info("Retry compensate {}/{} after {} ms", txEntity.getRetriesCounter().get() + 1, txEntity.getReverseRetries(),
-              txEntity.getRetryDelayInMilliseconds());
-          try {
-            Thread.sleep(txEntity.getRetryDelayInMilliseconds());
-          } catch (InterruptedException e) {
-            LOG.error(e.getMessage(), e);
-          }
-          compensation(txEntity, data);
-        } else {
-          data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
-        }
-      } else {
-        data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
+      if (ex instanceof AlphaException) {
+        self().tell(TxCompensateAckFailedEvent.builder()
+            .serviceName(txEntity.getServiceName())
+            .instanceId(txEntity.getInstanceId())
+            .globalTxId(txEntity.getGlobalTxId())
+            .localTxId(txEntity.getLocalTxId())
+            .parentTxId(txEntity.getParentTxId())
+            .payloads(ex.getMessage().getBytes())
+            .build(), self());
       }
+      return false;
     }
+    return true;
   }
 }
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 97d3b02..6cd9d00 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -24,6 +24,7 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEv
 import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
 
 public class UpdateTxEventDomain implements DomainEvent {
   private String localTxId;
@@ -63,6 +64,12 @@ public class UpdateTxEventDomain implements DomainEvent {
     this.state = TxState.COMPENSATED_FAILED;
   }
 
+  public UpdateTxEventDomain(ComponsitedCheckEvent event) {
+    this.event = event;
+    this.localTxId = event.getLocalTxId();
+    this.state = event.getPreState();
+  }
+
   public String getLocalTxId() {
     return localTxId;
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
index a4531d5..8ba72fd 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
@@ -67,4 +67,12 @@ public class TxEntities {
         .filter(map -> map.getValue().getState() == TxState.COMPENSATION_SENT)
         .count() > 0;
   }
+
+  public boolean hasCompensationFailedTx() {
+    return entities.entrySet().stream()
+        .filter(map -> map.getValue().getState() == TxState.COMPENSATED_FAILED
+            && map.getValue().getReverseRetries() > 0
+            && map.getValue().getReverseRetries() > map.getValue().getRetriesCounter().get())
+        .count() > 0;
+  }
 }
\ No newline at end of file
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
index 027046d..dcc7eea 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
@@ -55,7 +55,7 @@ class GrpcOmegaCallback implements OmegaCallback {
       if (compensateAckCountDownLatch.getType() == CompensateAckType.Disconnected) {
         throw new CompensateConnectException("Omega connect exception");
       }else{
-        LOG.info("compensate ack "+ compensateAckCountDownLatch.getType().name());
+        LOG.debug("compensate ack "+ compensateAckCountDownLatch.getType().name());
         if(compensateAckCountDownLatch.getType() == CompensateAckType.Failed){
           throw new CompensateAckFailedException("An exception is thrown inside the compensation method");
         }


[servicecomb-pack] 04/04: SCB-2004 Rename preState to preComponsitedState

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit fd07b173b934947516096aaa2be7a441368c7b3f
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jun 24 02:53:32 2020 +0800

    SCB-2004 Rename preState to preComponsitedState
---
 .../pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java | 8 ++++----
 .../servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java    | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
index 63ab314..ba02bea 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
@@ -22,10 +22,10 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
 
 public class ComponsitedCheckEvent extends TxEvent {
 
-  private TxState preState;
+  private TxState preComponsitedState;
 
-  public TxState getPreState() {
-    return preState;
+  public TxState getPreComponsitedState() {
+    return preComponsitedState;
   }
 
   public static Builder builder() {
@@ -66,7 +66,7 @@ public class ComponsitedCheckEvent extends TxEvent {
     }
 
     public Builder preState(TxState txState) {
-      txComponsitedEvent.preState = txState;
+      txComponsitedEvent.preComponsitedState = txState;
       return this;
     }
 
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 6cd9d00..af7195a 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -67,7 +67,7 @@ public class UpdateTxEventDomain implements DomainEvent {
   public UpdateTxEventDomain(ComponsitedCheckEvent event) {
     this.event = event;
     this.localTxId = event.getLocalTxId();
-    this.state = event.getPreState();
+    this.state = event.getPreComponsitedState();
   }
 
   public String getLocalTxId() {