You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/09/30 10:10:17 UTC
[servicecomb-pack] 20/42: SCB-1368 Delete the Actor state persistent data after transaction data is saved successfully
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 6779ea15fa757ba10535391e01dac412f1dbe518
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Sat Sep 21 01:06:05 2019 +0800
SCB-1368 Delete the Actor state persistent data after transaction data is saved successfully
---
.../servicecomb/pack/alpha/fsm/SagaActor.java | 278 +++++++++++++--------
.../src/main/resources/application.yaml | 19 +-
2 files changed, 180 insertions(+), 117 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index ba4b0ff..4bd536e 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -17,7 +17,9 @@
package org.apache.servicecomb.pack.alpha.fsm;
+import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.cluster.sharding.ShardRegion;
import akka.persistence.fsm.AbstractPersistentFSM;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
@@ -27,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.pack.alpha.core.AlphaException;
import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
@@ -72,6 +75,7 @@ public class SagaActor extends
when(SagaActorState.IDLE,
matchEvent(SagaStartedEvent.class,
(event, data) -> {
+ log(event);
sagaBeginTime = System.currentTimeMillis();
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
SagaStartedDomain domainEvent = new SagaStartedDomain(event);
@@ -92,6 +96,7 @@ public class SagaActor extends
when(SagaActorState.READY,
matchEvent(TxStartedEvent.class, SagaData.class,
(event, data) -> {
+ log(event);
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
@@ -104,12 +109,14 @@ public class SagaActor extends
}
).event(SagaEndedEvent.class,
(event, data) -> {
+ log(event);
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(SagaAbortedEvent.class,
(event, data) -> {
+ log(event);
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
@@ -125,6 +132,7 @@ public class SagaActor extends
when(SagaActorState.PARTIALLY_ACTIVE,
matchEvent(TxEndedEvent.class, SagaData.class,
(event, data) -> {
+ log(event);
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_COMMITTED)
@@ -137,6 +145,7 @@ public class SagaActor extends
}
).event(TxStartedEvent.class,
(event, data) -> {
+ log(event);
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
return stay()
@@ -148,6 +157,7 @@ public class SagaActor extends
}
).event(SagaTimeoutEvent.class,
(event, data) -> {
+ log(event);
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED,
SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
@@ -155,6 +165,7 @@ public class SagaActor extends
}
).event(TxAbortedEvent.class,
(event, data) -> {
+ log(event);
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return goTo(SagaActorState.FAILED)
.applying(domainEvent);
@@ -169,6 +180,7 @@ public class SagaActor extends
when(SagaActorState.PARTIALLY_COMMITTED,
matchEvent(TxStartedEvent.class,
(event, data) -> {
+ log(event);
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
@@ -181,6 +193,7 @@ public class SagaActor extends
}
).event(TxEndedEvent.class,
(event, data) -> {
+ log(event);
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
if (data.getExpirationTime() != null) {
return stay()
@@ -192,23 +205,27 @@ public class SagaActor extends
}
).event(SagaTimeoutEvent.class,
(event, data) -> {
+ log(event);
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(SagaEndedEvent.class,
(event, data) -> {
+ log(event);
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
return goTo(SagaActorState.COMMITTED)
.applying(domainEvent);
}
).event(SagaAbortedEvent.class,
(event, data) -> {
+ log(event);
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
return goTo(SagaActorState.FAILED).applying(domainEvent);
}
).event(TxAbortedEvent.class,
(event, data) -> {
+ log(event);
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return goTo(SagaActorState.FAILED).applying(domainEvent);
}
@@ -222,12 +239,14 @@ public class SagaActor extends
when(SagaActorState.FAILED,
matchEvent(SagaTimeoutEvent.class, SagaData.class,
(event, data) -> {
+ log(event);
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(TxCompensatedEvent.class, SagaData.class,
(event, data) -> {
+ log(event);
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
self().tell(ComponsitedCheckEvent.builder().build(), self());
@@ -235,6 +254,7 @@ public class SagaActor extends
}
).event(ComponsitedCheckEvent.class, SagaData.class,
(event, data) -> {
+ log(event);
if (hasCompensationSentTx(data) || !data.isTerminated()) {
return stay();
} else {
@@ -246,6 +266,7 @@ public class SagaActor extends
}
).event(SagaAbortedEvent.class, SagaData.class,
(event, data) -> {
+ log(event);
data.setTerminated(true);
if (hasCommittedTx(data)) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
@@ -264,11 +285,13 @@ public class SagaActor extends
}
).event(TxStartedEvent.class, SagaData.class,
(event, data) -> {
+ log(event);
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
return stay().applying(domainEvent);
}
).event(TxEndedEvent.class, SagaData.class,
(event, data) -> {
+ log(event);
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId());
@@ -287,27 +310,8 @@ public class SagaActor extends
when(SagaActorState.COMMITTED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
- // 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
- // 以下基于 journal-redis 说明:
- // 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
- // journal:persistenceIds
- // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
- // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
- //
- // 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
- // 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
- // 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
- // 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
- //
- // 何如清理:
- // 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
- // 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
- // 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
- // 并删除 journal:persisted:item:highestSequenceNr
- //
- // 目前可以看到的解释是 https://github.com/akka/akka/issues/21181
- deleteMessages(lastSequenceNr());
- deleteSnapshot(snapshotSequenceNr());
+ log(event);
+ beforeStop(stateName(), data);
return stop();
}
)
@@ -316,8 +320,8 @@ public class SagaActor extends
when(SagaActorState.SUSPENDED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
- deleteMessages(lastSequenceNr());
- deleteSnapshot(snapshotSequenceNr());
+ log(event);
+ beforeStop(stateName(), data);
return stop();
}
)
@@ -326,8 +330,8 @@ public class SagaActor extends
when(SagaActorState.COMPENSATED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
- deleteMessages(lastSequenceNr());
- deleteSnapshot(snapshotSequenceNr());
+ log(event);
+ beforeStop(stateName(), data);
return stop();
}
)
@@ -348,13 +352,14 @@ public class SagaActor extends
.putSagaData(stateData().getGlobalTxId(), stateData());
}
if (LOG.isDebugEnabled()) {
- LOG.debug("transition {} {} -> {}", getSelf(), from, to);
+ LOG.debug("transition {} {} -> {}", stateData().getGlobalTxId(), from, to);
}
if (to == SagaActorState.COMMITTED ||
to == SagaActorState.SUSPENDED ||
to == SagaActorState.COMPENSATED) {
self().tell(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.builder().build(), self());
}
+ LOG.info("transition {} {} -> {}", stateData().getGlobalTxId(), from, to);
})
);
@@ -362,102 +367,151 @@ public class SagaActor extends
matchStop(
Normal(), (state, data) -> {
if (LOG.isDebugEnabled()) {
- LOG.debug("stop {} {}", data.getGlobalTxId(), state);
+ LOG.debug("saga actor stopped {} {}", getSelf(), state);
}
- sagaEndTime = System.currentTimeMillis();
- SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
- SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaAvgTime(sagaEndTime - sagaBeginTime);
- data.setLastState(state);
- data.setEndTime(new Date());
- data.setTerminated(true);
- SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
- .stopSagaData(data.getGlobalTxId(), data);
+ LOG.info("stopped {} {}", data.getGlobalTxId(), state);
}
)
);
}
- @Override
- public SagaData applyEvent(DomainEvent event, SagaData data) {
- if (this.recoveryRunning()) {
- LOG.info("SagaActor recovery {}",event.getEvent());
- }
+ private void beforeStop(SagaActorState state, SagaData data){
if (LOG.isDebugEnabled()) {
- LOG.debug("SagaActor apply event {}", event.getEvent());
+ LOG.debug("stop {} {}", data.getGlobalTxId(), state);
}
- // log event to SagaData
- if (event.getEvent() != null && !(event
- .getEvent() instanceof ComponsitedCheckEvent)) {
- data.logEvent(event.getEvent());
+ try{
+ sagaEndTime = System.currentTimeMillis();
+ data.setLastState(state);
+ data.setEndTime(new Date());
+ data.setTerminated(true);
+ SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
+ .stopSagaData(data.getGlobalTxId(), data);
+ SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
+ SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system())
+ .doSagaAvgTime(sagaEndTime - sagaBeginTime);
+
+ // destroy self from cluster shard region
+ getContext().getParent()
+ .tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("destroy saga actor {} from cluster shard region", getSelf());
+ }
+
+ // clear self mailbox from persistence
+ // 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
+ // 以下基于 journal-redis 说明:
+ // 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
+ // journal:persistenceIds
+ // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
+ // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
+ //
+ // 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
+ // 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
+ // 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
+ // 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
+ //
+ // 何如清理:
+ // 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
+ // 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
+ // 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
+ // 并删除 journal:persisted:item:highestSequenceNr
+ //
+ // 目前可以看到的解释是 https://github.com/akka/akka/issues/21181
+ deleteMessages(lastSequenceNr());
+ deleteSnapshot(snapshotSequenceNr());
+ }catch(Exception e){
+ LOG.error("stop {} fail",data.getGlobalTxId());
+ throw e;
}
- if (event instanceof SagaStartedDomain) {
- SagaStartedDomain domainEvent = (SagaStartedDomain) event;
- data.setServiceName(domainEvent.getEvent().getServiceName());
- data.setInstanceId(domainEvent.getEvent().getInstanceId());
- data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId());
- data.setBeginTime(domainEvent.getEvent().getCreateTime());
- data.setExpirationTime(domainEvent.getExpirationTime());
- } else if (event instanceof AddTxEventDomain) {
- AddTxEventDomain domainEvent = (AddTxEventDomain) event;
- if (!data.getTxEntityMap().containsKey(domainEvent.getEvent().getLocalTxId())) {
- TxEntity txEntity = TxEntity.builder()
- .serviceName(domainEvent.getEvent().getServiceName())
- .instanceId(domainEvent.getEvent().getInstanceId())
- .globalTxId(domainEvent.getEvent().getGlobalTxId())
- .localTxId(domainEvent.getEvent().getLocalTxId())
- .parentTxId(domainEvent.getEvent().getParentTxId())
- .compensationMethod(domainEvent.getCompensationMethod())
- .payloads(domainEvent.getPayloads())
- .state(domainEvent.getState())
- .beginTime(domainEvent.getEvent().getCreateTime())
- .build();
- data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
- } else {
- LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId());
+ }
+
+ @Override
+ public SagaData applyEvent(DomainEvent event, SagaData data) {
+ try{
+ if (this.recoveryRunning()) {
+ LOG.info("SagaActor recovery {}",event.getEvent());
+ }else if (LOG.isDebugEnabled()) {
+ LOG.debug("SagaActor apply event {}", event.getEvent());
}
- } else if (event instanceof UpdateTxEventDomain) {
- UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
- TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
- txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
- if (domainEvent.getState() == TxState.COMMITTED) {
- txEntity.setState(domainEvent.getState());
- } else if (domainEvent.getState() == TxState.FAILED) {
- txEntity.setState(domainEvent.getState());
- txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
- data.getTxEntityMap().forEach((k, v) -> {
- if (v.getState() == TxState.COMMITTED) {
- // call compensate
- compensation(v, data);
- }
- });
- } else if (domainEvent.getState() == TxState.COMPENSATED) {
- // decrement the compensation running counter by one
- data.getCompensationRunningCounter().decrementAndGet();
- txEntity.setState(domainEvent.getState());
- LOG.info("compensation is completed {}", txEntity.getLocalTxId());
+ // log event to SagaData
+ if (event.getEvent() != null && !(event
+ .getEvent() instanceof ComponsitedCheckEvent)) {
+ data.logEvent(event.getEvent());
}
- } else if (event instanceof SagaEndedDomain) {
- SagaEndedDomain domainEvent = (SagaEndedDomain) event;
- if (domainEvent.getState() == SagaActorState.FAILED) {
- data.setTerminated(true);
- data.getTxEntityMap().forEach((k, v) -> {
- if (v.getState() == TxState.COMMITTED) {
- // call compensate
- compensation(v, data);
- }
- });
- } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
- data.setEndTime(event.getEvent().getCreateTime());
- data.setTerminated(true);
- data.setSuspendedType(domainEvent.getSuspendedType());
- } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
- data.setEndTime(event.getEvent().getCreateTime());
- data.setTerminated(true);
- } else if (domainEvent.getState() == SagaActorState.COMMITTED) {
- data.setEndTime(event.getEvent().getCreateTime());
- data.setTerminated(true);
+ if (event instanceof SagaStartedDomain) {
+ SagaStartedDomain domainEvent = (SagaStartedDomain) event;
+ data.setServiceName(domainEvent.getEvent().getServiceName());
+ data.setInstanceId(domainEvent.getEvent().getInstanceId());
+ data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId());
+ data.setBeginTime(domainEvent.getEvent().getCreateTime());
+ data.setExpirationTime(domainEvent.getExpirationTime());
+ } else if (event instanceof AddTxEventDomain) {
+ AddTxEventDomain domainEvent = (AddTxEventDomain) event;
+ if (!data.getTxEntityMap().containsKey(domainEvent.getEvent().getLocalTxId())) {
+ TxEntity txEntity = TxEntity.builder()
+ .serviceName(domainEvent.getEvent().getServiceName())
+ .instanceId(domainEvent.getEvent().getInstanceId())
+ .globalTxId(domainEvent.getEvent().getGlobalTxId())
+ .localTxId(domainEvent.getEvent().getLocalTxId())
+ .parentTxId(domainEvent.getEvent().getParentTxId())
+ .compensationMethod(domainEvent.getCompensationMethod())
+ .payloads(domainEvent.getPayloads())
+ .state(domainEvent.getState())
+ .beginTime(domainEvent.getEvent().getCreateTime())
+ .build();
+ data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
+ } else {
+ LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId());
+ }
+ } else if (event instanceof UpdateTxEventDomain) {
+ UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
+ TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
+ txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
+ if (domainEvent.getState() == TxState.COMMITTED) {
+ txEntity.setState(domainEvent.getState());
+ } else if (domainEvent.getState() == TxState.FAILED) {
+ txEntity.setState(domainEvent.getState());
+ txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
+ data.getTxEntityMap().forEach((k, v) -> {
+ if (v.getState() == TxState.COMMITTED) {
+ // call compensate
+ compensation(v, data);
+ }
+ });
+ } else if (domainEvent.getState() == TxState.COMPENSATED) {
+ // decrement the compensation running counter by one
+ data.getCompensationRunningCounter().decrementAndGet();
+ txEntity.setState(domainEvent.getState());
+ LOG.info("compensation is completed {}", txEntity.getLocalTxId());
+ }
+ } else if (event instanceof SagaEndedDomain) {
+ SagaEndedDomain domainEvent = (SagaEndedDomain) event;
+ if (domainEvent.getState() == SagaActorState.FAILED) {
+ data.setTerminated(true);
+ data.getTxEntityMap().forEach((k, v) -> {
+ if (v.getState() == TxState.COMMITTED) {
+ // call compensate
+ compensation(v, data);
+ }
+ });
+ } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
+ data.setEndTime(event.getEvent().getCreateTime());
+ data.setTerminated(true);
+ data.setSuspendedType(domainEvent.getSuspendedType());
+ } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
+ data.setEndTime(event.getEvent().getCreateTime());
+ data.setTerminated(true);
+ } else if (domainEvent.getState() == SagaActorState.COMMITTED) {
+ data.setEndTime(event.getEvent().getCreateTime());
+ data.setTerminated(true);
+ }
}
+ }catch (Exception ex){
+ LOG.error("SagaActor apply event {}", event.getEvent());
+ beforeStop(SagaActorState.SUSPENDED, data);
+ stop();
+ //TODO 增加 SagaActor 处理失败指标
}
return data;
}
@@ -531,4 +585,10 @@ public class SagaActor extends
}
}
}
+
+ private void log(BaseEvent event) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(event.toString());
+ }
+ }
}
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 98a58ce..664f692 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -90,7 +90,7 @@ akkaConfig:
acceptable-heartbeat-pause: 6s
seed-nodes: ["akka://alpha-cluster@127.0.0.1:8070"]
sharding:
- state-store-mode: "persistence"
+ state-store-mode: "ddata" #ddata,persistence
remember-entities: true
shard-failure-backoff: 5s
@@ -173,21 +173,24 @@ akkaConfig:
commit-timeout: 15s
commit-time-warning: 1s
commit-refresh-interval: infinite
- use-dispatcher: "akka.kafka.default-dispatcher"
+ use-dispatcher: "akka.kafka.saga-kafka"
kafka-clients.enable.auto.commit: false
wait-close-partition: 500ms
- position-timeout: 5s
- offset-for-times-timeout: 5s
- metadata-request-timeout: 5s
+ position-timeout: 10s
+ offset-for-times-timeout: 10s
+ metadata-request-timeout: 10s
eos-draining-check-interval: 30ms
partition-handler-warning: 5s
connection-checker.enable: false
connection-checker.max-retries: 3
connection-checker.check-interval: 15s
connection-checker.backoff-factor: 2.0
- max-batch: 1000
- max-interval: 10s
- parallelism: 1
+ saga-kafka:
+ type: "Dispatcher"
+ executor: "thread-pool-executor"
+ thread-pool-executor:
+ fixed-pool-size: 20
+
akka-persistence-redis:
redis: