You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2020/06/23 23:39:57 UTC

[servicecomb-pack] 01/04: SCB-2004 In Omega, if CallbackContext does not contain callbackMethod, send TxCompensateAckFailedEvent to Alpha instead of throwing NPE

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit c2d52e9c3cd2203e193bb5154d637fc6f7cb5203
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jun 22 19:35:54 2020 +0800

    SCB-2004 In Omega, if CallbackContext does not contain callbackMethod, send TxCompensateAckFailedEvent to Alpha instead of throwing NPE
---
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  | 29 +++++++++---
 .../alpha/server/fsm/OmegaEventSagaSimulator.java  | 52 +++++++++++++++-------
 .../pack/omega/transaction/CallbackContext.java    | 29 ++++++++----
 3 files changed, 80 insertions(+), 30 deletions(-)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index aede177..d010728 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -30,14 +30,14 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
 import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
-import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.common.AlphaMetaKeys;
+import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -480,7 +480,7 @@ public class AlphaIntegrationFsmTest {
     final String localTxId_3 = UUID.randomUUID().toString();
     final Map<String, OmegaCallback>[] omegaInstance = new Map[]{null};
     final String[] serviceName = new String[1];
-    omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+    omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEventsAndReverseRetries(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
       if(event.getType().equals(EventType.TxAbortedEvent.name())){
         //simulate omega disconnect
         serviceName[0] = event.getServiceName();
@@ -489,17 +489,32 @@ public class AlphaIntegrationFsmTest {
       }
       omegaEventSender.getBlockingStub().onTxEvent(event);
     });
-    //simulate omega connected
     await().atMost(5, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
       return sagaData !=null && sagaData.getTxEntities().size()==3;
     });
+
+    //simulate omega connected
     omegaEventSender.getOmegaCallbacks().put(serviceName[0], omegaInstance[0]);
-    waitAlphaCallCompensate(omegaEventSender, globalTxId, localTxId_1, localTxId_2);
-    await().atMost(2, SECONDS).until(() -> {
+
+    await().atMost(10, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
-      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      if(sagaData != null){
+        if(sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED){
+          return true;
+        }else{
+          sagaData.getTxEntities().forEachReverse((k,v) -> {
+            if(v.getState() == TxState.COMPENSATION_SENT || v.getState() == TxState.COMPENSATED_FAILED){
+              omegaEventSender.getBlockingStub().onTxEvent(omegaEventSender.getOmegaEventSagaSimulator().getTxCompensateAckSucceedEvent(v.getGlobalTxId(),v.getLocalTxId()));
+            }
+          });
+          return false;
+        }
+      } else {
+        return false;
+      }
     });
+
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
     assertEquals(sagaData.getTxEntities().size(),3);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 13eb3f1..454f6df 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -65,7 +65,7 @@ public class OmegaEventSagaSimulator {
     final int localTxId_1_ReverseTimeoutSecond = 2;
     List<GrpcTxEvent> sagaEvents = new ArrayList<>();
     sagaEvents.add(sagaStartedEvent(globalTxId));
-    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a", 0, localTxId_1_ReverseTimeoutSecond));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a", 0, localTxId_1_ReverseTimeoutSecond, 5));
     sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
     sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
     sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, NullPointerException.class.getName().getBytes(), "method b"));
@@ -87,6 +87,19 @@ public class OmegaEventSagaSimulator {
     return sagaEvents;
   }
 
