You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2020/06/23 23:39:57 UTC
[servicecomb-pack] 01/04: SCB-2004 In Omega,
if CallbackContenxt does not contain callbackMethod,
send TxCompensateAckFailedEvent to Alpha instead of throwing NPE
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit c2d52e9c3cd2203e193bb5154d637fc6f7cb5203
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jun 22 19:35:54 2020 +0800
SCB-2004 In Omega, if CallbackContenxt does not contain callbackMethod, send TxCompensateAckFailedEvent to Alpha instead of throwing NPE
---
.../alpha/server/fsm/AlphaIntegrationFsmTest.java | 29 +++++++++---
.../alpha/server/fsm/OmegaEventSagaSimulator.java | 52 +++++++++++++++-------
.../pack/omega/transaction/CallbackContext.java | 29 ++++++++----
3 files changed, 80 insertions(+), 30 deletions(-)
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index aede177..d010728 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -30,14 +30,14 @@ import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
-import org.apache.servicecomb.pack.common.EventType;
import org.apache.servicecomb.pack.common.AlphaMetaKeys;
+import org.apache.servicecomb.pack.common.EventType;
import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.junit.After;
import org.junit.AfterClass;
@@ -480,7 +480,7 @@ public class AlphaIntegrationFsmTest {
final String localTxId_3 = UUID.randomUUID().toString();
final Map<String, OmegaCallback>[] omegaInstance = new Map[]{null};
final String[] serviceName = new String[1];
- omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEventsAndReverseRetries(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
if(event.getType().equals(EventType.TxAbortedEvent.name())){
//simulate omega disconnect
serviceName[0] = event.getServiceName();
@@ -489,17 +489,32 @@ public class AlphaIntegrationFsmTest {
}
omegaEventSender.getBlockingStub().onTxEvent(event);
});
- //simulate omega connected
await().atMost(5, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
return sagaData !=null && sagaData.getTxEntities().size()==3;
});
+
+ //simulate omega connected
omegaEventSender.getOmegaCallbacks().put(serviceName[0], omegaInstance[0]);
- waitAlphaCallCompensate(omegaEventSender, globalTxId, localTxId_1, localTxId_2);
- await().atMost(2, SECONDS).until(() -> {
+
+ await().atMost(10, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
- return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ if(sagaData != null){
+ if(sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED){
+ return true;
+ }else{
+ sagaData.getTxEntities().forEachReverse((k,v) -> {
+ if(v.getState() == TxState.COMPENSATION_SENT || v.getState() == TxState.COMPENSATED_FAILED){
+ omegaEventSender.getBlockingStub().onTxEvent(omegaEventSender.getOmegaEventSagaSimulator().getTxCompensateAckSucceedEvent(v.getGlobalTxId(),v.getLocalTxId()));
+ }
+ });
+ return false;
+ }
+ } else {
+ return false;
+ }
});
+
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
assertEquals(sagaData.getTxEntities().size(),3);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 13eb3f1..454f6df 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -65,7 +65,7 @@ public class OmegaEventSagaSimulator {
final int localTxId_1_ReverseTimeoutSecond = 2;
List<GrpcTxEvent> sagaEvents = new ArrayList<>();
sagaEvents.add(sagaStartedEvent(globalTxId));
- sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a", 0, localTxId_1_ReverseTimeoutSecond));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a", 0, localTxId_1_ReverseTimeoutSecond, 5));
sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
sagaEvents.add(txAbortedEvent(globalTxId, localTxId_2, globalTxId, NullPointerException.class.getName().getBytes(), "method b"));
@@ -87,6 +87,19 @@ public class OmegaEventSagaSimulator {
return sagaEvents;
}
+ public List<GrpcTxEvent> lastTxAbortedEventsAndReverseRetries(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(sagaStartedEvent(globalTxId));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b", 15, 0, 1000));
+ sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+ sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c"));
+ sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, NullPointerException.class.getName().getBytes(), "method c"));
+ sagaEvents.add(sagaAbortedEvent(globalTxId));
+ return sagaEvents;
+ }
+
public List<GrpcTxEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
List<GrpcTxEvent> sagaEvents = new ArrayList<>();
sagaEvents.add(sagaStartedEvent(globalTxId));
@@ -198,80 +211,87 @@ public class OmegaEventSagaSimulator {
private GrpcTxEvent sagaStartedEvent(String globalTxId) {
return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
private GrpcTxEvent sagaStartedEvent(String globalTxId, int timeout) {
return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
null, new byte[0], "", timeout, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
private GrpcTxEvent sagaEndedEvent(String globalTxId) {
return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
private GrpcTxEvent sagaAbortedEvent(String globalTxId) {
return eventOf(EventType.SagaAbortedEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
private GrpcTxEvent sagaTimeoutEvent(String globalTxId) {
return eventOf(EventType.SagaTimeoutEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
private GrpcTxEvent txStartedEvent(String globalTxId,
String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
parentTxId, payloads, compensationMethod, 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
private GrpcTxEvent txStartedEvent(String globalTxId,
- String localTxId, String parentTxId, byte[] payloads, String compensationMethod, int reverseRetries, int reverseTimeout) {
+ String localTxId, String parentTxId, byte[] payloads, String compensationMethod, int reverseRetries, int reverseTimeout, int retryDelayInMilliseconds) {
return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
parentTxId, payloads, compensationMethod, 0, "",
- 0, 0, reverseRetries, reverseTimeout);
+ 0, 0, reverseRetries, reverseTimeout, retryDelayInMilliseconds);
}
private GrpcTxEvent txEndedEvent(String globalTxId,
String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
return eventOf(EventType.TxEndedEvent, globalTxId, localTxId,
parentTxId, payloads, compensationMethod, 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
private GrpcTxEvent txAbortedEvent(String globalTxId,
String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
return eventOf(EventType.TxAbortedEvent, globalTxId, localTxId,
parentTxId, payloads, compensationMethod, 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
public GrpcTxEvent txCompensatedEvent(String globalTxId,
String localTxId, String parentTxId) {
return eventOf(EventType.TxCompensatedEvent, globalTxId, localTxId,
parentTxId, new byte[0], "", 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
public GrpcTxEvent txCompensateAckSucceedEvent(String globalTxId,
String localTxId, String parentTxId) {
return eventOf(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId,
parentTxId, new byte[0], "", 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0, 5);
}
public GrpcTxEvent txCompensateAckTimeoutEvent(String globalTxId,
String localTxId, String parentTxId) {
return eventOf(EventType.CompensateAckTimeoutEvent, globalTxId, localTxId,
parentTxId, new byte[0], "", 0, "",
- 0, 0, 0, 0);
+ 0, 0, 0, 0,5);
+ }
+
+ public GrpcTxEvent txCompensateAckFailedEvent(String globalTxId,
+ String localTxId, String parentTxId) {
+ return eventOf(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId,
+ parentTxId, new byte[0], "", 0, "",
+ 0, 0, 0, 0, 5);
}
private GrpcTxEvent eventOf(EventType eventType,
@@ -285,7 +305,8 @@ public class OmegaEventSagaSimulator {
int forwardRetries,
int forwardTimeout,
int reverseRetries,
- int reverseTimeout) {
+ int reverseTimeout,
+ int retryDelayInMilliseconds) {
return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
@@ -302,6 +323,7 @@ public class OmegaEventSagaSimulator {
.setRetryMethod(retryMethod)
.setForwardRetries(forwardRetries)
.setReverseRetries(reverseRetries)
+ .setRetryDelayInMilliseconds(retryDelayInMilliseconds)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
index 6a3bcb1..4402cd2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
@@ -44,19 +44,32 @@ public class CallbackContext {
}
public void apply(String globalTxId, String localTxId, String parentTxId, String callbackMethod, Object... payloads) {
- CallbackContextInternal contextInternal = contexts.get(callbackMethod);
String oldGlobalTxId = omegaContext.globalTxId();
String oldLocalTxId = omegaContext.localTxId();
try {
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(localTxId);
- contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
- if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
- sender.send(
- new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
- parentTxId, callbackMethod));
+ if (contexts.containsKey(callbackMethod)) {
+ CallbackContextInternal contextInternal = contexts.get(callbackMethod);
+ contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
+ if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
+ sender.send(
+ new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+ parentTxId, callbackMethod));
+ }
+ LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+ } else {
+ if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
+ String msg = "callback method " + callbackMethod
+ + " not found on CallbackContext, If it is starting, please try again later";
+ sender.send(
+ new TxCompensateAckFailedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+ parentTxId, callbackMethod, new Exception(msg)));
+ LOG.error(msg);
+ }else{
+ throw new NullPointerException();
+ }
}
- LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
} catch (IllegalAccessException | InvocationTargetException e) {
if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
sender.send(
@@ -64,7 +77,7 @@ public class CallbackContext {
parentTxId, callbackMethod, e));
}
LOG.error(
- "Pre-checking for callback method " + contextInternal.callbackMethod.toString()
+ "Pre-checking for callback method " + callbackMethod
+ " was somehow skipped, did you forget to configure callback method checking on service startup?",
e);
} finally {