You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2017/12/29 06:40:58 UTC

[incubator-servicecomb-saga] 02/05: SCB-98 supported compensation context from remote instead of local caching

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit e717f952fda1f52f64e9a2159a2f9faad152ae1a
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Dec 28 16:14:12 2017 +0800

    SCB-98 supported compensation context from remote instead of local caching
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/OmegaContext.java           | 63 ++++++----------------
 .../spring/CompensableAnnotationProcessor.java     | 10 +++-
 .../spring/CompensableMethodCheckingCallback.java  | 10 ++--
 .../spring/TransactionAspectConfig.java            |  4 +-
 .../spring/TransactionInterceptionTest.java        | 11 ++--
 .../saga/omega/transaction/MessageHandler.java     |  2 +-
 .../saga/omega/transaction/TransactionAspect.java  |  6 ---
 7 files changed, 42 insertions(+), 64 deletions(-)

diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
index 1fe8661..ccc3738 100644
--- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
@@ -20,8 +20,8 @@ package io.servicecomb.saga.omega.context;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +34,8 @@ public class OmegaContext {
   private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
   private final ThreadLocal<String> localTxId = new ThreadLocal<>();
   private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
-  private final Map<String, Map<String, CompensationContext>> compensationContexts = new ConcurrentHashMap<>();
   private final IdGenerator<String> idGenerator;
+  private final Map<String, CompensationContext> compensationContexts = new HashMap<>();
 
   public OmegaContext(IdGenerator<String> idGenerator) {
     this.idGenerator = idGenerator;
@@ -77,50 +77,23 @@ public class OmegaContext {
     this.parentTxId.set(parentTxId);
   }
 
-  // TODO: 2017/12/23 remove this context entry by the end of its corresponding global tx
-  public void addContext(String globalTxId, String localTxId, Object target, String compensationMethod, Object... args) {
-    compensationContexts.computeIfAbsent(globalTxId, k -> new ConcurrentHashMap<>())
-        .put(localTxId, new CompensationContext(target, compensationMethod, args));
+  public void addCompensationContext(Method compensationMethod, Object target) {
+    compensationMethod.setAccessible(true);
+    compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
   }
 
-  public boolean containsContext(String globalTxId) {
-    return compensationContexts.containsKey(globalTxId);
-  }
-
-  public void compensate(String globalTxId) {
-    Map<String, CompensationContext> contexts = compensationContexts.remove(globalTxId);
-
-    for (CompensationContext compensationContext : contexts.values()) {
-      try {
-        invokeMethod(compensationContext);
-      } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-        LOG.error(
-            "Pre-checking for compensate method " + compensationContext.compensationMethod
-                + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
-            e);
-      }
-    }
-  }
-
-  private void invokeMethod(CompensationContext compensationContext)
-      throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object[] payloads) {
+    CompensationContext compensationContext = compensationContexts.get(compensationMethod);
 
-    Method method = compensationContext.target
-        .getClass()
-        .getDeclaredMethod(compensationContext.compensationMethod, argClasses(compensationContext));
-    method.setAccessible(true);
-
-    method.invoke(compensationContext.target, compensationContext.args);
-  }
-
-  private Class<?>[] argClasses(CompensationContext compensationContext) {
-    Class<?>[] classes = new Class<?>[compensationContext.args.length];
-
-    for (int i = 0; i < compensationContext.args.length; i++) {
-      classes[i] = compensationContext.args[i].getClass();
+    try {
+      compensationContext.compensationMethod.invoke(compensationContext.target, payloads);
+      LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      LOG.error(
+          "Pre-checking for compensate method " + compensationContext.compensationMethod.toString()
+              + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
+          e);
     }
-
-    return classes;
   }
 
   @Override
@@ -134,13 +107,11 @@ public class OmegaContext {
 
   private static final class CompensationContext {
     private final Object target;
-    private final String compensationMethod;
-    private final Object[] args;
+    private final Method compensationMethod;
 
-    private CompensationContext(Object target, String compensationMethod, Object... args) {
+    private CompensationContext(Object target, Method compensationMethod) {
       this.target = target;
       this.compensationMethod = compensationMethod;
-      this.args = args;
     }
   }
 }
diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 81318ad..e97d4d3 100644
--- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -21,8 +21,16 @@ import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.util.ReflectionUtils;
 
+import io.servicecomb.saga.omega.context.OmegaContext;
+
 class CompensableAnnotationProcessor implements BeanPostProcessor {
 
+  private final OmegaContext omegaContext;
+
+  CompensableAnnotationProcessor(OmegaContext omegaContext) {
+    this.omegaContext = omegaContext;
+  }
+
   @Override
   public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
     checkMethod(bean);
@@ -35,6 +43,6 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
   }
 
   private void checkMethod(Object bean) {
-    ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean));
+    ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext));
   }
 }
diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 23352f4..64779d7 100644
--- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -24,16 +24,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.ReflectionUtils.MethodCallback;
 
+import io.servicecomb.saga.omega.context.OmegaContext;
 import io.servicecomb.saga.omega.transaction.OmegaException;
 import io.servicecomb.saga.omega.transaction.annotations.Compensable;
 
 class CompensableMethodCheckingCallback implements MethodCallback {
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Object bean;
+  private final OmegaContext omegaContext;
 
-  CompensableMethodCheckingCallback(Object bean) {
+  CompensableMethodCheckingCallback(Object bean, OmegaContext omegaContext) {
     this.bean = bean;
+    this.omegaContext = omegaContext;
   }
 
   @Override
@@ -45,7 +48,8 @@ class CompensableMethodCheckingCallback implements MethodCallback {
     String compensationMethod = method.getAnnotation(Compensable.class).compensationMethod();
 
     try {
-      bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+      Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+      omegaContext.addCompensationContext(signature, bean);
       LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
     } catch (NoSuchMethodException e) {
       throw new OmegaException(
diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 9fe5956..5982109 100644
--- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -35,7 +35,7 @@ public class TransactionAspectConfig {
   }
 
   @Bean
-  CompensableAnnotationProcessor compensableAnnotationProcessor() {
-    return new CompensableAnnotationProcessor();
+  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext) {
+    return new CompensableAnnotationProcessor(omegaContext);
   }
 }
diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 83e57b0..8b873b8 100644
--- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -106,15 +106,16 @@ public class TransactionInterceptionTest {
     User user = userService.add(new User(username, email));
 
     // another sub transaction to the same service within the same global transaction
-    omegaContext.newLocalTxId();
+    String localTxId = omegaContext.newLocalTxId();
     User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
 
-    messageHandler.onReceive("to be compensated".getBytes());
+    String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
+
+    messageHandler.onReceive(globalTxId, this.localTxId, compensationMethod, user);
+    messageHandler.onReceive(globalTxId, localTxId, compensationMethod, anotherUser);
 
     assertThat(userRepository.findOne(user.id()), is(nullValue()));
     assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
-
-    assertThat(omegaContext.containsContext(globalTxId), is(false));
   }
 
   private List<String> toString(List<byte[]> messages) {
@@ -158,7 +159,7 @@ public class TransactionInterceptionTest {
 
     @Bean
     MessageHandler handler(OmegaContext omegaContext) {
-      return bytes -> omegaContext.compensate(globalTxId);
+      return omegaContext::compensate;
     }
   }
 
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
index e954381..caf2da8 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
@@ -18,5 +18,5 @@
 package io.servicecomb.saga.omega.transaction;
 
 public interface MessageHandler {
-  void onReceive(byte[] message);
+  void onReceive(String globalTxId, String localTxId, String compensationMethod, Object... payloads);
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
index ecce0ee..e4de9c9 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -48,12 +48,6 @@ public class TransactionAspect {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
 
-    context.addContext(context.globalTxId(),
-        context.localTxId(),
-        joinPoint.getTarget(),
-        compensable.compensationMethod(),
-        joinPoint.getArgs());
-
     preIntercept(joinPoint);
     Object result = joinPoint.proceed();
     postIntercept();

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.