+  public List<GrpcTxEvent> lastTxAbortedEventsAndReverseRetries(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b", 15, 0, 1000));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, NullPointerException.class.getName().getBytes(), "method c"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
   public List<GrpcTxEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
     List<GrpcTxEvent> sagaEvents = new ArrayList<>();
     sagaEvents.add(sagaStartedEvent(globalTxId));
@@ -198,80 +211,87 @@ public class OmegaEventSagaSimulator {
   private GrpcTxEvent sagaStartedEvent(String globalTxId) {
     return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent sagaStartedEvent(String globalTxId, int timeout) {
     return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
         null, new byte[0], "", timeout, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent sagaEndedEvent(String globalTxId) {
     return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent sagaAbortedEvent(String globalTxId) {
     return eventOf(EventType.SagaAbortedEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent sagaTimeoutEvent(String globalTxId) {
     return eventOf(EventType.SagaTimeoutEvent, globalTxId, globalTxId,
         null, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent txStartedEvent(String globalTxId,
       String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
     return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
         parentTxId, payloads, compensationMethod, 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent txStartedEvent(String globalTxId,
-      String localTxId, String parentTxId, byte[] payloads, String compensationMethod, int reverseRetries, int reverseTimeout) {
+      String localTxId, String parentTxId, byte[] payloads, String compensationMethod, int reverseRetries, int reverseTimeout, int retryDelayInMilliseconds) {
     return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
         parentTxId, payloads, compensationMethod, 0, "",
-        0, 0, reverseRetries, reverseTimeout);
+        0, 0, reverseRetries, reverseTimeout, retryDelayInMilliseconds);
   }
 
   private GrpcTxEvent txEndedEvent(String globalTxId,
       String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
     return eventOf(EventType.TxEndedEvent, globalTxId, localTxId,
         parentTxId, payloads, compensationMethod, 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent txAbortedEvent(String globalTxId,
       String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
     return eventOf(EventType.TxAbortedEvent, globalTxId, localTxId,
         parentTxId, payloads, compensationMethod, 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   public GrpcTxEvent txCompensatedEvent(String globalTxId,
       String localTxId, String parentTxId) {
     return eventOf(EventType.TxCompensatedEvent, globalTxId, localTxId,
         parentTxId,  new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   public GrpcTxEvent txCompensateAckSucceedEvent(String globalTxId,
       String localTxId, String parentTxId) {
     return eventOf(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId,
         parentTxId, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0, 5);
   }
 
   public GrpcTxEvent txCompensateAckTimeoutEvent(String globalTxId,
       String localTxId, String parentTxId) {
     return eventOf(EventType.CompensateAckTimeoutEvent, globalTxId, localTxId,
         parentTxId, new byte[0], "", 0, "",
-        0, 0, 0, 0);
+        0, 0, 0, 0,5);
+  }
+
+  public GrpcTxEvent txCompensateAckFailedEvent(String globalTxId,
+      String localTxId, String parentTxId) {
+    return eventOf(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId,
+        parentTxId, new byte[0], "", 0, "",
+        0, 0, 0, 0, 5);
   }
 
   private GrpcTxEvent eventOf(EventType eventType,
@@ -285,7 +305,8 @@ public class OmegaEventSagaSimulator {
       int forwardRetries,
       int forwardTimeout,
       int reverseRetries,
-      int reverseTimeout) {
+      int reverseTimeout,
+      int retryDelayInMilliseconds) {
 
     return GrpcTxEvent.newBuilder()
         .setServiceName(serviceName)
@@ -302,6 +323,7 @@ public class OmegaEventSagaSimulator {
         .setRetryMethod(retryMethod)
         .setForwardRetries(forwardRetries)
         .setReverseRetries(reverseRetries)
+        .setRetryDelayInMilliseconds(retryDelayInMilliseconds)
         .setPayloads(ByteString.copyFrom(payloads))
         .build();
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
index 6a3bcb1..4402cd2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
@@ -44,19 +44,32 @@ public class CallbackContext {
   }
 
   public void apply(String globalTxId, String localTxId, String parentTxId, String callbackMethod, Object... payloads) {
-    CallbackContextInternal contextInternal = contexts.get(callbackMethod);
     String oldGlobalTxId = omegaContext.globalTxId();
     String oldLocalTxId = omegaContext.localTxId();
     try {
       omegaContext.setGlobalTxId(globalTxId);
       omegaContext.setLocalTxId(localTxId);
-      contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
-      if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
-        sender.send(
-            new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
-                parentTxId, callbackMethod));
+      if (contexts.containsKey(callbackMethod)) {
+        CallbackContextInternal contextInternal = contexts.get(callbackMethod);
+        contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
+        if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
+          sender.send(
+              new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+                  parentTxId, callbackMethod));
+        }
+        LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+      } else {
+        if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
+          String msg = "callback method " + callbackMethod
+              + " not found on CallbackContext, If it is starting, please try again later";
+          sender.send(
+              new TxCompensateAckFailedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+                  parentTxId, callbackMethod, new Exception(msg)));
+          LOG.error(msg);
+        }else{
+          throw new NullPointerException();
+        }
       }
-      LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
     } catch (IllegalAccessException | InvocationTargetException e) {
       if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
         sender.send(
@@ -64,7 +77,7 @@ public class CallbackContext {
                 parentTxId, callbackMethod, e));
       }
       LOG.error(
-          "Pre-checking for callback method " + contextInternal.callbackMethod.toString()
+          "Pre-checking for callback method " + callbackMethod
               + " was somehow skipped, did you forget to configure callback method checking on service startup?",
           e);
     } finally {