You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/22 09:17:22 UTC

[incubator-servicecomb-saga] 07/08: SCB-818 Omega supports of TCC (WIP)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch SCB-665
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 0cc9678d4950062ed04bccef4eeea3b71d570564
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Tue Aug 21 10:25:13 2018 +0800

    SCB-818 Omega supports of TCC (WIP)
---
 .../omega/transaction/tcc/TccEventService.java     | 25 ++++++++++++++++++++-
 .../transaction/tcc/TccParticipatorAspect.java     | 14 ++++++++++--
 .../tcc/TccStartAnnotationProcessor.java           | 26 ++++++++++------------
 .../saga/omega/transaction/tcc/TccStartAspect.java |  4 ++--
 .../transaction/tcc/events/ParticipatedEvent.java  | 10 ++++++---
 .../transaction/tcc/events/TccEndedEvent.java      |  4 ++--
 .../transaction/tcc/events/TccStartedEvent.java    |  4 ++--
 7 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
index 1649620..ae011bc 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
@@ -1,4 +1,27 @@
 package org.apache.servicecomb.saga.omega.transaction.tcc;
 
-public class TccService {
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+public interface TccEventService {
+
+  void onConnected();
+
+  void onDisconnected();
+
+  void close();
+
+  String target();
+
+  AlphaResponse participate(ParticipatedEvent participateEvent);
+
+  AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent);
+
+  AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent);
+
+  AlphaResponse send(TxEvent event);
+  
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
index 02adac2..e64bc2a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction.tcc;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 
+import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
@@ -27,6 +28,7 @@ import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy;
 import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
@@ -39,9 +41,11 @@ public class TccParticipatorAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext context;
+  private final TccEventService tccEventService;
 
-  public TccParticipatorAspect(MessageSender sender, OmegaContext context) {
+  public TccParticipatorAspect(TccEventService tccEventService, OmegaContext context) {
     this.context = context;
+    this.tccEventService = tccEventService;
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Participate * *(..)) && @annotation(participate)")
@@ -49,6 +53,7 @@ public class TccParticipatorAspect {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     String localTxId = context.localTxId();
     String cancelMethod = participate.cancelMethod();
+    String confirmMethod = participate.confirmMethod();
     
     context.newLocalTxId();
     LOG.debug("Updated context {} for participate method {} ", context, method.toString());
@@ -56,13 +61,18 @@ public class TccParticipatorAspect {
     try {
       Object result = joinPoint.proceed();
       // Send the participate message back
+      tccEventService.participate(new ParticipatedEvent(context.globalTxId(), context.localTxId(), localTxId, cancelMethod, confirmMethod,
+          TransactionStatus.Succeed));
       LOG.debug("Participate Transaction with context {} has finished.", context);
-
       return result;
     } catch (Throwable throwable) {
       // Now we don't handle the error message
+      tccEventService.participate(new ParticipatedEvent(context.globalTxId(), context.localTxId(), localTxId, cancelMethod,
+          confirmMethod, TransactionStatus.Failed));
       LOG.error("Participate Transaction with context {} failed.", context, throwable);
       throw throwable;
+    } finally {
+      context.setLocalTxId(localTxId);
     }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
index b93373f..26621d1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
@@ -18,30 +18,30 @@ package org.apache.servicecomb.saga.omega.transaction.tcc;
 
 import javax.transaction.TransactionalException;
 
+import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.EventAwareInterceptor;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
-import org.apache.servicecomb.saga.omega.transaction.SagaEndedEvent;
-import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
 
 public class TccStartAnnotationProcessor implements EventAwareInterceptor {
 
   private final OmegaContext omegaContext;
-  private final MessageSender sender;
+  private final TccEventService eventService;
 
-  TccStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) {
+  TccStartAnnotationProcessor(OmegaContext omegaContext, TccEventService eventService) {
     this.omegaContext = omegaContext;
-    this.sender = sender;
+    this.eventService = eventService;
   }
 
   @Override
   public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
       int retries, Object... message) {
     try {
-      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
+      return eventService.TccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
@@ -49,17 +49,15 @@ public class TccStartAnnotationProcessor implements EventAwareInterceptor {
 
   @Override
   public void postIntercept(String parentTxId, String compensationMethod) {
-    // Send the confirm event
-    /*AlphaResponse response = sender.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
-    if (response.aborted()) {
-      throw new OmegaException("transaction " + parentTxId + " is aborted");
-    }*/
+    eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+        TransactionStatus.Succeed));
   }
 
   @Override
   public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
     // Send the cancel event
-    String globalTxId = omegaContext.globalTxId();
-    sender.send(new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod, throwable));
+    // Do we need to wait for the alpha finish all the transaction
+    eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+        TransactionStatus.Failed));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
index 90728e3..f6d1d77 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
@@ -39,9 +39,9 @@ public class TccStartAspect {
 
   private final OmegaContext context;
 
-  public TccStartAspect(MessageSender sender, OmegaContext context) {
+  public TccStartAspect(TccEventService tccEventServicer, OmegaContext context) {
     this.context = context;
-    this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, sender);
+    this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, tccEventServicer);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart * *(..)) && @annotation(tccStart)")
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
index 73ede9c..3372f8e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
@@ -17,21 +17,25 @@
 package org.apache.servicecomb.saga.omega.transaction.tcc.events;
 
 
-public class ParticipateEvent {
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public class ParticipatedEvent {
   
   private final String globalTxId;
   private final String localTxId;
   private final String parentTxId;
   private final String confirmMethod;
   private final String cancelMethod;
+  private final TransactionStatus status;
 
 
-  public ParticipateEvent(String globalTxId, String localTxId, String parentTxId, String confirmMethod,
-      String cancelMethod) {
+  public ParticipatedEvent(String globalTxId, String localTxId, String parentTxId, String confirmMethod,
+      String cancelMethod, TransactionStatus status) {
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
     this.parentTxId = parentTxId;
     this.confirmMethod = confirmMethod;
     this.cancelMethod = cancelMethod;
+    this.status = status;
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
index 9107ab0..7c666b2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
@@ -18,13 +18,13 @@ package org.apache.servicecomb.saga.omega.transaction.tcc.events;
 
 import org.apache.servicecomb.saga.common.TransactionStatus;
 
-public class TccEndEvent {
+public class TccEndedEvent {
   private final String globalTxId;
   private final String localTxId;
   private final TransactionStatus status;
    
 
-  public TccEndEvent(String globalTxId, String localTxId,
+  public TccEndedEvent(String globalTxId, String localTxId,
       TransactionStatus status) {
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
index 64db3ea..edd0333 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
@@ -16,7 +16,7 @@
  */
 package org.apache.servicecomb.saga.omega.transaction.tcc.events;
 
-public class TccStartEvent {
+public class TccStartedEvent {
   private final String globalTxId;
   private final String localTxId;
 
@@ -31,7 +31,7 @@ public class TccStartEvent {
 
 
 
-  public TccStartEvent(String globalTxId, String localTxId) {
+  public TccStartedEvent(String globalTxId, String localTxId) {
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
   }