You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/07/15 03:42:21 UTC

[servicecomb-pack] branch master updated: SCB-1376 Use the internal event to trigger SagaActor stop

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d39af2  SCB-1376 Use the internal event to trigger SagaActor stop
5d39af2 is described below

commit 5d39af2b5b0da473eff30e5deb1cebbeab68fd1a
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Fri Jul 12 20:15:30 2019 +0800

    SCB-1376 Use the internal event to trigger SagaActor stop
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 55 ++++++++++------------
 .../ComponsitedCheckEvent.java}                    | 10 ++--
 .../StopEvent.java}                                | 14 +++---
 3 files changed, 36 insertions(+), 43 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index cfa8eaa..cb97cb1 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -36,7 +36,7 @@ import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedCheckInternalEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.internal.ComponsitedCheckEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
@@ -98,22 +98,19 @@ public class SagaActor extends
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
-                  .applying(domainEvent)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                  .applying(domainEvent);
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
-                  .applying(domainEvent)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                  .applying(domainEvent);
             }
         ).event(Collections.singletonList(StateTimeout()), SagaData.class,
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
-                  .applying(domainEvent)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                  .applying(domainEvent);
             })
     );
 
@@ -145,8 +142,7 @@ public class SagaActor extends
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
-                  .applying(domainEvent)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                  .applying(domainEvent);
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
@@ -156,8 +152,7 @@ public class SagaActor extends
             }
         ).event(Collections.singletonList(StateTimeout()), SagaData.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+              return goTo(SagaActorState.SUSPENDED);
             })
     );
 
@@ -189,15 +184,13 @@ public class SagaActor extends
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
-                  .applying(domainEvent)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                  .applying(domainEvent);
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
               return goTo(SagaActorState.COMMITTED)
-                  .applying(domainEvent)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                  .applying(domainEvent);
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
@@ -211,8 +204,7 @@ public class SagaActor extends
             }
         ).event(Collections.singletonList(StateTimeout()), SagaData.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));//.replying(data);
+              return goTo(SagaActorState.SUSPENDED);
             })
     );
 
@@ -221,17 +213,16 @@ public class SagaActor extends
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
-                  .applying(domainEvent)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                  .applying(domainEvent);
             }
         ).event(TxCompensatedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
-                self().tell(TxComponsitedCheckInternalEvent.builder().build(), self());
+                self().tell(ComponsitedCheckEvent.builder().build(), self());
               }));
             }
-        ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
+        ).event(ComponsitedCheckEvent.class, SagaData.class,
             (event, data) -> {
               if (hasCompensationSentTx(data) || !data.isTerminated()) {
                 return stay();
@@ -239,8 +230,7 @@ public class SagaActor extends
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event,
                     SagaActorState.COMPENSATED);
                 return goTo(SagaActorState.COMPENSATED)
-                    .applying(domainEvent)
-                    .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                    .applying(domainEvent);
               }
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
@@ -258,8 +248,7 @@ public class SagaActor extends
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event,
                     SagaActorState.COMPENSATED);
                 return goTo(SagaActorState.COMPENSATED)
-                    .applying(domainEvent)
-                    .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                    .applying(domainEvent);
               }
             }
         ).event(TxStartedEvent.class, SagaData.class,
@@ -280,13 +269,12 @@ public class SagaActor extends
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
-                  .applying(domainEvent)
-                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+                  .applying(domainEvent);
             })
     );
 
     when(SagaActorState.COMMITTED,
-        matchAnyEvent(
+        matchEvent(org.apache.servicecomb.pack.alpha.fsm.event.internal.StopEvent.class,
             (event, data) -> {
               //  已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
               //  以下基于 journal-redis 说明:
@@ -315,7 +303,7 @@ public class SagaActor extends
     );
 
     when(SagaActorState.SUSPENDED,
-        matchAnyEvent(
+        matchEvent(org.apache.servicecomb.pack.alpha.fsm.event.internal.StopEvent.class,
             (event, data) -> {
               deleteMessages(lastSequenceNr());
               deleteSnapshot(snapshotSequenceNr());
@@ -325,7 +313,7 @@ public class SagaActor extends
     );
 
     when(SagaActorState.COMPENSATED,
-        matchAnyEvent(
+        matchEvent(org.apache.servicecomb.pack.alpha.fsm.event.internal.StopEvent.class,
             (event, data) -> {
               deleteMessages(lastSequenceNr());
               deleteSnapshot(snapshotSequenceNr());
@@ -351,6 +339,11 @@ public class SagaActor extends
           if (LOG.isDebugEnabled()) {
             LOG.debug("transition {} {} -> {}", getSelf(), from, to);
           }
+          if (to == SagaActorState.COMMITTED ||
+              to == SagaActorState.SUSPENDED ||
+              to == SagaActorState.COMPENSATED) {
+            self().tell(org.apache.servicecomb.pack.alpha.fsm.event.internal.StopEvent.builder().build(), self());
+          }
         })
     );
 
@@ -375,7 +368,7 @@ public class SagaActor extends
   public SagaData applyEvent(DomainEvent event, SagaData data) {
     // log event to SagaData
     if (event.getEvent() != null && !(event
-        .getEvent() instanceof TxComponsitedCheckInternalEvent)) {
+        .getEvent() instanceof ComponsitedCheckEvent)) {
       data.logEvent(event.getEvent());
     }
     if (event instanceof SagaStartedDomain) {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/internal/ComponsitedCheckEvent.java
similarity index 77%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/internal/ComponsitedCheckEvent.java
index 225ef26..7ccb581 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/internal/ComponsitedCheckEvent.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event;
+package org.apache.servicecomb.pack.alpha.fsm.event.internal;
 
 import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 
-public class TxComponsitedCheckInternalEvent extends TxEvent {
+public class ComponsitedCheckEvent extends TxEvent {
 
   public static Builder builder() {
     return new Builder();
@@ -27,13 +27,13 @@ public class TxComponsitedCheckInternalEvent extends TxEvent {
 
   public static final class Builder {
 
-    private TxComponsitedCheckInternalEvent txComponsitedEvent;
+    private ComponsitedCheckEvent txComponsitedEvent;
 
     private Builder() {
-      txComponsitedEvent = new TxComponsitedCheckInternalEvent();
+      txComponsitedEvent = new ComponsitedCheckEvent();
     }
 
-    public TxComponsitedCheckInternalEvent build() {
+    public ComponsitedCheckEvent build() {
       return txComponsitedEvent;
     }
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/internal/StopEvent.java
similarity index 70%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/internal/StopEvent.java
index 225ef26..dd443d5 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/internal/StopEvent.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event;
+package org.apache.servicecomb.pack.alpha.fsm.event.internal;
 
-import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
 
-public class TxComponsitedCheckInternalEvent extends TxEvent {
+public class StopEvent extends BaseEvent {
 
   public static Builder builder() {
     return new Builder();
@@ -27,14 +27,14 @@ public class TxComponsitedCheckInternalEvent extends TxEvent {
 
   public static final class Builder {
 
-    private TxComponsitedCheckInternalEvent txComponsitedEvent;
+    private StopEvent stopEvent;
 
     private Builder() {
-      txComponsitedEvent = new TxComponsitedCheckInternalEvent();
+      stopEvent = new StopEvent();
     }
 
-    public TxComponsitedCheckInternalEvent build() {
-      return txComponsitedEvent;
+    public StopEvent build() {
+      return stopEvent;
     }
   }
 }