You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:49:17 UTC
[incubator-servicecomb-saga] 07/09: SCB-212 added saga start timeout
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit edf12b5d4459834f60b9978ba81a2e853a5cb43b
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 09:32:23 2018 +0800
SCB-212 added saga start timeout
Signed-off-by: seanyinx <se...@huawei.com>
---
omega/omega-transaction/pom.xml | 16 +++++-
.../transaction/SagaStartAnnotationProcessor.java | 15 ++++--
.../saga/omega/transaction/SagaStartAspect.java | 32 ++++++++++--
.../omega/transaction/TimeAwareInterceptor.java | 16 +++---
.../SagaStartAnnotationProcessorTest.java | 4 +-
.../omega/transaction/SagaStartAspectTest.java | 59 +++++++++++++++++-----
.../src/test/resources/log4j2-test.xml | 30 +++++++++++
7 files changed, 141 insertions(+), 31 deletions(-)
diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index 1829650..1aa77fb 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -63,7 +63,21 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 64f47ce..6e8556b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -19,7 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
-class SagaStartAnnotationProcessor {
+class SagaStartAnnotationProcessor implements EventAwareInterceptor {
private final OmegaContext omegaContext;
private final MessageSender sender;
@@ -29,18 +29,27 @@ class SagaStartAnnotationProcessor {
this.sender = sender;
}
- void preIntercept() {
+ @Override
+ public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
String globalTxId = globalTxId();
// reuse the globalTxId as localTxId to differ localTxId in SagaStartedEvent and the first TxStartedEvent
sender.send(new SagaStartedEvent(globalTxId, globalTxId));
}
- void postIntercept() {
+ @Override
+ public void postIntercept(String parentTxId, String compensationMethod) {
String globalTxId = omegaContext.globalTxId();
sender.send(new SagaEndedEvent(globalTxId, globalTxId));
omegaContext.clear();
}
+ @Override
+ public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+ String globalTxId = omegaContext.globalTxId();
+ sender.send(new TxAbortedEvent(globalTxId, globalTxId, null, compensationMethod, throwable));
+ omegaContext.clear();
+ }
+
private String globalTxId() {
String globalTxId = omegaContext.newGlobalTxId();
omegaContext.setLocalTxId(globalTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 307af3c..3fa6322 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -17,10 +17,15 @@
package org.apache.servicecomb.saga.omega.transaction;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
@@ -33,6 +38,7 @@ public class SagaStartAspect {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+ private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final OmegaContext context;
public SagaStartAspect(MessageSender sender, OmegaContext context) {
@@ -40,13 +46,15 @@ public class SagaStartAspect {
this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
}
- @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
- Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
+ @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
+ Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
- sagaStartAnnotationProcessor.preIntercept();
+ TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor);
+ interceptor.preIntercept(context.globalTxId(), method.toString());
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
+ scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
try {
return joinPoint.proceed();
} catch (Throwable throwable) {
@@ -54,7 +62,23 @@ public class SagaStartAspect {
throw throwable;
} finally {
LOG.debug("Transaction with context {} has finished.", context);
- sagaStartAnnotationProcessor.postIntercept();
+ interceptor.postIntercept(context.globalTxId(), method.toString());
+ }
+ }
+
+ private void scheduleTimeoutTask(
+ TimeAwareInterceptor interceptor,
+ Method method,
+ int timeout) {
+
+ if (timeout > 0) {
+ executor.schedule(
+ () -> interceptor.onTimeout(
+ context.globalTxId(),
+ method.toString(),
+ new OmegaTxTimeoutException("Saga " + method.toString() + " timed out")),
+ timeout,
+ MILLISECONDS);
}
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 317f802..80ad03f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,27 +29,27 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
}
@Override
- public void preIntercept(String localTxId, String signature, Object... args) {
- interceptor.preIntercept(localTxId, signature, args);
+ public void preIntercept(String parentTxId, String signature, Object... args) {
+ interceptor.preIntercept(parentTxId, signature, args);
}
@Override
- public void postIntercept(String localTxId, String signature) {
+ public void postIntercept(String parentTxId, String signature) {
if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
- interceptor.postIntercept(localTxId, signature);
+ interceptor.postIntercept(parentTxId, signature);
}
}
@Override
- public void onError(String localTxId, String signature, Throwable throwable) {
+ public void onError(String parentTxId, String signature, Throwable throwable) {
if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
- interceptor.onError(localTxId, signature, throwable);
+ interceptor.onError(parentTxId, signature, throwable);
}
}
- void onTimeout(String localTxId, String signature, Throwable throwable) {
+ void onTimeout(String parentTxId, String signature, Throwable throwable) {
if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
- interceptor.onError(localTxId, signature, throwable);
+ interceptor.onError(parentTxId, signature, throwable);
}
}
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 0dadebe..8fa3568 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -53,7 +53,7 @@ public class SagaStartAnnotationProcessorTest {
public void sendsSagaStartedEvent() {
when(generator.nextId()).thenReturn(globalTxId, localTxId);
- sagaStartAnnotationProcessor.preIntercept();
+ sagaStartAnnotationProcessor.preIntercept(null, null);
assertThat(context.globalTxId(), is(globalTxId));
assertThat(context.localTxId(), is(globalTxId));
@@ -73,7 +73,7 @@ public class SagaStartAnnotationProcessorTest {
context.clear();
context.setGlobalTxId(globalTxId);
- sagaStartAnnotationProcessor.postIntercept();
+ sagaStartAnnotationProcessor.postIntercept(null, null);
TxEvent event = messages.get(0);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 432b3ad..c282ecc 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -18,18 +18,23 @@
package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import org.apache.servicecomb.saga.omega.context.IdGenerator;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Before;
@@ -40,7 +45,6 @@ public class SagaStartAspectTest {
private final List<TxEvent> messages = new ArrayList<>();
private final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
- private final String parentTxId = UUID.randomUUID().toString();
private final MessageSender sender = messages::add;
private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
@@ -48,29 +52,24 @@ public class SagaStartAspectTest {
@SuppressWarnings("unchecked")
private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
- private final Compensable compensable = Mockito.mock(Compensable.class);
+ private final SagaStart sagaStart = Mockito.mock(SagaStart.class);
private final OmegaContext omegaContext = new OmegaContext(idGenerator);
private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
@Before
public void setUp() throws Exception {
+ when(idGenerator.nextId()).thenReturn(globalTxId);
when(joinPoint.getSignature()).thenReturn(methodSignature);
when(joinPoint.getTarget()).thenReturn(this);
when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
- when(compensable.compensationMethod()).thenReturn("doNothing");
-
- omegaContext.setGlobalTxId(globalTxId);
- omegaContext.setLocalTxId(localTxId);
+ omegaContext.clear();
}
@Test
public void newGlobalTxIdInSagaStart() throws Throwable {
- omegaContext.clear();
- when(idGenerator.nextId()).thenReturn(globalTxId);
-
- aspect.advise(joinPoint);
+ aspect.advise(joinPoint, sagaStart);
TxEvent startedEvent = messages.get(0);
@@ -92,13 +91,12 @@ public class SagaStartAspectTest {
@Test
public void clearContextOnSagaStartError() throws Throwable {
- when(idGenerator.nextId()).thenReturn(globalTxId);
RuntimeException oops = new RuntimeException("oops");
when(joinPoint.proceed()).thenThrow(oops);
try {
- aspect.advise(joinPoint);
+ aspect.advise(joinPoint, sagaStart);
expectFailing(RuntimeException.class);
} catch (RuntimeException e) {
assertThat(e, is(oops));
@@ -115,6 +113,41 @@ public class SagaStartAspectTest {
assertThat(omegaContext.localTxId(), is(nullValue()));
}
+ @Test
+ public void sendsAbortEventOnTimeout() throws Throwable {
+ CountDownLatch latch = new CountDownLatch(1);
+ when(sagaStart.timeout()).thenReturn(100);
+ when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
+ latch.await();
+ assertThat(omegaContext.localTxId(), is(globalTxId));
+ return null;
+ });
+
+ CompletableFuture.runAsync(() -> {
+ try {
+ aspect.advise(joinPoint, sagaStart);
+ } catch (Throwable throwable) {
+ fail(throwable.getMessage());
+ }
+ });
+
+ await().atMost(1, SECONDS).until(() -> messages.size() == 2);
+
+ TxEvent event = messages.get(1);
+
+ assertThat(event.globalTxId(), is(globalTxId));
+ assertThat(event.localTxId(), is(globalTxId));
+ assertThat(event.parentTxId(), is(nullValue()));
+ assertThat(event.type(), is("TxAbortedEvent"));
+
+ latch.countDown();
+
+ await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null);
+
+ // no redundant ended message received
+ assertThat(messages.size(), is(2));
+ }
+
private String doNothing() {
return "doNothing";
}
diff --git a/omega/omega-transaction/src/test/resources/log4j2-test.xml b/omega/omega-transaction/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/omega/omega-transaction/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.