You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/09/30 10:10:05 UTC

[servicecomb-pack] 08/42: SCB-1368 Add shard region selection Actor

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 10c54afaafc5e2048f9a6cc7c34340cf0f4c77f8
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Tue Sep 10 17:26:57 2019 +0800

    SCB-1368 Add shard region selection Actor
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 28 ++++++----
 .../pack/alpha/fsm/SagaShardRegionActor.java       | 65 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 12 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index fc06255..ba4b0ff 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -59,13 +59,13 @@ public class SagaActor extends
     return Props.create(SagaActor.class, persistenceId);
   }
 
-  private final String persistenceId;
+  private String persistenceId;
 
   private long sagaBeginTime;
   private long sagaEndTime;
 
-  public SagaActor(String persistenceId) {
-    this.persistenceId = persistenceId;
+  public SagaActor() {
+    this.persistenceId = getSelf().path().name();
 
     startWith(SagaActorState.IDLE, SagaData.builder().build());
 
@@ -380,6 +380,12 @@ public class SagaActor extends
 
   @Override
   public SagaData applyEvent(DomainEvent event, SagaData data) {
+    if (this.recoveryRunning()) {
+      LOG.info("SagaActor recovery {}",event.getEvent());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SagaActor apply event {}", event.getEvent());
+    }
     // log event to SagaData
     if (event.getEvent() != null && !(event
         .getEvent() instanceof ComponsitedCheckEvent)) {
@@ -404,6 +410,7 @@ public class SagaActor extends
             .compensationMethod(domainEvent.getCompensationMethod())
             .payloads(domainEvent.getPayloads())
             .state(domainEvent.getState())
+            .beginTime(domainEvent.getEvent().getCreateTime())
             .build();
         data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
       } else {
@@ -412,7 +419,7 @@ public class SagaActor extends
     } else if (event instanceof UpdateTxEventDomain) {
       UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
       TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
-      txEntity.setEndTime(new Date());
+      txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
       if (domainEvent.getState() == TxState.COMMITTED) {
         txEntity.setState(domainEvent.getState());
       } else if (domainEvent.getState() == TxState.FAILED) {
@@ -441,27 +448,24 @@ public class SagaActor extends
           }
         });
       } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
-        data.setEndTime(new Date());
+        data.setEndTime(event.getEvent().getCreateTime());
         data.setTerminated(true);
         data.setSuspendedType(domainEvent.getSuspendedType());
       } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
-        data.setEndTime(new Date());
+        data.setEndTime(event.getEvent().getCreateTime());
         data.setTerminated(true);
       } else if (domainEvent.getState() == SagaActorState.COMMITTED) {
-        data.setEndTime(new Date());
+        data.setEndTime(event.getEvent().getCreateTime());
         data.setTerminated(true);
       }
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
-    }
     return data;
   }
 
   @Override
   public void onRecoveryCompleted() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+    if(stateName() != SagaActorState.IDLE){
+      LOG.info("SagaActor {} recovery completed, state={}", stateData().getGlobalTxId(), stateName());
     }
   }
 
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
new file mode 100644
index 0000000..d43ba85
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
@@ -0,0 +1,65 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.cluster.sharding.ClusterSharding;
+import akka.cluster.sharding.ClusterShardingSettings;
+import akka.cluster.sharding.ShardRegion;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+
+public class SagaShardRegionActor extends AbstractActor {
+
+  private final ActorRef workerRegion;
+
+  static ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
+    @Override
+    public String entityId(Object message) {
+      if (message instanceof BaseEvent) {
+        return ((BaseEvent) message).getGlobalTxId();
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public Object entityMessage(Object message) {
+      return message;
+    }
+
+    @Override
+    public String shardId(Object message) {
+      int numberOfShards = 100;
+      if (message instanceof BaseEvent) {
+        String actorId = ((BaseEvent) message).getGlobalTxId();
+        return String.valueOf(actorId.hashCode() % numberOfShards);
+      } else if (message instanceof ShardRegion.StartEntity) {
+        String actorId = ((ShardRegion.StartEntity) message).entityId();
+        return String.valueOf(actorId.hashCode() % numberOfShards);
+      } else {
+        return null;
+      }
+    }
+  };
+
+  public SagaShardRegionActor() {
+    ActorSystem system = getContext().getSystem();
+    ClusterShardingSettings settings = ClusterShardingSettings.create(system);
+    workerRegion = ClusterSharding.get(system)
+        .start(
+            "saga-actor",
+            Props.create(SagaActor.class),
+            settings,
+            messageExtractor);
+  }
+
+  @Override
+  public Receive createReceive() {
+    return receiveBuilder()
+        .matchAny(msg -> {
+          workerRegion.tell(msg, getSelf());
+        })
+        .build();
+  }
+}