You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/08/31 00:43:25 UTC

[GitHub] WillemJiang closed pull request #283: SCB-876 Implement of Omega TCC coordinate command callback and refactor

WillemJiang closed pull request #283: SCB-876 Implement of Omega TCC coordinate command callback and refactor
URL: https://github.com/apache/incubator-servicecomb-saga/pull/283
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CallbackContext.java
similarity index 62%
rename from omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
rename to omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CallbackContext.java
index 0823240a..0fe2613c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CallbackContext.java
@@ -22,39 +22,37 @@
 import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CompensationContext {
+public class CallbackContext {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final Map<String, CompensationContextInternal> contexts = new ConcurrentHashMap<>();
+  private final Map<String, CallbackContextInternal> contexts = new ConcurrentHashMap<>();
   private final OmegaContext omegaContext;
 
-  public CompensationContext(OmegaContext omegaContext) {
+  public CallbackContext(OmegaContext omegaContext) {
     this.omegaContext = omegaContext;
   }
 
-  public void addCompensationContext(Method compensationMethod, Object target) {
+  public void addCallbackContext(Method compensationMethod, Object target) {
     compensationMethod.setAccessible(true);
-    contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
+    contexts.put(compensationMethod.toString(), new CallbackContextInternal(target, compensationMethod));
   }
 
-  public void apply(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
-    CompensationContextInternal contextInternal = contexts.get(compensationMethod);
-
+  public void apply(String globalTxId, String localTxId, String callbackMethod, Object... payloads) {
+    CallbackContextInternal contextInternal = contexts.get(callbackMethod);
     String oldGlobalTxId = omegaContext.globalTxId();
-    String oldLocalTxId= omegaContext.localTxId();
+    String oldLocalTxId = omegaContext.localTxId();
     try {
       omegaContext.setGlobalTxId(globalTxId);
       omegaContext.setLocalTxId(localTxId);
-      contextInternal.compensationMethod.invoke(contextInternal.target, payloads);
-      LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+      contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
+      LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
     } catch (IllegalAccessException | InvocationTargetException e) {
       LOG.error(
-          "Pre-checking for compensation method " + contextInternal.compensationMethod.toString()
-              + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
+          "Pre-checking for callback method " + contextInternal.callbackMethod.toString()
+              + " was somehow skipped, did you forget to configure callback method checking on service startup?",
           e);
     } finally {
       omegaContext.setGlobalTxId(oldGlobalTxId);
@@ -62,14 +60,14 @@ public void apply(String globalTxId, String localTxId, String compensationMethod
     }
   }
 
-  private static final class CompensationContextInternal {
+  private static final class CallbackContextInternal {
     private final Object target;
 
-    private final Method compensationMethod;
+    private final Method callbackMethod;
 
-    private CompensationContextInternal(Object target, Method compensationMethod) {
+    private CallbackContextInternal(Object target, Method callbackMethod) {
       this.target = target;
-      this.compensationMethod = compensationMethod;
+      this.callbackMethod = callbackMethod;
     }
   }
 }
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index 8b928682..4055a691 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,6 +17,9 @@
 
 package org.apache.servicecomb.saga.omega.context;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * OmegaContext holds the globalTxId and localTxId which are used to build the invocation map
  */
@@ -27,6 +30,12 @@
   private final ThreadLocal<String> globalTxId = new InheritableThreadLocal<>();
   private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>();
   private final IdGenerator<String> idGenerator;
+  private final ThreadLocal<Map<String, Object[]>> parameters = new InheritableThreadLocal<Map<String, Object[]>>() {
+    @Override
+    protected Map<String, Object[]> initialValue() {
+      return new HashMap<>();
+    }
+  };
 
   public OmegaContext(IdGenerator<String> idGenerator) {
     this.idGenerator = idGenerator;
@@ -60,9 +69,18 @@ public String localTxId() {
     return localTxId.get();
   }
 
+  public void setParameters(String localTxId, Object ... parameters) {
+    this.parameters.get().put(localTxId, parameters);
+  }
+
+  public Object[] parameters(String localTxId) {
+    return parameters.get().get(localTxId);
+  }
+
   public void clear() {
     globalTxId.remove();
     localTxId.remove();
+    parameters.remove();
   }
 
   @Override
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 217e102f..2db09dd4 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -18,9 +18,12 @@
 package org.apache.servicecomb.saga.omega.spring;
 
 import com.google.common.collect.ImmutableList;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
 import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
+import org.apache.servicecomb.saga.omega.connector.grpc.GrpcTccEventService;
 import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
-import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -29,6 +32,7 @@
 import org.apache.servicecomb.saga.omega.format.MessageFormat;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
@@ -48,9 +52,14 @@ OmegaContext omegaContext(@Qualifier("omegaUniqueIdGenerator") IdGenerator<Strin
     return new OmegaContext(idGenerator);
   }
 
-  @Bean
-  CompensationContext compensationContext(OmegaContext omegaContext) {
-    return new CompensationContext(omegaContext);
+  @Bean(name = {"compensationContext"})
+  CallbackContext compensationContext(OmegaContext omegaContext) {
+    return new CallbackContext(omegaContext);
+  }
+
+  @Bean(name = {"coordinateContext"})
+  CallbackContext coordinateContext(OmegaContext omegaContext) {
+    return new CallbackContext(omegaContext);
   }
 
   @Bean
@@ -99,4 +108,13 @@ public void run() {
     }));
     return sender;
   }
+
+  // TODO should integrate with loadBalance message sender in future.
+  @Bean
+  TccEventService tccEventService(ServiceConfig serviceConfig,
+      @Lazy org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler coordinateMessageHandler,
+      @Value("${alpha.cluster.address:localhost:8080}") String address) {
+    ManagedChannel channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
+    return new GrpcTccEventService(serviceConfig, channel, address, coordinateMessageHandler);
+  }
 }
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 7d7d45ff..790394f7 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -17,7 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
-import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
@@ -27,9 +27,9 @@
 
   private final OmegaContext omegaContext;
 
-  private final CompensationContext compensationContext;
+  private final CallbackContext compensationContext;
 
-  CompensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
+  CompensableAnnotationProcessor(OmegaContext omegaContext, CallbackContext compensationContext) {
     this.omegaContext = omegaContext;
     this.compensationContext = compensationContext;
   }
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 90d8b060..e9bbd42e 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -17,26 +17,14 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
-import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
-
-import org.apache.servicecomb.saga.omega.context.CompensationContext;
-import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.ReflectionUtils.MethodCallback;
-
-class CompensableMethodCheckingCallback implements MethodCallback {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final Object bean;
+class CompensableMethodCheckingCallback extends MethodCheckingCallback {
 
-  private final CompensationContext compensationContext;
-
-  CompensableMethodCheckingCallback(Object bean, CompensationContext compensationContext) {
-    this.bean = bean;
-    this.compensationContext = compensationContext;
+  public CompensableMethodCheckingCallback(Object bean, CallbackContext callbackContext) {
+    super(bean, callbackContext, "compensation");
   }
 
   @Override
@@ -44,21 +32,7 @@ public void doWith(Method method) throws IllegalArgumentException {
     if (!method.isAnnotationPresent(Compensable.class)) {
       return;
     }
-
     String compensationMethod = method.getAnnotation(Compensable.class).compensationMethod();
-
-    try {
-      compensationContext.addCompensationContext(method, bean);
-
-      if (!compensationMethod.isEmpty()) {
-        Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
-        compensationContext.addCompensationContext(signature, bean);
-        LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
-      }
-    } catch (NoSuchMethodException e) {
-      throw new OmegaException(
-          "No such compensation method [" + compensationMethod + "] found in " + bean.getClass().getCanonicalName(),
-          e);
-    }
+    loadMethodContext(method, compensationMethod);
   }
 }
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
index 9ef937ea..07a9466b 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -27,8 +27,6 @@
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
 import org.slf4j.Logger;
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/MethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/MethodCheckingCallback.java
new file mode 100644
index 00000000..b5aa9887
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/MethodCheckingCallback.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.ReflectionUtils.MethodCallback;
+
+public abstract class MethodCheckingCallback implements MethodCallback {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final Object bean;
+
+  private final CallbackContext callbackContext;
+
+  private final String callbackType;
+
+  public MethodCheckingCallback(Object bean, CallbackContext callbackContext, String callbackType) {
+    this.bean = bean;
+    this.callbackContext = callbackContext;
+    this.callbackType = callbackType;
+  }
+
+  protected void loadMethodContext(Method method, String ... candidates) {
+    for (String each : candidates) {
+      try {
+        Method signature = bean.getClass().getDeclaredMethod(each, method.getParameterTypes());
+        callbackContext.addCallbackContext(signature, bean);
+        LOG.debug("Found callback method [{}] in {}", each, bean.getClass().getCanonicalName());
+      } catch (NoSuchMethodException ex) {
+        throw new OmegaException(
+            "No such " + callbackType + " method [" + each + "] found in " + bean.getClass().getCanonicalName(), ex);
+      }
+    }
+  }
+}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ParticipateAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ParticipateAnnotationProcessor.java
new file mode 100644
index 00000000..5cef9ff4
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ParticipateAnnotationProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.util.ReflectionUtils;
+
+class ParticipateAnnotationProcessor implements BeanPostProcessor {
+
+  private final OmegaContext omegaContext;
+
+  private final CallbackContext callbackContext;
+
+  ParticipateAnnotationProcessor(OmegaContext omegaContext, CallbackContext callbackContext) {
+    this.omegaContext = omegaContext;
+    this.callbackContext = callbackContext;
+  }
+
+  @Override
+  public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+    checkMethod(bean);
+    checkFields(bean);
+    return bean;
+  }
+
+  @Override
+  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+    return bean;
+  }
+
+  private void checkMethod(Object bean) {
+    ReflectionUtils.doWithMethods(
+        bean.getClass(),
+        new ParticipateMethodCheckingCallback(bean, callbackContext));
+  }
+
+  private void checkFields(Object bean) {
+    ReflectionUtils.doWithFields(bean.getClass(), new ExecutorFieldCallback(bean, omegaContext));
+  }
+}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ParticipateMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ParticipateMethodCheckingCallback.java
new file mode 100644
index 00000000..b704f898
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ParticipateMethodCheckingCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import java.lang.reflect.Method;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
+
+public class ParticipateMethodCheckingCallback extends MethodCheckingCallback {
+
+  public ParticipateMethodCheckingCallback(Object bean, CallbackContext callbackContext) {
+    super(bean, callbackContext, "coordinate");
+  }
+
+  @Override
+  public void doWith(Method method) throws IllegalArgumentException {
+    if (!method.isAnnotationPresent(Participate.class)) {
+      return;
+    }
+    String confirmMethod = method.getAnnotation(Participate.class).confirmMethod();
+    String cancelMethod = method.getAnnotation(Participate.class).cancelMethod();
+    loadMethodContext(method, confirmMethod, cancelMethod);
+  }
+}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 4fd41881..1516394d 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -17,13 +17,18 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
-import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.CompensationMessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.SagaStartAspect;
 import org.apache.servicecomb.saga.omega.transaction.TransactionAspect;
+import org.apache.servicecomb.saga.omega.transaction.tcc.CoordinateMessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccParticipatorAspect;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccStartAspect;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
@@ -34,7 +39,8 @@
 public class TransactionAspectConfig {
 
   @Bean
-  MessageHandler messageHandler(MessageSender sender, CompensationContext context, OmegaContext omegaContext) {
+  MessageHandler messageHandler(MessageSender sender,
+      @Qualifier("compensationContext") CallbackContext context, OmegaContext omegaContext) {
     return new CompensationMessageHandler(sender, context);
   }
 
@@ -52,7 +58,33 @@ TransactionAspect transactionAspect(MessageSender sender, OmegaContext context)
 
   @Bean
   CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext,
-      CompensationContext compensationContext) {
+      @Qualifier("compensationContext") CallbackContext compensationContext) {
     return new CompensableAnnotationProcessor(omegaContext, compensationContext);
   }
+
+  @Bean
+  org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler coordinateMessageHandler(
+      TccEventService tccEventService,
+      @Qualifier("coordinateContext") CallbackContext coordinateContext,
+      OmegaContext omegaContext) {
+    return new CoordinateMessageHandler(tccEventService, coordinateContext, omegaContext);
+  }
+
+  @Order(0)
+  @Bean
+  TccStartAspect tccStartAspect(TccEventService tccEventService, OmegaContext context) {
+    return new TccStartAspect(tccEventService, context);
+  }
+
+  @Order(1)
+  @Bean
+  TccParticipatorAspect tccParticipatorAspect(TccEventService tccEventService, OmegaContext context) {
+    return new TccParticipatorAspect(tccEventService, context);
+  }
+
+  @Bean
+  ParticipateAnnotationProcessor participateAnnotationProcessor(OmegaContext omegaContext,
+      @Qualifier("coordinateContext") CallbackContext coordinateContext) {
+    return new ParticipateAnnotationProcessor(omegaContext, coordinateContext);
+  }
 }
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
new file mode 100644
index 00000000..66512482
--- /dev/null
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MessageConfig.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.mockito.Mockito;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MessageConfig {
+  private final List<String> messages = new ArrayList<>();
+
+  @Bean
+  IdGenerator<String> idGenerator() {
+    return Mockito.mock(IdGenerator.class);
+  }
+
+  @Bean(name = "compensationContext")
+  CallbackContext recoveryCompensationContext(OmegaContext omegaContext) {
+    return new CallbackContext(omegaContext);
+  }
+
+  @Bean(name = "coordinateContext")
+  CallbackContext coordinateContext(OmegaContext omegaContext) {
+    return new CallbackContext(omegaContext);
+  }
+
+  @Bean
+  OmegaContext omegaContext(IdGenerator<String> idGenerator) {
+    return new OmegaContext(idGenerator);
+  }
+
+  @Bean
+  List<String> messages() {
+    return messages;
+  }
+
+  @Bean
+  MessageSender sender() {
+    return new MessageSender() {
+      @Override
+      public void onConnected() {
+
+      }
+
+      @Override
+      public void onDisconnected() {
+
+      }
+
+      @Override
+      public void close() {
+
+      }
+
+      @Override
+      public String target() {
+        return "UNKNOW";
+      }
+
+      @Override
+      public AlphaResponse send(TxEvent event) {
+        messages.add(event.toString());
+        return new AlphaResponse(false);
+      }
+    };
+  }
+
+  @Bean
+  TccEventService tccEventService() {
+    return new TccEventService() {
+      @Override
+      public void onConnected() {
+      }
+
+      @Override
+      public void onDisconnected() {
+
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public String target() {
+        return "UNKNOWN";
+      }
+
+      @Override
+      public AlphaResponse participate(ParticipatedEvent participateEvent) {
+        messages.add(participateEvent.toString());
+        return new AlphaResponse(false);
+      }
+
+      @Override
+      public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
+        messages.add(tccStartEvent.toString());
+        return new AlphaResponse(false);
+      }
+
+      @Override
+      public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
+        messages.add(tccEndEvent.toString());
+        return new AlphaResponse(false);
+      }
+
+      @Override
+      public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+        messages.add(coordinatedEvent.toString());
+        return new AlphaResponse(false);
+      }
+    };
+  }
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccInterceptorTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccInterceptorTest.java
new file mode 100644
index 00000000..34f52e60
--- /dev/null
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccInterceptorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
+import static org.assertj.core.util.IterableUtil.toArray;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
+public class TccInterceptorTest {
+
+  private static final String globalTxId = UUID.randomUUID().toString();
+  private final String newLocalTxId = UUID.randomUUID().toString();
+  private final String anotherLocalTxId = UUID.randomUUID().toString();
+  private final String username = uniquify("username");
+  private final String email = uniquify("email");
+
+  private final User user = new User(username, email);
+  private final User illegalUser = new User(ILLEGAL_USER, email);
+
+  private final String usernameJack = uniquify("Jack");
+  private final User jack = new User(usernameJack, uniquify("jack@gmail.com"));
+
+  @Autowired
+  private IdGenerator<String> idGenerator;
+
+  @Autowired
+  private List<String> messages;
+
+  @Autowired
+  private OmegaContext omegaContext;
+
+  @Autowired
+  private TccUserServiceMain tccUserServiceMain;
+
+  @Autowired
+  private UserRepository userRepository;
+
+  private String confirmMethod;
+
+  private String cancelMethod;
+
+  @Autowired
+  private org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler coordinateMessageHandler;
+
+  @Before
+  public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(globalTxId, newLocalTxId, anotherLocalTxId);
+//    omegaContext.setGlobalTxId(globalTxId);
+//    omegaContext.setLocalTxId(globalTxId);
+    confirmMethod = TccUserService.class.getDeclaredMethod("confirm", User.class).toString();
+    cancelMethod = TccUserService.class.getDeclaredMethod("cancel", User.class).toString();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    messages.clear();
+    userRepository.deleteAll();
+    omegaContext.clear();
+    tccUserServiceMain.resetCount();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+  }
+
+  @Test
+  public void tccWorkflowSucceed() {
+    tccUserServiceMain.add(user, jack);
+
+    // TODO could't get parameters, for OmegaContext has been cleared after sending TccEndedEvent synchronous.
+    // TODO CoordinateCommand will be send from alpha asynchronous.
+    omegaContext.setParameters(newLocalTxId, user);
+    omegaContext.setParameters(anotherLocalTxId, jack);
+    coordinateMessageHandler.onReceive(globalTxId, newLocalTxId, globalTxId, confirmMethod);
+    coordinateMessageHandler.onReceive(globalTxId, anotherLocalTxId, globalTxId, confirmMethod);
+
+    assertArrayEquals(
+        new String[] {
+            new TccStartedEvent(globalTxId, globalTxId).toString(),
+            new ParticipatedEvent(globalTxId, newLocalTxId, globalTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed).toString(),
+            new ParticipatedEvent(globalTxId, anotherLocalTxId, globalTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed).toString(),
+            new TccEndedEvent(globalTxId, globalTxId, TransactionStatus.Succeed).toString(),
+            new CoordinatedEvent(globalTxId, newLocalTxId, globalTxId, confirmMethod, TransactionStatus.Succeed).toString(),
+            new CoordinatedEvent(globalTxId, anotherLocalTxId, globalTxId, confirmMethod, TransactionStatus.Succeed).toString()
+        },
+        toArray(messages)
+    );
+
+    User result = userRepository.findByUsername(user.username());
+    assertThat(result.username(), is(user.username()));
+    assertThat(result.email(), is(user.email()));
+
+    result = userRepository.findByUsername(jack.username());
+    assertThat(result.username(), is(jack.username()));
+    assertThat(result.email(), is(jack.email()));
+  }
+
+  @Test
+  public void tccWorkflowFailed() {
+    try {
+      tccUserServiceMain.add(user, illegalUser);
+      expectFailing(IllegalArgumentException.class);
+    } catch (IllegalArgumentException ignored) {
+
+    }
+
+    // TODO could't get parameters, for OmegaContext has been cleared after sending TccEndedEvent synchronous.
+    // TODO CoordinateCommand will be send from alpha asynchronous.
+    omegaContext.setParameters(newLocalTxId, user);
+    omegaContext.setParameters(anotherLocalTxId, illegalUser);
+    coordinateMessageHandler.onReceive(globalTxId, newLocalTxId, globalTxId, cancelMethod);
+    assertArrayEquals(
+        new String[] {
+            new TccStartedEvent(globalTxId, globalTxId).toString(),
+            new ParticipatedEvent(globalTxId, newLocalTxId, globalTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed).toString(),
+            new ParticipatedEvent(globalTxId, anotherLocalTxId, globalTxId, confirmMethod, cancelMethod, TransactionStatus.Failed).toString(),
+            new TccEndedEvent(globalTxId, globalTxId, TransactionStatus.Failed).toString(),
+            new CoordinatedEvent(globalTxId, newLocalTxId, globalTxId, cancelMethod, TransactionStatus.Succeed).toString()
+        },
+        toArray(messages)
+    );
+
+    User result = userRepository.findByUsername(user.username());
+    assertThat(result, is(nullValue()));
+
+    result = userRepository.findByUsername(jack.username());
+    assertThat(result, is(nullValue()));
+  }
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccUserService.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccUserService.java
new file mode 100644
index 00000000..eefe7497
--- /dev/null
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccUserService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+class TccUserService {
+  static final String ILLEGAL_USER = "Illegal User";
+  private final UserRepository userRepository;
+
+  private int count = 0;
+
+  @Autowired
+  TccUserService(UserRepository userRepository) {
+    this.userRepository = userRepository;
+  }
+
+  void resetCount() {
+    this.count = 0;
+  }
+
+  @Participate(confirmMethod = "confirm", cancelMethod = "cancel")
+  User add(User user) {
+    if (ILLEGAL_USER.equals(user.username())) {
+      throw new IllegalArgumentException("User is illegal");
+    }
+    return userRepository.save(user);
+  }
+
+  void confirm(User user) {
+    userRepository.findByUsername(user.username());
+  }
+
+  void cancel(User user) {
+    userRepository.delete(user);
+  }
+
+
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccUserServiceMain.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccUserServiceMain.java
new file mode 100644
index 00000000..94e52325
--- /dev/null
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TccUserServiceMain.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import org.apache.servicecomb.saga.omega.context.annotations.TccStart;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TccUserServiceMain {
+
+  @Autowired
+  private TccUserService tccUserService;
+
+  void resetCount() {
+    tccUserService.resetCount();
+  }
+
+  @TccStart
+  void add(User A, User B) {
+    tccUserService.add(A);
+    tccUserService.add(B);
+  }
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 5f3f7081..6b66befb 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -33,55 +33,43 @@
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
+import akka.actor.AbstractLoggingActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.japi.Creator;
+import akka.japi.pf.FI.UnitApply;
+import io.reactivex.Flowable;
+import io.reactivex.functions.Consumer;
+import io.reactivex.schedulers.Schedulers;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
-import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
 import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
-import io.reactivex.Flowable;
-import io.reactivex.functions.Consumer;
-import io.reactivex.schedulers.Schedulers;
-import akka.actor.AbstractLoggingActor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.japi.Creator;
-import akka.japi.pf.FI.UnitApply;
-
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 @AutoConfigureMockMvc
 public class TransactionInterceptionTest {
   @SuppressWarnings("unchecked")
-  private static final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
   private static final String globalTxId = UUID.randomUUID().toString();
   private final String newLocalTxId = UUID.randomUUID().toString();
   private final String anotherLocalTxId = UUID.randomUUID().toString();
@@ -97,6 +85,9 @@
   @OmegaContextAware
   private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
 
+  @Autowired
+  private IdGenerator<String> idGenerator;
+
   @Autowired
   private List<String> messages;
 
@@ -362,6 +353,11 @@ public void passesOmegaContextAmongActors() throws Exception {
     actorSystem.terminate();
   }
 
+  @Test
+  public void tccWorkflowTest() {
+
+  }
+
   private void waitTillSavedUser(final String username) {
     await().atMost(1000, MILLISECONDS).until(new Callable<Boolean>() {
       @Override
@@ -402,55 +398,4 @@ public void apply(User user) throws Exception {
   private String[] toArray(List<String> messages) {
     return messages.toArray(new String[messages.size()]);
   }
-
-  @Configuration
-  static class MessageConfig {
-    private final List<String> messages = new ArrayList<>();
-
-    @Bean
-    CompensationContext recoveryContext(OmegaContext omegaContext) {
-      return new CompensationContext(omegaContext);
-    }
-
-    @Bean
-    OmegaContext omegaContext() {
-      return new OmegaContext(idGenerator);
-    }
-
-    @Bean
-    List<String> messages() {
-      return messages;
-    }
-
-    @Bean
-    MessageSender sender() {
-      return new MessageSender() {
-        @Override
-        public void onConnected() {
-
-        }
-
-        @Override
-        public void onDisconnected() {
-
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        @Override
-        public String target() {
-          return "UNKNOW";
-        }
-
-        @Override
-        public AlphaResponse send(TxEvent event) {
-          messages.add(event.toString());
-          return new AlphaResponse(false);
-        }
-      };
-    }
-  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index fe2eea5f..75501b9d 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -17,14 +17,14 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
 
 public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
 
-  private final CompensationContext context;
+  private final CallbackContext context;
 
-  public CompensationMessageHandler(MessageSender sender, CompensationContext context) {
+  public CompensationMessageHandler(MessageSender sender, CallbackContext context) {
     this.sender = sender;
     this.context = context;
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
index 7663e722..65f3cd58 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
@@ -18,19 +18,28 @@
 package org.apache.servicecomb.saga.omega.transaction.tcc;
 
 import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 
 public class CoordinateMessageHandler implements MessageHandler {
 
   private final TccEventService tccEventService;
 
-  public CoordinateMessageHandler(TccEventService tccEventService) {
+  private final CallbackContext callbackContext;
+
+  private final OmegaContext omegaContext;
+
+  public CoordinateMessageHandler(TccEventService tccEventService,
+      CallbackContext callbackContext, OmegaContext omegaContext) {
     this.tccEventService = tccEventService;
+    this.callbackContext = callbackContext;
+    this.omegaContext = omegaContext;
   }
 
   @Override
   public void onReceive(String globalTxId, String localTxId, String parentTxId, String methodName) {
-    //TODO Omega Call the service
+    callbackContext.apply(globalTxId, localTxId, methodName, omegaContext.parameters(localTxId));
     tccEventService.coordinate(new CoordinatedEvent(globalTxId, localTxId, parentTxId, methodName, TransactionStatus.Succeed));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
index a78d819f..6ad92531 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
@@ -19,14 +19,8 @@
 
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
-
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.apache.servicecomb.saga.omega.transaction.OmegaException;
-import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy;
-import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.aspectj.lang.ProceedingJoinPoint;
@@ -52,8 +46,8 @@ public TccParticipatorAspect(TccEventService tccEventService, OmegaContext conte
   Object advise(ProceedingJoinPoint joinPoint, Participate participate) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     String localTxId = context.localTxId();
-    String cancelMethod = participate.cancelMethod();
-    String confirmMethod = participate.confirmMethod();
+    String cancelMethod = callbackMethodSignature(joinPoint, participate.cancelMethod(), method);
+    String confirmMethod = callbackMethodSignature(joinPoint, participate.confirmMethod(), method);
     
     context.newLocalTxId();
     LOG.debug("Updated context {} for participate method {} ", context, method.toString());
@@ -75,4 +69,12 @@ Object advise(ProceedingJoinPoint joinPoint, Participate participate) throws Thr
       context.setLocalTxId(localTxId);
     }
   }
+
+  String callbackMethodSignature(ProceedingJoinPoint joinPoint, String callbackMethod, Method tryMethod) throws NoSuchMethodException {
+    return callbackMethod.isEmpty() ? "" :
+        joinPoint.getTarget()
+        .getClass()
+        .getDeclaredMethod(callbackMethod, tryMethod.getParameterTypes())
+        .toString();
+  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
index b770ff3c..2a801538 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
@@ -54,4 +54,15 @@ public String getMethodName() {
   public TransactionStatus getStatus() {
     return status;
   }
+
+  @Override
+  public String toString() {
+    return "CoordinatedEvent{" +
+        "globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", methodName='" + methodName + '\'' +
+        ", status=" + status +
+        '}';
+  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
index ac2dcb95..f55fd43e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
@@ -62,4 +62,16 @@ public String getCancelMethod() {
   public TransactionStatus getStatus() {
     return status;
   }
+
+  @Override
+  public String toString() {
+    return "ParticipatedEvent{" +
+        "globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", confirmMethod='" + confirmMethod + '\'' +
+        ", cancelMethod='" + cancelMethod + '\'' +
+        ", status=" + status +
+        '}';
+  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
index 31a187fd..3944fb14 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
@@ -42,4 +42,13 @@ public TccEndedEvent(String globalTxId, String localTxId,
     this.localTxId = localTxId;
     this.status = status;
   }
+
+  @Override
+  public String toString() {
+    return "TccEndedEvent{" +
+        "globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", status=" + status +
+        '}';
+  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
index 8449cb4e..7a1e7b3b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
@@ -31,4 +31,12 @@ public TccStartedEvent(String globalTxId, String localTxId) {
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
   }
+
+  @Override
+  public String toString() {
+    return "TccStartedEvent{" +
+        "globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        '}';
+  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index ab1f9881..b9edb073 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -27,7 +27,7 @@
 import java.util.List;
 
 import org.apache.servicecomb.saga.common.EventType;
-import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -68,7 +68,7 @@ public AlphaResponse send(TxEvent event) {
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
-  private final CompensationContext context = mock(CompensationContext.class);
+  private final CallbackContext context = mock(CallbackContext.class);
 
   private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
index b260295e..fb715054 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
@@ -26,6 +26,8 @@
 import java.util.List;
 
 import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.context.CallbackContext;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
@@ -33,6 +35,8 @@
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 
 public class CoordinateMessageHandlerTest {
   private final List<CoordinatedEvent> coordinatedEvents = new ArrayList<>();
@@ -85,7 +89,9 @@ public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
   private final String parentTxId = uniquify("parentTxId");
   private final String methodName= uniquify("Method");
 
-  private final CoordinateMessageHandler handler = new CoordinateMessageHandler(eventService);
+  private final CallbackContext callbackContext = Mockito.mock(CallbackContext.class);
+  private final OmegaContext omegaContext = Mockito.mock(OmegaContext.class);
+  private final CoordinateMessageHandler handler = new CoordinateMessageHandler(eventService, callbackContext, omegaContext);
 
   @Before
   public void setUp() {
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
index b3cd8064..c56bf7a4 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
@@ -18,16 +18,14 @@
 package org.apache.servicecomb.saga.omega.transaction.tcc;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -52,6 +50,8 @@
 
   private final List<ParticipatedEvent> participatedEvents = new ArrayList<>();
   private final AlphaResponse response = new AlphaResponse(false);
+  private String confirmMethod;
+  private String cancelMethod;
   private final TccEventService eventService = new TccEventService() {
     @Override
     public void onConnected() {
@@ -117,6 +117,9 @@ public void setUp() throws Exception {
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
+
+    confirmMethod = TccParticipatorAspectTest.class.getDeclaredMethod("confirmMethod").toString();
+    cancelMethod = TccParticipatorAspectTest.class.getDeclaredMethod("cancelMethod").toString();
   }
 
   @Test
@@ -130,8 +133,8 @@ public void participateMethodIsCalledSuccessed() throws Throwable {
     assertThat(participatedEvent.getParentTxId(), is(localTxId));
     assertThat(participatedEvent.getLocalTxId(), is(newLocalTxId));
     assertThat(participatedEvent.getStatus(), is(TransactionStatus.Succeed));
-    assertThat(participatedEvent.getCancelMethod(), is("cancelMethod"));
-    assertThat(participatedEvent.getConfirmMethod(), is("confirmMethod"));
+    assertThat(participatedEvent.getCancelMethod(), is(cancelMethod));
+    assertThat(participatedEvent.getConfirmMethod(), is(confirmMethod));
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
     assertThat(omegaContext.localTxId(), is(localTxId));
@@ -157,8 +160,8 @@ public void participateMethodIsCalledFailed()  throws Throwable {
     assertThat(participatedEvent.getParentTxId(), is(localTxId));
     assertThat(participatedEvent.getLocalTxId(), is(newLocalTxId));
     assertThat(participatedEvent.getStatus(), is(TransactionStatus.Failed));
-    assertThat(participatedEvent.getCancelMethod(), is("cancelMethod"));
-    assertThat(participatedEvent.getConfirmMethod(), is("confirmMethod"));
+    assertThat(participatedEvent.getCancelMethod(), is(cancelMethod));
+    assertThat(participatedEvent.getConfirmMethod(), is(confirmMethod));
 
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
@@ -169,4 +172,12 @@ private String doNothing() {
     return "doNothing";
   }
 
+  private String cancelMethod() {
+    return "cancelMethod";
+  }
+
+  private String confirmMethod() {
+    return "confirmMethod";
+  }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services