You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/09/30 10:10:05 UTC
[servicecomb-pack] 08/42: SCB-1368 Add shard region selection Actor
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 10c54afaafc5e2048f9a6cc7c34340cf0f4c77f8
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Tue Sep 10 17:26:57 2019 +0800
SCB-1368 Add shard region selection Actor
---
.../servicecomb/pack/alpha/fsm/SagaActor.java | 28 ++++++----
.../pack/alpha/fsm/SagaShardRegionActor.java | 65 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 12 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index fc06255..ba4b0ff 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -59,13 +59,13 @@ public class SagaActor extends
return Props.create(SagaActor.class, persistenceId);
}
- private final String persistenceId;
+ private String persistenceId;
private long sagaBeginTime;
private long sagaEndTime;
- public SagaActor(String persistenceId) {
- this.persistenceId = persistenceId;
+ public SagaActor() {
+ this.persistenceId = getSelf().path().name();
startWith(SagaActorState.IDLE, SagaData.builder().build());
@@ -380,6 +380,12 @@ public class SagaActor extends
@Override
public SagaData applyEvent(DomainEvent event, SagaData data) {
+ if (this.recoveryRunning()) {
+ LOG.info("SagaActor recovery {}",event.getEvent());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SagaActor apply event {}", event.getEvent());
+ }
// log event to SagaData
if (event.getEvent() != null && !(event
.getEvent() instanceof ComponsitedCheckEvent)) {
@@ -404,6 +410,7 @@ public class SagaActor extends
.compensationMethod(domainEvent.getCompensationMethod())
.payloads(domainEvent.getPayloads())
.state(domainEvent.getState())
+ .beginTime(domainEvent.getEvent().getCreateTime())
.build();
data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
} else {
@@ -412,7 +419,7 @@ public class SagaActor extends
} else if (event instanceof UpdateTxEventDomain) {
UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
- txEntity.setEndTime(new Date());
+ txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
if (domainEvent.getState() == TxState.COMMITTED) {
txEntity.setState(domainEvent.getState());
} else if (domainEvent.getState() == TxState.FAILED) {
@@ -441,27 +448,24 @@ public class SagaActor extends
}
});
} else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
- data.setEndTime(new Date());
+ data.setEndTime(event.getEvent().getCreateTime());
data.setTerminated(true);
data.setSuspendedType(domainEvent.getSuspendedType());
} else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
- data.setEndTime(new Date());
+ data.setEndTime(event.getEvent().getCreateTime());
data.setTerminated(true);
} else if (domainEvent.getState() == SagaActorState.COMMITTED) {
- data.setEndTime(new Date());
+ data.setEndTime(event.getEvent().getCreateTime());
data.setTerminated(true);
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
- }
return data;
}
@Override
public void onRecoveryCompleted() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+ if(stateName() != SagaActorState.IDLE){
+ LOG.info("SagaActor {} recovery completed, state={}", stateData().getGlobalTxId(), stateName());
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
new file mode 100644
index 0000000..d43ba85
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
@@ -0,0 +1,65 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.cluster.sharding.ClusterSharding;
+import akka.cluster.sharding.ClusterShardingSettings;
+import akka.cluster.sharding.ShardRegion;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+
+/**
+ * Router actor that forwards every message it receives to the "saga-actor" cluster
+ * shard region, which routes it to a SagaActor entity chosen by the message extractor.
+ */
+public class SagaShardRegionActor extends AbstractActor {
+
+  /** Number of shards the entities are spread over. */
+  private static final int NUMBER_OF_SHARDS = 100;
+
+  /** Extracts the entity id and the shard id from incoming messages. */
+  static ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
+    /** Returns the global transaction id of a BaseEvent, or null for any other message. */
+    @Override
+    public String entityId(Object message) {
+      return message instanceof BaseEvent ? ((BaseEvent) message).getGlobalTxId() : null;
+    }
+
+    /** Passes the message on unchanged. */
+    @Override
+    public Object entityMessage(Object message) {
+      return message;
+    }
+
+    /** Derives the shard id from the entity id; returns null when no entity id is available. */
+    @Override
+    public String shardId(Object message) {
+      String actorId = message instanceof ShardRegion.StartEntity
+          ? ((ShardRegion.StartEntity) message).entityId()
+          : entityId(message);
+      // floorMod keeps the result in [0, NUMBER_OF_SHARDS) even when hashCode() is negative;
+      // the plain % operator would yield negative ids and up to 199 distinct shards.
+      return actorId == null ? null : String.valueOf(Math.floorMod(actorId.hashCode(), NUMBER_OF_SHARDS));
+    }
+  };
+
+  /** Reference returned by ClusterSharding.start for the "saga-actor" entity type. */
+  private final ActorRef workerRegion;
+
+  /** Starts cluster sharding for SagaActor on this actor's system and keeps the region ref. */
+  public SagaShardRegionActor() {
+    ActorSystem system = getContext().getSystem();
+    workerRegion = ClusterSharding.get(system).start(
+        "saga-actor",
+        Props.create(SagaActor.class),
+        ClusterShardingSettings.create(system),
+        messageExtractor);
+  }
+
+  /** Forwards every message to the shard region, with this actor set as the sender. */
+  @Override
+  public Receive createReceive() {
+    return receiveBuilder().matchAny(msg -> workerRegion.tell(msg, getSelf())).build();
+  }
+}