You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2017/12/29 06:40:58 UTC
[incubator-servicecomb-saga] 02/05: SCB-98 supported compensation
context from remote instead of local caching
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit e717f952fda1f52f64e9a2159a2f9faad152ae1a
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Dec 28 16:14:12 2017 +0800
SCB-98 supported compensation context from remote instead of local caching
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/omega/context/OmegaContext.java | 63 ++++++----------------
.../spring/CompensableAnnotationProcessor.java | 10 +++-
.../spring/CompensableMethodCheckingCallback.java | 10 ++--
.../spring/TransactionAspectConfig.java | 4 +-
.../spring/TransactionInterceptionTest.java | 11 ++--
.../saga/omega/transaction/MessageHandler.java | 2 +-
.../saga/omega/transaction/TransactionAspect.java | 6 ---
7 files changed, 42 insertions(+), 64 deletions(-)
diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
index 1fe8661..ccc3738 100644
--- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
@@ -20,8 +20,8 @@ package io.servicecomb.saga.omega.context;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +34,8 @@ public class OmegaContext {
private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
private final ThreadLocal<String> localTxId = new ThreadLocal<>();
private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
- private final Map<String, Map<String, CompensationContext>> compensationContexts = new ConcurrentHashMap<>();
private final IdGenerator<String> idGenerator;
+ private final Map<String, CompensationContext> compensationContexts = new HashMap<>();
public OmegaContext(IdGenerator<String> idGenerator) {
this.idGenerator = idGenerator;
@@ -77,50 +77,23 @@ public class OmegaContext {
this.parentTxId.set(parentTxId);
}
- // TODO: 2017/12/23 remove this context entry by the end of its corresponding global tx
- public void addContext(String globalTxId, String localTxId, Object target, String compensationMethod, Object... args) {
- compensationContexts.computeIfAbsent(globalTxId, k -> new ConcurrentHashMap<>())
- .put(localTxId, new CompensationContext(target, compensationMethod, args));
+ public void addCompensationContext(Method compensationMethod, Object target) {
+ compensationMethod.setAccessible(true);
+ compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
}
- public boolean containsContext(String globalTxId) {
- return compensationContexts.containsKey(globalTxId);
- }
-
- public void compensate(String globalTxId) {
- Map<String, CompensationContext> contexts = compensationContexts.remove(globalTxId);
-
- for (CompensationContext compensationContext : contexts.values()) {
- try {
- invokeMethod(compensationContext);
- } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- LOG.error(
- "Pre-checking for compensate method " + compensationContext.compensationMethod
- + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
- e);
- }
- }
- }
-
- private void invokeMethod(CompensationContext compensationContext)
- throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ public void compensate(String globalTxId, String localTxId, String compensationMethod, Object[] payloads) {
+ CompensationContext compensationContext = compensationContexts.get(compensationMethod);
- Method method = compensationContext.target
- .getClass()
- .getDeclaredMethod(compensationContext.compensationMethod, argClasses(compensationContext));
- method.setAccessible(true);
-
- method.invoke(compensationContext.target, compensationContext.args);
- }
-
- private Class<?>[] argClasses(CompensationContext compensationContext) {
- Class<?>[] classes = new Class<?>[compensationContext.args.length];
-
- for (int i = 0; i < compensationContext.args.length; i++) {
- classes[i] = compensationContext.args[i].getClass();
+ try {
+ compensationContext.compensationMethod.invoke(compensationContext.target, payloads);
+ LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ LOG.error(
+ "Pre-checking for compensate method " + compensationContext.compensationMethod.toString()
+ + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
+ e);
}
-
- return classes;
}
@Override
@@ -134,13 +107,11 @@ public class OmegaContext {
private static final class CompensationContext {
private final Object target;
- private final String compensationMethod;
- private final Object[] args;
+ private final Method compensationMethod;
- private CompensationContext(Object target, String compensationMethod, Object... args) {
+ private CompensationContext(Object target, Method compensationMethod) {
this.target = target;
this.compensationMethod = compensationMethod;
- this.args = args;
}
}
}
diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 81318ad..e97d4d3 100644
--- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -21,8 +21,16 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.util.ReflectionUtils;
+import io.servicecomb.saga.omega.context.OmegaContext;
+
class CompensableAnnotationProcessor implements BeanPostProcessor {
+ private final OmegaContext omegaContext;
+
+ CompensableAnnotationProcessor(OmegaContext omegaContext) {
+ this.omegaContext = omegaContext;
+ }
+
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
checkMethod(bean);
@@ -35,6 +43,6 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
}
private void checkMethod(Object bean) {
- ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean));
+ ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext));
}
}
diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 23352f4..64779d7 100644
--- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -24,16 +24,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils.MethodCallback;
+import io.servicecomb.saga.omega.context.OmegaContext;
import io.servicecomb.saga.omega.transaction.OmegaException;
import io.servicecomb.saga.omega.transaction.annotations.Compensable;
class CompensableMethodCheckingCallback implements MethodCallback {
- private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Object bean;
+ private final OmegaContext omegaContext;
- CompensableMethodCheckingCallback(Object bean) {
+ CompensableMethodCheckingCallback(Object bean, OmegaContext omegaContext) {
this.bean = bean;
+ this.omegaContext = omegaContext;
}
@Override
@@ -45,7 +48,8 @@ class CompensableMethodCheckingCallback implements MethodCallback {
String compensationMethod = method.getAnnotation(Compensable.class).compensationMethod();
try {
- bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+ Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+ omegaContext.addCompensationContext(signature, bean);
LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
} catch (NoSuchMethodException e) {
throw new OmegaException(
diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 9fe5956..5982109 100644
--- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -35,7 +35,7 @@ public class TransactionAspectConfig {
}
@Bean
- CompensableAnnotationProcessor compensableAnnotationProcessor() {
- return new CompensableAnnotationProcessor();
+ CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext) {
+ return new CompensableAnnotationProcessor(omegaContext);
}
}
diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 83e57b0..8b873b8 100644
--- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -106,15 +106,16 @@ public class TransactionInterceptionTest {
User user = userService.add(new User(username, email));
// another sub transaction to the same service within the same global transaction
- omegaContext.newLocalTxId();
+ String localTxId = omegaContext.newLocalTxId();
User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
- messageHandler.onReceive("to be compensated".getBytes());
+ String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
+
+ messageHandler.onReceive(globalTxId, this.localTxId, compensationMethod, user);
+ messageHandler.onReceive(globalTxId, localTxId, compensationMethod, anotherUser);
assertThat(userRepository.findOne(user.id()), is(nullValue()));
assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
-
- assertThat(omegaContext.containsContext(globalTxId), is(false));
}
private List<String> toString(List<byte[]> messages) {
@@ -158,7 +159,7 @@ public class TransactionInterceptionTest {
@Bean
MessageHandler handler(OmegaContext omegaContext) {
- return bytes -> omegaContext.compensate(globalTxId);
+ return omegaContext::compensate;
}
}
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
index e954381..caf2da8 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
@@ -18,5 +18,5 @@
package io.servicecomb.saga.omega.transaction;
public interface MessageHandler {
- void onReceive(byte[] message);
+ void onReceive(String globalTxId, String localTxId, String compensationMethod, Object... payloads);
}
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
index ecce0ee..e4de9c9 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -48,12 +48,6 @@ public class TransactionAspect {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
- context.addContext(context.globalTxId(),
- context.localTxId(),
- joinPoint.getTarget(),
- compensable.compensationMethod(),
- joinPoint.getArgs());
-
preIntercept(joinPoint);
Object result = joinPoint.proceed();
postIntercept();
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.