You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/12/10 02:33:36 UTC
[servicecomb-pack] 03/13: SCB-1627 CallbackContext sends the
compensation result event to Alpha
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit ca52f12cff38375ec18b0fd573ea02472bce3085
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Sat Nov 30 18:32:18 2019 +0800
SCB-1627 CallbackContext sends the compensation result event to Alpha
---
.../apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java | 8 ++++----
.../servicecomb/pack/omega/transaction/spring/MessageConfig.java | 8 ++++----
.../servicecomb/pack/omega/transaction/CallbackContext.java | 6 +++++-
.../main/java/org/apache/servicecomb/pack/common/EventType.java | 4 +++-
4 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
index e1c2ad5..1fd3648 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSpringConfig.java
@@ -72,13 +72,13 @@ class OmegaSpringConfig {
}
@Bean(name = {"compensationContext"})
- CallbackContext compensationContext(OmegaContext omegaContext) {
- return new CallbackContext(omegaContext);
+ CallbackContext compensationContext(OmegaContext omegaContext, SagaMessageSender sender) {
+ return new CallbackContext(omegaContext, sender);
}
@Bean(name = {"coordinateContext"})
- CallbackContext coordinateContext(OmegaContext omegaContext) {
- return new CallbackContext(omegaContext);
+ CallbackContext coordinateContext(OmegaContext omegaContext, SagaMessageSender sender) {
+ return new CallbackContext(omegaContext, sender);
}
@Bean
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/MessageConfig.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/MessageConfig.java
index 6fd2d91..240b441 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/MessageConfig.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/MessageConfig.java
@@ -44,13 +44,13 @@ public class MessageConfig {
}
@Bean(name = "compensationContext")
- CallbackContext recoveryCompensationContext(OmegaContext omegaContext) {
- return new CallbackContext(omegaContext);
+ CallbackContext recoveryCompensationContext(OmegaContext omegaContext, SagaMessageSender sender) {
+ return new CallbackContext(omegaContext, sender);
}
@Bean(name = "coordinateContext")
- CallbackContext coordinateContext(OmegaContext omegaContext) {
- return new CallbackContext(omegaContext);
+ CallbackContext coordinateContext(OmegaContext omegaContext, SagaMessageSender sender) {
+ return new CallbackContext(omegaContext, sender);
}
@Bean
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
index c483480..c069ac9 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
@@ -31,9 +31,11 @@ public class CallbackContext {
private final Map<String, CallbackContextInternal> contexts = new ConcurrentHashMap<>();
private final OmegaContext omegaContext;
+ private final SagaMessageSender sender;
- public CallbackContext(OmegaContext omegaContext) {
+ public CallbackContext(OmegaContext omegaContext, SagaMessageSender sender) {
this.omegaContext = omegaContext;
+ this.sender = sender;
}
public void addCallbackContext(String key, Method compensationMethod, Object target) {
@@ -50,7 +52,9 @@ public class CallbackContext {
omegaContext.setLocalTxId(localTxId);
contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+ sender.send(new TxCompensateAckSucceedEvent(omegaContext.globalTxId(),omegaContext.localTxId(),omegaContext.globalTxId()));
} catch (IllegalAccessException | InvocationTargetException e) {
+ sender.send(new TxCompensateAckFailedEvent(omegaContext.globalTxId(),omegaContext.localTxId(),omegaContext.globalTxId()));
LOG.error(
"Pre-checking for callback method " + contextInternal.callbackMethod.toString()
+ " was somehow skipped, did you forget to configure callback method checking on service startup?",
diff --git a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
index e8c88a8..e585d0c 100644
--- a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
+++ b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
@@ -26,5 +26,7 @@ public enum EventType {
SagaEndedEvent,
SagaAbortedEvent,
SagaTimeoutEvent,
- TxCompensateEvent
+ TxCompensateEvent,
+ TxCompensateAckFailedEvent,
+ TxCompensateAckSucceedEvent
}