You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2020/06/23 23:39:58 UTC

[servicecomb-pack] 02/04: SCB-2004 Use message-driven compensation retry instead of recursive compensation

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 8e0afb719be61701f358a31bd841a5a1e4c09da6
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jun 22 23:55:21 2020 +0800

    SCB-2004 Use message-driven compensation retry instead of recursive compensation
---
 .../fsm/event/internal/ComponsitedCheckEvent.java  |  37 +++++
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 154 +++++++++++++--------
 .../pack/alpha/fsm/domain/UpdateTxEventDomain.java |   7 +
 .../pack/alpha/fsm/model/TxEntities.java           |   8 ++
 .../pack/alpha/server/fsm/GrpcOmegaCallback.java   |   2 +-
 5 files changed, 151 insertions(+), 57 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
index 467952f..63ab314 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
@@ -17,10 +17,17 @@
 
 package org.apache.servicecomb.pack.alpha.core.fsm.event.internal;
 
+import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
 
 public class ComponsitedCheckEvent extends TxEvent {
 
+  private TxState preState;
+
+  public TxState getPreState() {
+    return preState;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -32,6 +39,36 @@ public class ComponsitedCheckEvent extends TxEvent {
     private Builder() {
       txComponsitedEvent = new ComponsitedCheckEvent();
     }
+    
+    public Builder serviceName(String serviceName) {
+      txComponsitedEvent.setServiceName(serviceName);
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      txComponsitedEvent.setInstanceId(instanceId);
+      return this;
+    }
+
+    public Builder parentTxId(String parentTxId) {
+      txComponsitedEvent.setParentTxId(parentTxId);
+      return this;
+    }
+
+    public Builder localTxId(String localTxId) {
+      txComponsitedEvent.setLocalTxId(localTxId);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      txComponsitedEvent.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public Builder preState(TxState txState) {
+      txComponsitedEvent.preState = txState;
+      return this;
+    }
 
     public ComponsitedCheckEvent build() {
       return txComponsitedEvent;
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 017ee0e..9033683 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -32,23 +32,23 @@ import java.util.concurrent.TimeoutException;
 import org.apache.servicecomb.pack.alpha.core.AlphaException;
 import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
@@ -239,27 +239,50 @@ public class SagaActor extends
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
-                self().tell(ComponsitedCheckEvent.builder().build(), self());
+                self().tell(ComponsitedCheckEvent.builder()
+                    .serviceName(event.getServiceName())
+                    .instanceId(event.getInstanceId())
+                    .globalTxId(event.getGlobalTxId())
+                    .localTxId(event.getLocalTxId())
+                    .parentTxId(event.getParentTxId())
+                    .preState(TxState.COMPENSATED_SUCCEED)
+                    .build(), self());
               }));
             }
         ).event(TxCompensateAckFailedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
-                self().tell(ComponsitedCheckEvent.builder().build(), self());
+                self().tell(ComponsitedCheckEvent.builder()
+                    .serviceName(event.getServiceName())
+                    .instanceId(event.getInstanceId())
+                    .globalTxId(event.getGlobalTxId())
+                    .localTxId(event.getLocalTxId())
+                    .parentTxId(event.getParentTxId())
+                    .preState(TxState.COMPENSATED_FAILED)
+                    .build(), self());
               }));
             }
         ).event(CompensateAckTimeoutEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
-                self().tell(ComponsitedCheckEvent.builder().build(), self());
+                self().tell(ComponsitedCheckEvent.builder()
+                    .serviceName(event.getServiceName())
+                    .instanceId(event.getInstanceId())
+                    .globalTxId(event.getGlobalTxId())
+                    .localTxId(event.getLocalTxId())
+                    .parentTxId(event.getParentTxId())
+                    .preState(TxState.COMPENSATED_FAILED)
+                    .build(), self());
               }));
             }
         ).event(ComponsitedCheckEvent.class, SagaData.class,
             (event, data) -> {
-              if (data.getTxEntities().hasCompensationSentTx() || !data.isTerminated()) {
-                return stay();
+              if (data.getTxEntities().hasCompensationSentTx() ||
+                  data.getTxEntities().hasCompensationFailedTx()) {
+                UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
+                return stay().applying(domainEvent);
               } else {
                 if(data.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
                   SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED);
@@ -272,7 +295,6 @@ public class SagaActor extends
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
             (event, data) -> {
-              data.setTerminated(true);
               if (data.getTxEntities().hasCommittedTx()) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
                 return stay()
@@ -299,7 +321,7 @@ public class SagaActor extends
               return stay().applying(domainEvent).andThen(exec(_data -> {
                 TxEntity txEntity = _data.getTxEntities().get(event.getLocalTxId());
                 // call compensate
-                compensation(txEntity, _data);
+                compensation(domainEvent, txEntity, _data);
               }));
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
@@ -340,7 +362,7 @@ public class SagaActor extends
     whenUnhandled(
         matchAnyEvent((event, data) -> {
           if (event instanceof BaseEvent){
-            LOG.error("Unhandled event {}", event);
+            LOG.debug("Unhandled event {}", event);
           }
           return stay();
         })
@@ -443,6 +465,7 @@ public class SagaActor extends
 
   @Override
   public SagaData applyEvent(DomainEvent event, SagaData data) {
+    LOG.debug("apply domain event {}", event.getEvent());
     try{
       if (this.recoveryRunning()) {
         LOG.info("recovery {}",event.getEvent());
@@ -493,40 +516,58 @@ public class SagaActor extends
           data.getTxEntities().forEachReverse((k, v) -> {
             if (v.getState() == TxState.COMMITTED) {
               // call compensate
-              compensation(v, data);
+              if (!compensation(domainEvent, v, data)) {
+                return;
+              }
             }
           });
         } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
-          // decrement the compensation running counter by one
           data.getCompensationRunningCounter().decrementAndGet();
-          txEntity.setState(domainEvent.getState());
-          LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
+          txEntity.setState(TxState.COMPENSATED_SUCCEED);
+          LOG.info("compensate is succeed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId());
         } else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) {
           data.getCompensationRunningCounter().decrementAndGet();
-          txEntity.setState(domainEvent.getState());
+          txEntity.setState(TxState.COMPENSATED_FAILED);
           txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
-          LOG.info("compensation is failed {}", txEntity.getLocalTxId());
+          if (txEntity.getReverseRetries() > 0 &&
+              txEntity.getRetriesCounter().get() < txEntity.getReverseRetries()) {
+            data.getTxEntities().forEachReverse((k, v) -> {
+              if (v.getState() == TxState.COMMITTED || v.getState() == TxState.COMPENSATED_FAILED) {
+                // call compensate
+                if (!compensation(domainEvent, v, data)){
+                  return;
+                }
+              }
+            });
+          } else {
+            data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
+            self().tell(ComponsitedCheckEvent.builder()
+                .serviceName(txEntity.getServiceName())
+                .instanceId(txEntity.getInstanceId())
+                .globalTxId(txEntity.getGlobalTxId())
+                .localTxId(txEntity.getLocalTxId())
+                .preState(TxState.COMPENSATED_FAILED)
+                .parentTxId(txEntity.getParentTxId()).build(), self());
+          }
         }
       } else if (event instanceof SagaEndedDomain) {
         SagaEndedDomain domainEvent = (SagaEndedDomain) event;
         if (domainEvent.getState() == SagaActorState.FAILED) {
-          data.setTerminated(true);
           data.getTxEntities().forEachReverse((k, v) -> {
             if (v.getState() == TxState.COMMITTED) {
               // call compensate
-              compensation(v, data);
+              if (!compensation(domainEvent, v, data)){
+                return;
+              }
             }
           });
         } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
           data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
-          data.setTerminated(true);
           data.setSuspendedType(domainEvent.getSuspendedType());
         } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
           data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
-          data.setTerminated(true);
         } else if (domainEvent.getState() == SagaActorState.COMMITTED) {
           data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
-          data.setTerminated(true);
         }
       }
     }catch (Exception ex){
@@ -558,22 +599,29 @@ public class SagaActor extends
   }
 
   //call omega compensate method
-  private void compensation(TxEntity txEntity, SagaData data) {
+  private boolean compensation(DomainEvent event, TxEntity txEntity, SagaData data) {
     // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
     txEntity.setState(TxState.COMPENSATION_SENT);
     try {
+      LOG.info("compensate {} {} [{}] {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getGlobalTxId(), txEntity.getLocalTxId());
       SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
-      LOG.info("compensate {} {} {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getLocalTxId());
-    } catch (AlphaException ex) {
-      LOG.error(ex.getMessage(), ex);
-      try {
-        Thread.sleep(txEntity.getRetryDelayInMilliseconds());
-      } catch (InterruptedException e) {
-        LOG.error(e.getMessage(), e);
-      }
-      compensation(txEntity, data);
     } catch (Exception ex) {
+      LOG.error("compensate failed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId(), ex);
+      if (txEntity.getReverseRetries() > 0 &&
+          txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
+        LOG.info("Retry compensate {}/{} [{}] {} after {} ms",
+            txEntity.getRetriesCounter().get() + 1,
+            txEntity.getReverseRetries(),
+            txEntity.getGlobalTxId(),
+            txEntity.getLocalTxId(),
+            txEntity.getRetryDelayInMilliseconds());
+        try {
+          Thread.sleep(txEntity.getRetryDelayInMilliseconds());
+        } catch (InterruptedException e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
       if (ex instanceof TimeoutException) {
         StringWriter writer = new StringWriter();
         ex.printStackTrace(new PrintWriter(writer));
@@ -581,7 +629,7 @@ public class SagaActor extends
         if (stackTrace.length() > Environment.getInstance().getPayloadsMaxLength()) {
           stackTrace = stackTrace.substring(0, Environment.getInstance().getPayloadsMaxLength());
         }
-        CompensateAckTimeoutEvent event = CompensateAckTimeoutEvent.builder()
+        CompensateAckTimeoutEvent compensateAckTimeoutEvent = CompensateAckTimeoutEvent.builder()
             .createTime(new Date(System.currentTimeMillis()))
             .globalTxId(txEntity.getGlobalTxId())
             .parentTxId(txEntity.getParentTxId())
@@ -590,26 +638,20 @@ public class SagaActor extends
             .instanceId(txEntity.getInstanceId())
             .payloads(stackTrace.getBytes())
             .build();
-        self().tell(event, self());
+        self().tell(compensateAckTimeoutEvent, self());
       }
-      LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
-      if (txEntity.getReverseRetries() > 0) {
-        // which means the retry number
-        if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
-          LOG.info("Retry compensate {}/{} after {} ms", txEntity.getRetriesCounter().get() + 1, txEntity.getReverseRetries(),
-              txEntity.getRetryDelayInMilliseconds());
-          try {
-            Thread.sleep(txEntity.getRetryDelayInMilliseconds());
-          } catch (InterruptedException e) {
-            LOG.error(e.getMessage(), e);
-          }
-          compensation(txEntity, data);
-        } else {
-          data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
-        }
-      } else {
-        data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
+      if (ex instanceof AlphaException) {
+        self().tell(TxCompensateAckFailedEvent.builder()
+            .serviceName(txEntity.getServiceName())
+            .instanceId(txEntity.getInstanceId())
+            .globalTxId(txEntity.getGlobalTxId())
+            .localTxId(txEntity.getLocalTxId())
+            .parentTxId(txEntity.getParentTxId())
+            .payloads(ex.getMessage().getBytes())
+            .build(), self());
       }
+      return false;
     }
+    return true;
   }
 }
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 97d3b02..6cd9d00 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -24,6 +24,7 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEv
 import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
 
 public class UpdateTxEventDomain implements DomainEvent {
   private String localTxId;
@@ -63,6 +64,12 @@ public class UpdateTxEventDomain implements DomainEvent {
     this.state = TxState.COMPENSATED_FAILED;
   }
 
+  public UpdateTxEventDomain(ComponsitedCheckEvent event) {
+    this.event = event;
+    this.localTxId = event.getLocalTxId();
+    this.state = event.getPreState();
+  }
+
   public String getLocalTxId() {
     return localTxId;
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
index a4531d5..8ba72fd 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
@@ -67,4 +67,12 @@ public class TxEntities {
         .filter(map -> map.getValue().getState() == TxState.COMPENSATION_SENT)
         .count() > 0;
   }
+
+  public boolean hasCompensationFailedTx() {
+    return entities.entrySet().stream()
+        .filter(map -> map.getValue().getState() == TxState.COMPENSATED_FAILED
+            && map.getValue().getReverseRetries() > 0
+            && map.getValue().getReverseRetries() > map.getValue().getRetriesCounter().get())
+        .count() > 0;
+  }
 }
\ No newline at end of file
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
index 027046d..dcc7eea 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
@@ -55,7 +55,7 @@ class GrpcOmegaCallback implements OmegaCallback {
       if (compensateAckCountDownLatch.getType() == CompensateAckType.Disconnected) {
         throw new CompensateConnectException("Omega connect exception");
       }else{
-        LOG.info("compensate ack "+ compensateAckCountDownLatch.getType().name());
+        LOG.debug("compensate ack "+ compensateAckCountDownLatch.getType().name());
         if(compensateAckCountDownLatch.getType() == CompensateAckType.Failed){
           throw new CompensateAckFailedException("An exception is thrown inside the compensation method");
         }