You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 01:27:25 UTC
[incubator-servicecomb-saga] 01/02: SCB-100 delegated executors to
proxy
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-100_async_support
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 1c5d313b55e03df03107b7d10e18f8e492c1dbe1
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 17:47:05 2018 +0800
SCB-100 delegated executors to proxy
Signed-off-by: seanyinx <se...@huawei.com>
---
.../spring/CompensableAnnotationProcessor.java | 5 +
.../transaction/spring/ExecutorFieldCallback.java | 163 +++++++++++++++++++++
.../spring/annotations/OmegaContextAware.java | 30 ++++
.../spring/CompensableAnnotationCheckingTest.java | 15 ++
.../spring/MisconfiguredAnnotation.java | 29 ++++
.../spring/TransactionInterceptionTest.java | 57 ++++---
6 files changed, 278 insertions(+), 21 deletions(-)
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 9cb5b27..87f9049 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -33,6 +33,7 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
checkMethod(bean);
+ checkFields(bean);
return bean;
}
@@ -44,4 +45,8 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
private void checkMethod(Object bean) {
ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext));
}
+
+ private void checkFields(Object bean) {
+ ReflectionUtils.doWithFields(bean.getClass(), new ExecutorFieldCallback(bean, omegaContext));
+ }
}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
new file mode 100644
index 0000000..3030fca
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.util.ReflectionUtils.FieldCallback;
+
+class ExecutorFieldCallback implements FieldCallback {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OmegaContext omegaContext;
+ private final Object bean;
+
+ ExecutorFieldCallback(Object bean, OmegaContext omegaContext) {
+ this.omegaContext = omegaContext;
+ this.bean = bean;
+ }
+
+ @Override
+ public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
+ if (!field.isAnnotationPresent(OmegaContextAware.class)) {
+ return;
+ }
+
+ ReflectionUtils.makeAccessible(field);
+
+ Class<?> generic = field.getType();
+
+ if (!Executor.class.isAssignableFrom(generic)) {
+ throw new IllegalArgumentException(
+ "Only Executor, ExecutorService, and ScheduledExecutorService are supported for @"
+ + OmegaContextAware.class.getSimpleName());
+ }
+
+ field.set(bean, ExecutorProxy.newInstance(field.get(bean), field.getType(), omegaContext));
+ }
+
+ private static class RunnableProxy implements InvocationHandler {
+
+ private final String globalTxId;
+ private final String localTxId;
+ private final String parentTxId;
+ private final Object runnable;
+ private final OmegaContext omegaContext;
+
+ private static Object newInstance(Object runnable, OmegaContext omegaContext) {
+ RunnableProxy runnableProxy = new RunnableProxy(omegaContext, runnable);
+ return Proxy.newProxyInstance(
+ runnable.getClass().getClassLoader(),
+ runnable.getClass().getInterfaces(),
+ runnableProxy);
+ }
+
+ private RunnableProxy(OmegaContext omegaContext, Object runnable) {
+ this.omegaContext = omegaContext;
+ this.globalTxId = omegaContext.globalTxId();
+ this.localTxId = omegaContext.localTxId();
+ this.parentTxId = omegaContext.parentTxId();
+ this.runnable = runnable;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ try {
+ LOG.debug("Setting OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+ globalTxId,
+ localTxId,
+ parentTxId);
+
+ omegaContext.setGlobalTxId(globalTxId);
+ omegaContext.setLocalTxId(localTxId);
+ omegaContext.setParentTxId(parentTxId);
+
+ return method.invoke(runnable, args);
+ } finally {
+ omegaContext.clear();
+ LOG.debug("Cleared OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+ globalTxId,
+ localTxId,
+ parentTxId);
+ }
+ }
+ }
+
+ private static class ExecutorProxy implements InvocationHandler {
+ private final Object target;
+ private final OmegaContext omegaContext;
+
+ private ExecutorProxy(Object target, OmegaContext omegaContext) {
+ this.target = target;
+ this.omegaContext = omegaContext;
+ }
+
+ private static Object newInstance(Object target, Class<?> targetClass, OmegaContext omegaContext) {
+ Class<?>[] interfaces = targetClass.isInterface() ? new Class<?>[] {targetClass} : targetClass.getInterfaces();
+
+ return Proxy.newProxyInstance(
+ targetClass.getClassLoader(),
+ interfaces,
+ new ExecutorProxy(target, omegaContext));
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Object[] augmentedArgs = new Object[args.length];
+
+ for (int i = 0; i < args.length; i++) {
+ Object arg = args[i];
+ if (isExecutable(arg)) {
+ augmentedArgs[i] = RunnableProxy.newInstance(arg, omegaContext);
+ } else if (isCollectionOfExecutables(arg)) {
+ augmentedArgs[i] = ((Collection<?>) arg)
+ .stream()
+ .map(a -> RunnableProxy.newInstance(a, omegaContext))
+ .collect(Collectors.toList());
+ } else {
+ augmentedArgs[i] = arg;
+ }
+ }
+
+ return method.invoke(target, augmentedArgs);
+ }
+
+ private boolean isExecutable(Object arg) {
+ return arg instanceof Runnable || arg instanceof Callable;
+ }
+
+ private boolean isCollectionOfExecutables(Object arg) {
+ return arg instanceof Collection
+ && !((Collection<?>) arg).isEmpty()
+ && isExecutable(((Collection<?>) arg).iterator().next());
+ }
+ }
+}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
new file mode 100644
index 0000000..5a4e7e4
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring.annotations;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target({FIELD, METHOD})
+public @interface OmegaContextAware {
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
index 3b399b9..d2abfc9 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import org.junit.Test;
@@ -39,4 +40,18 @@ public class CompensableAnnotationCheckingTest {
assertThat(e.getCause().getMessage(), startsWith("No such compensation method [none]"));
}
}
+
+ @Test
+ public void blowsUpWhenAnnotationOnWrongType() throws Exception {
+ try {
+ try (ConfigurableApplicationContext ignored = new SpringApplicationBuilder(TransactionTestMain.class)
+ .profiles("omega-context-aware-checking")
+ .run()) {
+ expectFailing(BeanCreationException.class);
+ }
+ } catch (BeanCreationException e) {
+ assertThat(e.getCause().getMessage(),
+ is("Only Executor, ExecutorService, and ScheduledExecutorService are supported for @OmegaContextAware"));
+ }
+ }
}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
new file mode 100644
index 0000000..99459a5
--- /dev/null
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+
+@Configuration
+@Profile("omega-context-aware-checking")
+class MisconfiguredAnnotation {
+ @OmegaContextAware
+ private final User user = new User();
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 6de3d18..a2b7641 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
import static akka.actor.ActorRef.noSender;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -44,6 +44,7 @@ import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -67,13 +68,21 @@ import akka.actor.Props;
@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
@AutoConfigureMockMvc
public class TransactionInterceptionTest {
- private static final ExecutorService executor = Executors.newSingleThreadExecutor();
private static final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
private final String username = uniquify("username");
private final String email = uniquify("email");
+ private final User user = new User(username, email);
+ private final User illegalUser = new User(ILLEGAL_USER, email);
+
+ private final String usernameJack = uniquify("Jack");
+ private final User jack = new User(usernameJack, uniquify("jack@gmail.com"));
+
+ @OmegaContextAware
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
@Autowired
private List<String> messages;
@@ -108,12 +117,11 @@ public class TransactionInterceptionTest {
@AfterClass
public static void afterClass() throws Exception {
- executor.shutdown();
}
@Test
public void sendsUserToRemote_AroundTransaction() throws Exception {
- User user = userService.add(new User(username, email));
+ User user = userService.add(this.user);
assertArrayEquals(
new String[]{
@@ -129,9 +137,8 @@ public class TransactionInterceptionTest {
@Test
public void sendsAbortEvent_OnSubTransactionFailure() throws Exception {
Throwable throwable = null;
- User user = new User(ILLEGAL_USER, email);
try {
- userService.add(user);
+ userService.add(illegalUser);
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException ignored) {
throwable = ignored;
@@ -139,7 +146,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, illegalUser).toString(),
new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
@@ -147,11 +154,11 @@ public class TransactionInterceptionTest {
@Test
public void compensateOnTransactionException() throws Exception {
- User user = userService.add(new User(username, email));
+ User user = userService.add(this.user);
// another sub transaction to the same service within the same global transaction
String localTxId = omegaContext.newLocalTxId();
- User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
+ User anotherUser = userService.add(jack);
messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
@@ -174,38 +181,44 @@ public class TransactionInterceptionTest {
@Test
public void passesOmegaContextThroughDifferentThreads() throws Exception {
- User user = new User(username, email);
new Thread(() -> userService.add(user)).start();
+ waitTillSavedUser(username);
- await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ String newLocalTxId = omegaContext.newLocalTxId();
+ new Thread(() -> userService.add(jack)).start();
+ waitTillSavedUser(usernameJack);
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
toArray(messages)
);
}
@Test
public void passesOmegaContextInThreadPool() throws Exception {
- User user = new User(username, email);
executor.execute(() -> userService.add(user));
+ waitTillSavedUser(username);
- await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ String newLocalTxId = omegaContext.newLocalTxId();
+ executor.invokeAll(singletonList(() -> userService.add(jack)));
+ waitTillSavedUser(usernameJack);
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
toArray(messages)
);
}
@Test
public void passesOmegaContextThroughReactiveX() throws Exception {
- User user = new User(username, email);
-
Flowable.just(user)
.parallel()
.runOn(Schedulers.io())
@@ -213,7 +226,7 @@ public class TransactionInterceptionTest {
.sequential()
.subscribe();
- await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ waitTillSavedUser(username);
assertArrayEquals(
new String[]{
@@ -228,12 +241,10 @@ public class TransactionInterceptionTest {
// TODO: 2018/1/3 if actor system starts before omega context initialized, this test will fail
ActorSystem actorSystem = ActorSystem.create();
- User user = new User(username, email);
-
ActorRef actorRef = actorSystem.actorOf(UserServiceActor.props(userService));
actorRef.tell(user, noSender());
- await().atMost(1, SECONDS).until(() -> userRepository.findByUsername(username) != null);
+ waitTillSavedUser(username);
assertArrayEquals(
new String[] {
@@ -245,6 +256,10 @@ public class TransactionInterceptionTest {
actorSystem.terminate();
}
+ private void waitTillSavedUser(String username) {
+ await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ }
+
private static class UserServiceActor extends AbstractLoggingActor {
private final TransactionalUserService userService;
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.