You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/22 09:17:22 UTC
[incubator-servicecomb-saga] 07/08: SCB-818 Omega supports of TCC
(WIP)
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch SCB-665
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 0cc9678d4950062ed04bccef4eeea3b71d570564
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Tue Aug 21 10:25:13 2018 +0800
SCB-818 Omega supports of TCC (WIP)
---
.../omega/transaction/tcc/TccEventService.java | 25 ++++++++++++++++++++-
.../transaction/tcc/TccParticipatorAspect.java | 14 ++++++++++--
.../tcc/TccStartAnnotationProcessor.java | 26 ++++++++++------------
.../saga/omega/transaction/tcc/TccStartAspect.java | 4 ++--
.../transaction/tcc/events/ParticipatedEvent.java | 10 ++++++---
.../transaction/tcc/events/TccEndedEvent.java | 4 ++--
.../transaction/tcc/events/TccStartedEvent.java | 4 ++--
7 files changed, 61 insertions(+), 26 deletions(-)
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
index 1649620..ae011bc 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
@@ -1,4 +1,27 @@
package org.apache.servicecomb.saga.omega.transaction.tcc;
-public class TccService {
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+public interface TccEventService {
+
+ void onConnected();
+
+ void onDisconnected();
+
+ void close();
+
+ String target();
+
+ AlphaResponse participate(ParticipatedEvent participateEvent); // called by TccParticipatorAspect after a @Participate method returns (Succeed) or throws (Failed)
+
+ AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent); // called by TccStartAnnotationProcessor.preIntercept
+
+ AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent); // called by TccStartAnnotationProcessor.postIntercept (Succeed) and onError (Failed)
+
+ AlphaResponse send(TxEvent event);
+
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
index 02adac2..e64bc2a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction.tcc;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
+import org.apache.servicecomb.saga.common.TransactionStatus;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
@@ -27,6 +28,7 @@ import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy;
import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory;
import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
@@ -39,9 +41,11 @@ public class TccParticipatorAspect {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OmegaContext context;
+ private final TccEventService tccEventService;
- public TccParticipatorAspect(MessageSender sender, OmegaContext context) {
+ public TccParticipatorAspect(TccEventService tccEventService, OmegaContext context) {
this.context = context;
+ this.tccEventService = tccEventService;
}
@Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Participate * *(..)) && @annotation(participate)")
@@ -49,6 +53,7 @@ public class TccParticipatorAspect {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
String localTxId = context.localTxId();
String cancelMethod = participate.cancelMethod();
+ String confirmMethod = participate.confirmMethod();
context.newLocalTxId();
LOG.debug("Updated context {} for participate method {} ", context, method.toString());
@@ -56,13 +61,18 @@ public class TccParticipatorAspect {
try {
Object result = joinPoint.proceed();
// Send the participate message back
+ tccEventService.participate(new ParticipatedEvent(context.globalTxId(), context.localTxId(), localTxId,
+ confirmMethod, cancelMethod, TransactionStatus.Succeed)); // order matches ParticipatedEvent(..., confirmMethod, cancelMethod, status)
LOG.debug("Participate Transaction with context {} has finished.", context);
-
return result;
} catch (Throwable throwable) {
// Now we don't handle the error message
+ tccEventService.participate(new ParticipatedEvent(context.globalTxId(), context.localTxId(), localTxId,
+ confirmMethod, cancelMethod, TransactionStatus.Failed)); // same argument order on the failure path
LOG.error("Participate Transaction with context {} failed.", context, throwable);
throw throwable;
+ } finally {
+ context.setLocalTxId(localTxId); // restore the local tx id captured before context.newLocalTxId()
}
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
index b93373f..26621d1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
@@ -18,30 +18,30 @@ package org.apache.servicecomb.saga.omega.transaction.tcc;
import javax.transaction.TransactionalException;
+import org.apache.servicecomb.saga.common.TransactionStatus;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.EventAwareInterceptor;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
-import org.apache.servicecomb.saga.omega.transaction.SagaEndedEvent;
-import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
public class TccStartAnnotationProcessor implements EventAwareInterceptor {
private final OmegaContext omegaContext;
- private final MessageSender sender;
+ private final TccEventService eventService;
- TccStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) {
+ TccStartAnnotationProcessor(OmegaContext omegaContext, TccEventService eventService) {
this.omegaContext = omegaContext;
- this.sender = sender;
+ this.eventService = eventService;
}
@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
int retries, Object... message) {
try {
- return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
+ return eventService.TccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
} catch (OmegaException e) {
throw new TransactionalException(e.getMessage(), e.getCause());
}
@@ -49,17 +49,15 @@ public class TccStartAnnotationProcessor implements EventAwareInterceptor {
@Override
public void postIntercept(String parentTxId, String compensationMethod) {
- // Send the confirm event
- /*AlphaResponse response = sender.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
- if (response.aborted()) {
- throw new OmegaException("transaction " + parentTxId + " is aborted");
- }*/
+ eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+ TransactionStatus.Succeed)); // NOTE(review): the returned AlphaResponse is discarded — confirm whether it should be checked
}
@Override
public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
// Send the cancel event
- String globalTxId = omegaContext.globalTxId();
- sender.send(new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, throwable));
+ // Do we need to wait for alpha to finish all the transactions?
+ eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+ TransactionStatus.Failed));
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
index 90728e3..f6d1d77 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
@@ -39,9 +39,9 @@ public class TccStartAspect {
private final OmegaContext context;
- public TccStartAspect(MessageSender sender, OmegaContext context) {
+ public TccStartAspect(TccEventService tccEventServicer, OmegaContext context) {
this.context = context;
- this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, sender);
+ this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, tccEventServicer);
}
@Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart * *(..)) && @annotation(tccStart)")
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
index 73ede9c..3372f8e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
@@ -17,21 +17,25 @@
package org.apache.servicecomb.saga.omega.transaction.tcc.events;
-public class ParticipateEvent {
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public class ParticipatedEvent {
private final String globalTxId;
private final String localTxId;
private final String parentTxId;
private final String confirmMethod;
private final String cancelMethod;
+ private final TransactionStatus status;
- public ParticipateEvent(String globalTxId, String localTxId, String parentTxId, String confirmMethod,
- String cancelMethod) {
+ public ParticipatedEvent(String globalTxId, String localTxId, String parentTxId, String confirmMethod,
+ String cancelMethod, TransactionStatus status) {
this.globalTxId = globalTxId;
this.localTxId = localTxId;
this.parentTxId = parentTxId;
this.confirmMethod = confirmMethod;
this.cancelMethod = cancelMethod;
+ this.status = status;
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
index 9107ab0..7c666b2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
@@ -18,13 +18,13 @@ package org.apache.servicecomb.saga.omega.transaction.tcc.events;
import org.apache.servicecomb.saga.common.TransactionStatus;
-public class TccEndEvent {
+public class TccEndedEvent {
private final String globalTxId;
private final String localTxId;
private final TransactionStatus status;
- public TccEndEvent(String globalTxId, String localTxId,
+ public TccEndedEvent(String globalTxId, String localTxId,
TransactionStatus status) {
this.globalTxId = globalTxId;
this.localTxId = localTxId;
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
index 64db3ea..edd0333 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
@@ -16,7 +16,7 @@
*/
package org.apache.servicecomb.saga.omega.transaction.tcc.events;
-public class TccStartEvent {
+public class TccStartedEvent {
private final String globalTxId;
private final String localTxId;
@@ -31,7 +31,7 @@ public class TccStartEvent {
- public TccStartEvent(String globalTxId, String localTxId) {
+ public TccStartedEvent(String globalTxId, String localTxId) {
this.globalTxId = globalTxId;
this.localTxId = localTxId;
}