You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/05 01:15:50 UTC
[incubator-servicecomb-saga] branch master updated (b862131 ->
d38641a)
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.
from b862131 SCB-138 observer maintains its own services instead of using an extra map
new e2e63f3 SCB-100 async support for omega context
new 22f2150 SCB-100 added context checking on reactive
new 13bbe7b SCB-100 ensured context passing through akka actors
new 8428f35 SCB-100 cleaned up context after each test
new 887b6d8 SCB-100 delegated executors to proxy
new ece5ed6 SCB-100 more test coverage on executor proxy
new c74d8e7 SCB-100 extended timeout to avoid test failure
new d38641a SCB-100 minor refactoring
The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../saga/omega/context/OmegaContext.java | 12 +-
omega/omega-spring-tx/pom.xml | 30 ++++
.../spring/CompensableAnnotationProcessor.java | 5 +
.../transaction/spring/ExecutorFieldCallback.java | 167 ++++++++++++++++++++
.../spring/annotations/OmegaContextAware.java} | 15 +-
.../spring/CompensableAnnotationCheckingTest.java | 15 ++
...edService.java => MisconfiguredAnnotation.java} | 16 +-
.../spring/TransactionInterceptionTest.java | 170 ++++++++++++++++++---
.../omega/transaction/spring/UserRepository.java | 1 +
9 files changed, 389 insertions(+), 42 deletions(-)
create mode 100644 omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
copy omega/{omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java => omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java} (72%)
copy omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/{MisconfiguredService.java => MisconfiguredAnnotation.java} (75%)
--
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].
[incubator-servicecomb-saga] 02/08: SCB-100 added context checking
on reactive
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 22f2150c0ca759fa44e73db57c6ee42faf7b5db9
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 11:38:21 2018 +0800
SCB-100 added context checking on reactive
Signed-off-by: seanyinx <se...@huawei.com>
---
omega/omega-spring-tx/pom.xml | 6 ++++++
.../spring/TransactionInterceptionTest.java | 24 ++++++++++++++++++++++
2 files changed, 30 insertions(+)
diff --git a/omega/omega-spring-tx/pom.xml b/omega/omega-spring-tx/pom.xml
index 21661bb..815e080 100644
--- a/omega/omega-spring-tx/pom.xml
+++ b/omega/omega-spring-tx/pom.xml
@@ -76,6 +76,12 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>2.1.8</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 9ace53f..6dee1d8 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -54,6 +54,9 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
@AutoConfigureMockMvc
@@ -191,6 +194,27 @@ public class TransactionInterceptionTest {
);
}
+ @Test
+ public void passesOmegaContextThroughReactiveX() throws Exception {
+ User user = new User(username, email);
+
+ Flowable.just(user)
+ .parallel()
+ .runOn(Schedulers.io())
+ .doOnNext(userService::add)
+ .sequential()
+ .subscribe();
+
+ await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+ assertArrayEquals(
+ new String[]{
+ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ toArray(messages)
+ );
+ }
+
private String[] toArray(List<String> messages) {
return messages.toArray(new String[messages.size()]);
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.
[incubator-servicecomb-saga] 05/08: SCB-100 delegated executors to
proxy
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 887b6d8f6901deeaba6577a166f30a83d949f4a2
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 17:47:05 2018 +0800
SCB-100 delegated executors to proxy
Signed-off-by: seanyinx <se...@huawei.com>
---
.../spring/CompensableAnnotationProcessor.java | 5 +
.../transaction/spring/ExecutorFieldCallback.java | 163 +++++++++++++++++++++
.../spring/annotations/OmegaContextAware.java | 30 ++++
.../spring/CompensableAnnotationCheckingTest.java | 15 ++
.../spring/MisconfiguredAnnotation.java | 29 ++++
.../spring/TransactionInterceptionTest.java | 57 ++++---
6 files changed, 278 insertions(+), 21 deletions(-)
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 9cb5b27..87f9049 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -33,6 +33,7 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
checkMethod(bean);
+ checkFields(bean);
return bean;
}
@@ -44,4 +45,8 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
private void checkMethod(Object bean) {
ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext));
}
+
+ private void checkFields(Object bean) {
+ ReflectionUtils.doWithFields(bean.getClass(), new ExecutorFieldCallback(bean, omegaContext));
+ }
}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
new file mode 100644
index 0000000..3030fca
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.util.ReflectionUtils.FieldCallback;
+
+class ExecutorFieldCallback implements FieldCallback {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OmegaContext omegaContext;
+ private final Object bean;
+
+ ExecutorFieldCallback(Object bean, OmegaContext omegaContext) {
+ this.omegaContext = omegaContext;
+ this.bean = bean;
+ }
+
+ @Override
+ public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
+ if (!field.isAnnotationPresent(OmegaContextAware.class)) {
+ return;
+ }
+
+ ReflectionUtils.makeAccessible(field);
+
+ Class<?> generic = field.getType();
+
+ if (!Executor.class.isAssignableFrom(generic)) {
+ throw new IllegalArgumentException(
+ "Only Executor, ExecutorService, and ScheduledExecutorService are supported for @"
+ + OmegaContextAware.class.getSimpleName());
+ }
+
+ field.set(bean, ExecutorProxy.newInstance(field.get(bean), field.getType(), omegaContext));
+ }
+
+ private static class RunnableProxy implements InvocationHandler {
+
+ private final String globalTxId;
+ private final String localTxId;
+ private final String parentTxId;
+ private final Object runnable;
+ private final OmegaContext omegaContext;
+
+ private static Object newInstance(Object runnable, OmegaContext omegaContext) {
+ RunnableProxy runnableProxy = new RunnableProxy(omegaContext, runnable);
+ return Proxy.newProxyInstance(
+ runnable.getClass().getClassLoader(),
+ runnable.getClass().getInterfaces(),
+ runnableProxy);
+ }
+
+ private RunnableProxy(OmegaContext omegaContext, Object runnable) {
+ this.omegaContext = omegaContext;
+ this.globalTxId = omegaContext.globalTxId();
+ this.localTxId = omegaContext.localTxId();
+ this.parentTxId = omegaContext.parentTxId();
+ this.runnable = runnable;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ try {
+ LOG.debug("Setting OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+ globalTxId,
+ localTxId,
+ parentTxId);
+
+ omegaContext.setGlobalTxId(globalTxId);
+ omegaContext.setLocalTxId(localTxId);
+ omegaContext.setParentTxId(parentTxId);
+
+ return method.invoke(runnable, args);
+ } finally {
+ omegaContext.clear();
+ LOG.debug("Cleared OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+ globalTxId,
+ localTxId,
+ parentTxId);
+ }
+ }
+ }
+
+ private static class ExecutorProxy implements InvocationHandler {
+ private final Object target;
+ private final OmegaContext omegaContext;
+
+ private ExecutorProxy(Object target, OmegaContext omegaContext) {
+ this.target = target;
+ this.omegaContext = omegaContext;
+ }
+
+ private static Object newInstance(Object target, Class<?> targetClass, OmegaContext omegaContext) {
+ Class<?>[] interfaces = targetClass.isInterface() ? new Class<?>[] {targetClass} : targetClass.getInterfaces();
+
+ return Proxy.newProxyInstance(
+ targetClass.getClassLoader(),
+ interfaces,
+ new ExecutorProxy(target, omegaContext));
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Object[] augmentedArgs = new Object[args.length];
+
+ for (int i = 0; i < args.length; i++) {
+ Object arg = args[i];
+ if (isExecutable(arg)) {
+ augmentedArgs[i] = RunnableProxy.newInstance(arg, omegaContext);
+ } else if (isCollectionOfExecutables(arg)) {
+ augmentedArgs[i] = ((Collection<?>) arg)
+ .stream()
+ .map(a -> RunnableProxy.newInstance(a, omegaContext))
+ .collect(Collectors.toList());
+ } else {
+ augmentedArgs[i] = arg;
+ }
+ }
+
+ return method.invoke(target, augmentedArgs);
+ }
+
+ private boolean isExecutable(Object arg) {
+ return arg instanceof Runnable || arg instanceof Callable;
+ }
+
+ private boolean isCollectionOfExecutables(Object arg) {
+ return arg instanceof Collection
+ && !((Collection<?>) arg).isEmpty()
+ && isExecutable(((Collection<?>) arg).iterator().next());
+ }
+ }
+}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
new file mode 100644
index 0000000..5a4e7e4
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring.annotations;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target({FIELD, METHOD})
+public @interface OmegaContextAware {
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
index 3b399b9..d2abfc9 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import org.junit.Test;
@@ -39,4 +40,18 @@ public class CompensableAnnotationCheckingTest {
assertThat(e.getCause().getMessage(), startsWith("No such compensation method [none]"));
}
}
+
+ @Test
+ public void blowsUpWhenAnnotationOnWrongType() throws Exception {
+ try {
+ try (ConfigurableApplicationContext ignored = new SpringApplicationBuilder(TransactionTestMain.class)
+ .profiles("omega-context-aware-checking")
+ .run()) {
+ expectFailing(BeanCreationException.class);
+ }
+ } catch (BeanCreationException e) {
+ assertThat(e.getCause().getMessage(),
+ is("Only Executor, ExecutorService, and ScheduledExecutorService are supported for @OmegaContextAware"));
+ }
+ }
}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
new file mode 100644
index 0000000..99459a5
--- /dev/null
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+
+@Configuration
+@Profile("omega-context-aware-checking")
+class MisconfiguredAnnotation {
+ @OmegaContextAware
+ private final User user = new User();
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 6de3d18..a2b7641 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
import static akka.actor.ActorRef.noSender;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -44,6 +44,7 @@ import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -67,13 +68,21 @@ import akka.actor.Props;
@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
@AutoConfigureMockMvc
public class TransactionInterceptionTest {
- private static final ExecutorService executor = Executors.newSingleThreadExecutor();
private static final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
private final String username = uniquify("username");
private final String email = uniquify("email");
+ private final User user = new User(username, email);
+ private final User illegalUser = new User(ILLEGAL_USER, email);
+
+ private final String usernameJack = uniquify("Jack");
+ private final User jack = new User(usernameJack, uniquify("jack@gmail.com"));
+
+ @OmegaContextAware
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
@Autowired
private List<String> messages;
@@ -108,12 +117,11 @@ public class TransactionInterceptionTest {
@AfterClass
public static void afterClass() throws Exception {
- executor.shutdown();
}
@Test
public void sendsUserToRemote_AroundTransaction() throws Exception {
- User user = userService.add(new User(username, email));
+ User user = userService.add(this.user);
assertArrayEquals(
new String[]{
@@ -129,9 +137,8 @@ public class TransactionInterceptionTest {
@Test
public void sendsAbortEvent_OnSubTransactionFailure() throws Exception {
Throwable throwable = null;
- User user = new User(ILLEGAL_USER, email);
try {
- userService.add(user);
+ userService.add(illegalUser);
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException ignored) {
throwable = ignored;
@@ -139,7 +146,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, illegalUser).toString(),
new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
@@ -147,11 +154,11 @@ public class TransactionInterceptionTest {
@Test
public void compensateOnTransactionException() throws Exception {
- User user = userService.add(new User(username, email));
+ User user = userService.add(this.user);
// another sub transaction to the same service within the same global transaction
String localTxId = omegaContext.newLocalTxId();
- User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
+ User anotherUser = userService.add(jack);
messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
@@ -174,38 +181,44 @@ public class TransactionInterceptionTest {
@Test
public void passesOmegaContextThroughDifferentThreads() throws Exception {
- User user = new User(username, email);
new Thread(() -> userService.add(user)).start();
+ waitTillSavedUser(username);
- await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ String newLocalTxId = omegaContext.newLocalTxId();
+ new Thread(() -> userService.add(jack)).start();
+ waitTillSavedUser(usernameJack);
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
toArray(messages)
);
}
@Test
public void passesOmegaContextInThreadPool() throws Exception {
- User user = new User(username, email);
executor.execute(() -> userService.add(user));
+ waitTillSavedUser(username);
- await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ String newLocalTxId = omegaContext.newLocalTxId();
+ executor.invokeAll(singletonList(() -> userService.add(jack)));
+ waitTillSavedUser(usernameJack);
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
toArray(messages)
);
}
@Test
public void passesOmegaContextThroughReactiveX() throws Exception {
- User user = new User(username, email);
-
Flowable.just(user)
.parallel()
.runOn(Schedulers.io())
@@ -213,7 +226,7 @@ public class TransactionInterceptionTest {
.sequential()
.subscribe();
- await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ waitTillSavedUser(username);
assertArrayEquals(
new String[]{
@@ -228,12 +241,10 @@ public class TransactionInterceptionTest {
// TODO: 2018/1/3 if actor system starts before omega context initialized, this test will fail
ActorSystem actorSystem = ActorSystem.create();
- User user = new User(username, email);
-
ActorRef actorRef = actorSystem.actorOf(UserServiceActor.props(userService));
actorRef.tell(user, noSender());
- await().atMost(1, SECONDS).until(() -> userRepository.findByUsername(username) != null);
+ waitTillSavedUser(username);
assertArrayEquals(
new String[] {
@@ -245,6 +256,10 @@ public class TransactionInterceptionTest {
actorSystem.terminate();
}
+ private void waitTillSavedUser(String username) {
+ await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ }
+
private static class UserServiceActor extends AbstractLoggingActor {
private final TransactionalUserService userService;
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.
[incubator-servicecomb-saga] 08/08: SCB-100 minor refactoring
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit d38641aa7794c0491d953c1e6a6396c02ac2fd29
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 4 11:51:27 2018 +0800
SCB-100 minor refactoring
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/omega/transaction/spring/ExecutorFieldCallback.java | 6 +++++-
.../saga/omega/transaction/spring/TransactionInterceptionTest.java | 3 ++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
index 3030fca..5ae949d 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -131,6 +131,10 @@ class ExecutorFieldCallback implements FieldCallback {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ return method.invoke(target, augmentRunnablesWithOmegaContext(args));
+ }
+
+ private Object[] augmentRunnablesWithOmegaContext(Object[] args) {
Object[] augmentedArgs = new Object[args.length];
for (int i = 0; i < args.length; i++) {
@@ -147,7 +151,7 @@ class ExecutorFieldCallback implements FieldCallback {
}
}
- return method.invoke(target, augmentedArgs);
+ return augmentedArgs;
}
private boolean isExecutable(Object arg) {
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 26b2de3..3dc8c73 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -217,6 +217,7 @@ public class TransactionInterceptionTest {
);
}
+ // TODO: 2018/1/4 reactive is not supported yet, omega context won't be updated on shared threads
@Test
public void passesOmegaContextThroughReactiveX() throws Exception {
Flowable.just(user)
@@ -236,9 +237,9 @@ public class TransactionInterceptionTest {
);
}
+ // TODO: 2018/1/4 actor system is not supported yet
@Test
public void passesOmegaContextAmongActors() throws Exception {
- // TODO: 2018/1/3 if actor system starts before omega context initialized, this test will fail
ActorSystem actorSystem = ActorSystem.create();
ActorRef actorRef = actorSystem.actorOf(UserServiceActor.props(userService));
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.
[incubator-servicecomb-saga] 04/08: SCB-100 cleaned up context
after each test
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 8428f35c71501cc165428a55eeb153637c65238a
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 14:22:20 2018 +0800
SCB-100 cleaned up context after each test
Signed-off-by: seanyinx <se...@huawei.com>
---
.../org/apache/servicecomb/saga/omega/context/OmegaContext.java | 6 ++++++
.../saga/omega/transaction/spring/TransactionInterceptionTest.java | 1 +
2 files changed, 7 insertions(+)
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index f336c4c..94de6ef 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -77,6 +77,12 @@ public class OmegaContext {
this.parentTxId.set(parentTxId);
}
+ public void clear() {
+ globalTxId.remove();
+ localTxId.remove();
+ parentTxId.remove();
+ }
+
public void addCompensationContext(Method compensationMethod, Object target) {
compensationMethod.setAccessible(true);
compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 253e636..6de3d18 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -103,6 +103,7 @@ public class TransactionInterceptionTest {
public void tearDown() throws Exception {
messages.clear();
userRepository.deleteAll();
+ omegaContext.clear();
}
@AfterClass
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.
[incubator-servicecomb-saga] 06/08: SCB-100 more test coverage on
executor proxy
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit ece5ed6d1677731b5ec4bb0d7026d2502df63c98
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 4 09:26:51 2018 +0800
SCB-100 more test coverage on executor proxy
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/omega/transaction/spring/TransactionInterceptionTest.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index a2b7641..9597c6c 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -32,8 +32,8 @@ import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
@@ -81,7 +81,7 @@ public class TransactionInterceptionTest {
private final User jack = new User(usernameJack, uniquify("jack@gmail.com"));
@OmegaContextAware
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
@Autowired
private List<String> messages;
@@ -200,7 +200,7 @@ public class TransactionInterceptionTest {
@Test
public void passesOmegaContextInThreadPool() throws Exception {
- executor.execute(() -> userService.add(user));
+ executor.schedule(() -> userService.add(user), 0, MILLISECONDS);
waitTillSavedUser(username);
String newLocalTxId = omegaContext.newLocalTxId();
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.
[incubator-servicecomb-saga] 03/08: SCB-100 ensured context passing
through akka actors
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 13bbe7bd6b991aef6de701d988d65e06760710ca
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 11:54:58 2018 +0800
SCB-100 ensured context passing through akka actors
Signed-off-by: seanyinx <se...@huawei.com>
---
omega/omega-spring-tx/pom.xml | 20 +++++++++
.../spring/TransactionInterceptionTest.java | 48 ++++++++++++++++++++++
2 files changed, 68 insertions(+)
diff --git a/omega/omega-spring-tx/pom.xml b/omega/omega-spring-tx/pom.xml
index 815e080..022f0ba 100644
--- a/omega/omega-spring-tx/pom.xml
+++ b/omega/omega-spring-tx/pom.xml
@@ -82,6 +82,26 @@
<version>2.1.8</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-slf4j_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 6dee1d8..253e636 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -17,9 +17,11 @@
package org.apache.servicecomb.saga.omega.transaction.spring;
+import static akka.actor.ActorRef.noSender;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -56,6 +58,10 @@ import org.springframework.test.context.junit4.SpringRunner;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
+import akka.actor.AbstractLoggingActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
@@ -96,6 +102,7 @@ public class TransactionInterceptionTest {
@After
public void tearDown() throws Exception {
messages.clear();
+ userRepository.deleteAll();
}
@AfterClass
@@ -215,6 +222,47 @@ public class TransactionInterceptionTest {
);
}
+ @Test
+ public void passesOmegaContextAmongActors() throws Exception {
+ // TODO: 2018/1/3 if actor system starts before omega context initialized, this test will fail
+ ActorSystem actorSystem = ActorSystem.create();
+
+ User user = new User(username, email);
+
+ ActorRef actorRef = actorSystem.actorOf(UserServiceActor.props(userService));
+ actorRef.tell(user, noSender());
+
+ await().atMost(1, SECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+ assertArrayEquals(
+ new String[] {
+ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ toArray(messages)
+ );
+
+ actorSystem.terminate();
+ }
+
+ private static class UserServiceActor extends AbstractLoggingActor {
+ private final TransactionalUserService userService;
+
+ private UserServiceActor(TransactionalUserService userService) {
+ this.userService = userService;
+ }
+
+ static Props props(TransactionalUserService userService) {
+ return Props.create(UserServiceActor.class, () -> new UserServiceActor(userService));
+ }
+
+ @Override
+ public Receive createReceive() {
+ return receiveBuilder()
+ .match(User.class, userService::add)
+ .build();
+ }
+ }
+
private String[] toArray(List<String> messages) {
return messages.toArray(new String[messages.size()]);
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.
[incubator-servicecomb-saga] 07/08: SCB-100 extended timeout to
avoid test failure
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit c74d8e7133176deaaf930a782adb63783537a3b6
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 4 09:48:02 2018 +0800
SCB-100 extended timeout to avoid test failure
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/omega/transaction/spring/TransactionInterceptionTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 9597c6c..26b2de3 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -257,7 +257,7 @@ public class TransactionInterceptionTest {
}
private void waitTillSavedUser(String username) {
- await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+ await().atMost(1000, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
}
private static class UserServiceActor extends AbstractLoggingActor {
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.
[incubator-servicecomb-saga] 01/08: SCB-100 async support for omega
context
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit e2e63f3cafe25a4ce4da26fe2a8e8dd6d94bc5da
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 11:25:45 2018 +0800
SCB-100 async support for omega context
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/omega/context/OmegaContext.java | 6 +-
omega/omega-spring-tx/pom.xml | 4 ++
.../spring/TransactionInterceptionTest.java | 71 ++++++++++++++++------
.../omega/transaction/spring/UserRepository.java | 1 +
4 files changed, 61 insertions(+), 21 deletions(-)
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index 6016b53..f336c4c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -31,9 +31,9 @@ public class OmegaContext {
public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
- private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
- private final ThreadLocal<String> localTxId = new ThreadLocal<>();
- private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
+ private final ThreadLocal<String> globalTxId = new InheritableThreadLocal<>();
+ private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>();
+ private final ThreadLocal<String> parentTxId = new InheritableThreadLocal<>();
private final IdGenerator<String> idGenerator;
private final Map<String, CompensationContext> compensationContexts = new HashMap<>();
diff --git a/omega/omega-spring-tx/pom.xml b/omega/omega-spring-tx/pom.xml
index e5fc8b3..21661bb 100644
--- a/omega/omega-spring-tx/pom.xml
+++ b/omega/omega-spring-tx/pom.xml
@@ -72,6 +72,10 @@
<artifactId>spring-boot-starter-data-jpa</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index dc23612..9ace53f 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -19,7 +19,9 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
@@ -28,8 +30,20 @@ import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
+import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -40,22 +54,11 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
-import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
-import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
-
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
@AutoConfigureMockMvc
public class TransactionInterceptionTest {
- private static final String TX_STARTED_EVENT = "TxStartedEvent";
- private static final String TX_ENDED_EVENT = "TxEndedEvent";
+ private static final ExecutorService executor = Executors.newSingleThreadExecutor();
private static final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
@@ -77,11 +80,14 @@ public class TransactionInterceptionTest {
@Autowired
private MessageHandler messageHandler;
+ private String compensationMethod;
+
@Before
public void setUp() throws Exception {
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(localTxId);
omegaContext.setParentTxId(parentTxId);
+ compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
}
@After
@@ -89,12 +95,15 @@ public class TransactionInterceptionTest {
messages.clear();
}
+ @AfterClass
+ public static void afterClass() throws Exception {
+ executor.shutdown();
+ }
+
@Test
public void sendsUserToRemote_AroundTransaction() throws Exception {
User user = userService.add(new User(username, email));
- String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
@@ -117,8 +126,6 @@ public class TransactionInterceptionTest {
throwable = ignored;
}
- String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
@@ -135,8 +142,6 @@ public class TransactionInterceptionTest {
String localTxId = omegaContext.newLocalTxId();
User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
- String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
@@ -156,6 +161,36 @@ public class TransactionInterceptionTest {
);
}
+ @Test
+ public void passesOmegaContextThroughDifferentThreads() throws Exception {
+ User user = new User(username, email);
+ new Thread(() -> userService.add(user)).start();
+
+ await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+ assertArrayEquals(
+ new String[]{
+ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ toArray(messages)
+ );
+ }
+
+ @Test
+ public void passesOmegaContextInThreadPool() throws Exception {
+ User user = new User(username, email);
+ executor.execute(() -> userService.add(user));
+
+ await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+ assertArrayEquals(
+ new String[]{
+ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ toArray(messages)
+ );
+ }
+
private String[] toArray(List<String> messages) {
return messages.toArray(new String[messages.size()]);
}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
index bcf3e14..729b7ab 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
@@ -20,4 +20,5 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
import org.springframework.data.repository.CrudRepository;
interface UserRepository extends CrudRepository<User, Long> {
+ User findByUsername(String username);
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.