You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2020/06/23 23:39:58 UTC
[servicecomb-pack] 02/04: SCB-2004 Use message-driven compensation retry instead of recursive compensation
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 8e0afb719be61701f358a31bd841a5a1e4c09da6
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jun 22 23:55:21 2020 +0800
SCB-2004 Use message-driven compensation retry instead of recursive compensation
---
.../fsm/event/internal/ComponsitedCheckEvent.java | 37 +++++
.../servicecomb/pack/alpha/fsm/SagaActor.java | 154 +++++++++++++--------
.../pack/alpha/fsm/domain/UpdateTxEventDomain.java | 7 +
.../pack/alpha/fsm/model/TxEntities.java | 8 ++
.../pack/alpha/server/fsm/GrpcOmegaCallback.java | 2 +-
5 files changed, 151 insertions(+), 57 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
index 467952f..63ab314 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/internal/ComponsitedCheckEvent.java
@@ -17,10 +17,17 @@
package org.apache.servicecomb.pack.alpha.core.fsm.event.internal;
+import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
public class ComponsitedCheckEvent extends TxEvent {
+ private TxState preState;
+
+ public TxState getPreState() {
+ return preState;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -32,6 +39,36 @@ public class ComponsitedCheckEvent extends TxEvent {
private Builder() {
txComponsitedEvent = new ComponsitedCheckEvent();
}
+
+ public Builder serviceName(String serviceName) {
+ txComponsitedEvent.setServiceName(serviceName);
+ return this;
+ }
+
+ public Builder instanceId(String instanceId) {
+ txComponsitedEvent.setInstanceId(instanceId);
+ return this;
+ }
+
+ public Builder parentTxId(String parentTxId) {
+ txComponsitedEvent.setParentTxId(parentTxId);
+ return this;
+ }
+
+ public Builder localTxId(String localTxId) {
+ txComponsitedEvent.setLocalTxId(localTxId);
+ return this;
+ }
+
+ public Builder globalTxId(String globalTxId) {
+ txComponsitedEvent.setGlobalTxId(globalTxId);
+ return this;
+ }
+
+ public Builder preState(TxState txState) {
+ txComponsitedEvent.preState = txState;
+ return this;
+ }
public ComponsitedCheckEvent build() {
return txComponsitedEvent;
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 017ee0e..9033683 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -32,23 +32,23 @@ import java.util.concurrent.TimeoutException;
import org.apache.servicecomb.pack.alpha.core.AlphaException;
import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
@@ -239,27 +239,50 @@ public class SagaActor extends
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
- self().tell(ComponsitedCheckEvent.builder().build(), self());
+ self().tell(ComponsitedCheckEvent.builder()
+ .serviceName(event.getServiceName())
+ .instanceId(event.getInstanceId())
+ .globalTxId(event.getGlobalTxId())
+ .localTxId(event.getLocalTxId())
+ .parentTxId(event.getParentTxId())
+ .preState(TxState.COMPENSATED_SUCCEED)
+ .build(), self());
}));
}
).event(TxCompensateAckFailedEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
- self().tell(ComponsitedCheckEvent.builder().build(), self());
+ self().tell(ComponsitedCheckEvent.builder()
+ .serviceName(event.getServiceName())
+ .instanceId(event.getInstanceId())
+ .globalTxId(event.getGlobalTxId())
+ .localTxId(event.getLocalTxId())
+ .parentTxId(event.getParentTxId())
+ .preState(TxState.COMPENSATED_FAILED)
+ .build(), self());
}));
}
).event(CompensateAckTimeoutEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
- self().tell(ComponsitedCheckEvent.builder().build(), self());
+ self().tell(ComponsitedCheckEvent.builder()
+ .serviceName(event.getServiceName())
+ .instanceId(event.getInstanceId())
+ .globalTxId(event.getGlobalTxId())
+ .localTxId(event.getLocalTxId())
+ .parentTxId(event.getParentTxId())
+ .preState(TxState.COMPENSATED_FAILED)
+ .build(), self());
}));
}
).event(ComponsitedCheckEvent.class, SagaData.class,
(event, data) -> {
- if (data.getTxEntities().hasCompensationSentTx() || !data.isTerminated()) {
- return stay();
+ if (data.getTxEntities().hasCompensationSentTx() ||
+ data.getTxEntities().hasCompensationFailedTx()) {
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
+ return stay().applying(domainEvent);
} else {
if(data.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED);
@@ -272,7 +295,6 @@ public class SagaActor extends
}
).event(SagaAbortedEvent.class, SagaData.class,
(event, data) -> {
- data.setTerminated(true);
if (data.getTxEntities().hasCommittedTx()) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
return stay()
@@ -299,7 +321,7 @@ public class SagaActor extends
return stay().applying(domainEvent).andThen(exec(_data -> {
TxEntity txEntity = _data.getTxEntities().get(event.getLocalTxId());
// call compensate
- compensation(txEntity, _data);
+ compensation(domainEvent, txEntity, _data);
}));
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
@@ -340,7 +362,7 @@ public class SagaActor extends
whenUnhandled(
matchAnyEvent((event, data) -> {
if (event instanceof BaseEvent){
- LOG.error("Unhandled event {}", event);
+ LOG.debug("Unhandled event {}", event);
}
return stay();
})
@@ -443,6 +465,7 @@ public class SagaActor extends
@Override
public SagaData applyEvent(DomainEvent event, SagaData data) {
+ LOG.debug("apply domain event {}", event.getEvent());
try{
if (this.recoveryRunning()) {
LOG.info("recovery {}",event.getEvent());
@@ -493,40 +516,58 @@ public class SagaActor extends
data.getTxEntities().forEachReverse((k, v) -> {
if (v.getState() == TxState.COMMITTED) {
// call compensate
- compensation(v, data);
+ if (!compensation(domainEvent, v, data)) {
+ return;
+ }
}
});
} else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
- // decrement the compensation running counter by one
data.getCompensationRunningCounter().decrementAndGet();
- txEntity.setState(domainEvent.getState());
- LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
+ txEntity.setState(TxState.COMPENSATED_SUCCEED);
+ LOG.info("compensate is succeed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId());
} else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) {
data.getCompensationRunningCounter().decrementAndGet();
- txEntity.setState(domainEvent.getState());
+ txEntity.setState(TxState.COMPENSATED_FAILED);
txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
- LOG.info("compensation is failed {}", txEntity.getLocalTxId());
+ if (txEntity.getReverseRetries() > 0 &&
+ txEntity.getRetriesCounter().get() < txEntity.getReverseRetries()) {
+ data.getTxEntities().forEachReverse((k, v) -> {
+ if (v.getState() == TxState.COMMITTED || v.getState() == TxState.COMPENSATED_FAILED) {
+ // call compensate
+ if (!compensation(domainEvent, v, data)){
+ return;
+ }
+ }
+ });
+ } else {
+ data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
+ self().tell(ComponsitedCheckEvent.builder()
+ .serviceName(txEntity.getServiceName())
+ .instanceId(txEntity.getInstanceId())
+ .globalTxId(txEntity.getGlobalTxId())
+ .localTxId(txEntity.getLocalTxId())
+ .preState(TxState.COMPENSATED_FAILED)
+ .parentTxId(txEntity.getParentTxId()).build(), self());
+ }
}
} else if (event instanceof SagaEndedDomain) {
SagaEndedDomain domainEvent = (SagaEndedDomain) event;
if (domainEvent.getState() == SagaActorState.FAILED) {
- data.setTerminated(true);
data.getTxEntities().forEachReverse((k, v) -> {
if (v.getState() == TxState.COMMITTED) {
// call compensate
- compensation(v, data);
+ if (!compensation(domainEvent, v, data)){
+ return;
+ }
}
});
} else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
- data.setTerminated(true);
data.setSuspendedType(domainEvent.getSuspendedType());
} else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
- data.setTerminated(true);
} else if (domainEvent.getState() == SagaActorState.COMMITTED) {
data.setEndTime(event.getEvent() != null ? event.getEvent().getCreateTime() : new Date());
- data.setTerminated(true);
}
}
}catch (Exception ex){
@@ -558,22 +599,29 @@ public class SagaActor extends
}
//call omega compensate method
- private void compensation(TxEntity txEntity, SagaData data) {
+ private boolean compensation(DomainEvent event, TxEntity txEntity, SagaData data) {
// increments the compensation running counter by one
data.getCompensationRunningCounter().incrementAndGet();
txEntity.setState(TxState.COMPENSATION_SENT);
try {
+ LOG.info("compensate {} {} [{}] {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getGlobalTxId(), txEntity.getLocalTxId());
SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
- LOG.info("compensate {} {} {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getLocalTxId());
- } catch (AlphaException ex) {
- LOG.error(ex.getMessage(), ex);
- try {
- Thread.sleep(txEntity.getRetryDelayInMilliseconds());
- } catch (InterruptedException e) {
- LOG.error(e.getMessage(), e);
- }
- compensation(txEntity, data);
} catch (Exception ex) {
+ LOG.error("compensate failed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId(), ex);
+ if (txEntity.getReverseRetries() > 0 &&
+ txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
+ LOG.info("Retry compensate {}/{} [{}] {} after {} ms",
+ txEntity.getRetriesCounter().get() + 1,
+ txEntity.getReverseRetries(),
+ txEntity.getGlobalTxId(),
+ txEntity.getLocalTxId(),
+ txEntity.getRetryDelayInMilliseconds());
+ try {
+ Thread.sleep(txEntity.getRetryDelayInMilliseconds());
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
if (ex instanceof TimeoutException) {
StringWriter writer = new StringWriter();
ex.printStackTrace(new PrintWriter(writer));
@@ -581,7 +629,7 @@ public class SagaActor extends
if (stackTrace.length() > Environment.getInstance().getPayloadsMaxLength()) {
stackTrace = stackTrace.substring(0, Environment.getInstance().getPayloadsMaxLength());
}
- CompensateAckTimeoutEvent event = CompensateAckTimeoutEvent.builder()
+ CompensateAckTimeoutEvent compensateAckTimeoutEvent = CompensateAckTimeoutEvent.builder()
.createTime(new Date(System.currentTimeMillis()))
.globalTxId(txEntity.getGlobalTxId())
.parentTxId(txEntity.getParentTxId())
@@ -590,26 +638,20 @@ public class SagaActor extends
.instanceId(txEntity.getInstanceId())
.payloads(stackTrace.getBytes())
.build();
- self().tell(event, self());
+ self().tell(compensateAckTimeoutEvent, self());
}
- LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
- if (txEntity.getReverseRetries() > 0) {
- // which means the retry number
- if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
- LOG.info("Retry compensate {}/{} after {} ms", txEntity.getRetriesCounter().get() + 1, txEntity.getReverseRetries(),
- txEntity.getRetryDelayInMilliseconds());
- try {
- Thread.sleep(txEntity.getRetryDelayInMilliseconds());
- } catch (InterruptedException e) {
- LOG.error(e.getMessage(), e);
- }
- compensation(txEntity, data);
- } else {
- data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
- }
- } else {
- data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
+ if (ex instanceof AlphaException) {
+ self().tell(TxCompensateAckFailedEvent.builder()
+ .serviceName(txEntity.getServiceName())
+ .instanceId(txEntity.getInstanceId())
+ .globalTxId(txEntity.getGlobalTxId())
+ .localTxId(txEntity.getLocalTxId())
+ .parentTxId(txEntity.getParentTxId())
+ .payloads(ex.getMessage().getBytes())
+ .build(), self());
}
+ return false;
}
+ return true;
}
}
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 97d3b02..6cd9d00 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -24,6 +24,7 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEv
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
public class UpdateTxEventDomain implements DomainEvent {
private String localTxId;
@@ -63,6 +64,12 @@ public class UpdateTxEventDomain implements DomainEvent {
this.state = TxState.COMPENSATED_FAILED;
}
+ public UpdateTxEventDomain(ComponsitedCheckEvent event) {
+ this.event = event;
+ this.localTxId = event.getLocalTxId();
+ this.state = event.getPreState();
+ }
+
public String getLocalTxId() {
return localTxId;
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
index a4531d5..8ba72fd 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
@@ -67,4 +67,12 @@ public class TxEntities {
.filter(map -> map.getValue().getState() == TxState.COMPENSATION_SENT)
.count() > 0;
}
+
+ public boolean hasCompensationFailedTx() {
+ return entities.entrySet().stream()
+ .filter(map -> map.getValue().getState() == TxState.COMPENSATED_FAILED
+ && map.getValue().getReverseRetries() > 0
+ && map.getValue().getReverseRetries() > map.getValue().getRetriesCounter().get())
+ .count() > 0;
+ }
}
\ No newline at end of file
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
index 027046d..dcc7eea 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
@@ -55,7 +55,7 @@ class GrpcOmegaCallback implements OmegaCallback {
if (compensateAckCountDownLatch.getType() == CompensateAckType.Disconnected) {
throw new CompensateConnectException("Omega connect exception");
}else{
- LOG.info("compensate ack "+ compensateAckCountDownLatch.getType().name());
+ LOG.debug("compensate ack "+ compensateAckCountDownLatch.getType().name());
if(compensateAckCountDownLatch.getType() == CompensateAckType.Failed){
throw new CompensateAckFailedException("An exception is thrown inside the compensation method");
}