You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:49:17 UTC

[incubator-servicecomb-saga] 07/09: SCB-212 added saga start timeout

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit edf12b5d4459834f60b9978ba81a2e853a5cb43b
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 09:32:23 2018 +0800

    SCB-212 added saga start timeout
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 omega/omega-transaction/pom.xml                    | 16 +++++-
 .../transaction/SagaStartAnnotationProcessor.java  | 15 ++++--
 .../saga/omega/transaction/SagaStartAspect.java    | 32 ++++++++++--
 .../omega/transaction/TimeAwareInterceptor.java    | 16 +++---
 .../SagaStartAnnotationProcessorTest.java          |  4 +-
 .../omega/transaction/SagaStartAspectTest.java     | 59 +++++++++++++++++-----
 .../src/test/resources/log4j2-test.xml             | 30 +++++++++++
 7 files changed, 141 insertions(+), 31 deletions(-)

diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index 1829650..1aa77fb 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -63,7 +63,21 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 64f47ce..6e8556b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -19,7 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
-class SagaStartAnnotationProcessor {
+class SagaStartAnnotationProcessor implements EventAwareInterceptor {
 
   private final OmegaContext omegaContext;
   private final MessageSender sender;
@@ -29,18 +29,27 @@ class SagaStartAnnotationProcessor {
     this.sender = sender;
   }
 
-  void preIntercept() {
+  @Override
+  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
     String globalTxId = globalTxId();
     // reuse the globalTxId as localTxId to differ localTxId in SagaStartedEvent and the first TxStartedEvent
     sender.send(new SagaStartedEvent(globalTxId, globalTxId));
   }
 
-  void postIntercept() {
+  @Override
+  public void postIntercept(String parentTxId, String compensationMethod) {
     String globalTxId = omegaContext.globalTxId();
     sender.send(new SagaEndedEvent(globalTxId, globalTxId));
     omegaContext.clear();
   }
 
+  @Override
+  public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+    String globalTxId = omegaContext.globalTxId();
+    sender.send(new TxAbortedEvent(globalTxId, globalTxId, null, compensationMethod, throwable));
+    omegaContext.clear();
+  }
+
   private String globalTxId() {
     String globalTxId = omegaContext.newGlobalTxId();
     omegaContext.setLocalTxId(globalTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 307af3c..3fa6322 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -17,10 +17,15 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
@@ -33,6 +38,7 @@ public class SagaStartAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
   private final OmegaContext context;
 
   public SagaStartAspect(MessageSender sender, OmegaContext context) {
@@ -40,13 +46,15 @@ public class SagaStartAspect {
     this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
   }
 
-  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
-  Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
+  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
+  Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    sagaStartAnnotationProcessor.preIntercept();
+    TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor);
+    interceptor.preIntercept(context.globalTxId(), method.toString());
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
 
+    scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
     try {
       return joinPoint.proceed();
     } catch (Throwable throwable) {
@@ -54,7 +62,23 @@ public class SagaStartAspect {
       throw throwable;
     } finally {
       LOG.debug("Transaction with context {} has finished.", context);
-      sagaStartAnnotationProcessor.postIntercept();
+      interceptor.postIntercept(context.globalTxId(), method.toString());
+    }
+  }
+
+  private void scheduleTimeoutTask(
+      TimeAwareInterceptor interceptor,
+      Method method,
+      int timeout) {
+
+    if (timeout > 0) {
+      executor.schedule(
+          () -> interceptor.onTimeout(
+              context.globalTxId(),
+              method.toString(),
+              new OmegaTxTimeoutException("Saga " + method.toString() + " timed out")),
+          timeout,
+          MILLISECONDS);
     }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 317f802..80ad03f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,27 +29,27 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String localTxId, String signature, Object... args) {
-    interceptor.preIntercept(localTxId, signature, args);
+  public void preIntercept(String parentTxId, String signature, Object... args) {
+    interceptor.preIntercept(parentTxId, signature, args);
   }
 
   @Override
-  public void postIntercept(String localTxId, String signature) {
+  public void postIntercept(String parentTxId, String signature) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.postIntercept(localTxId, signature);
+      interceptor.postIntercept(parentTxId, signature);
     }
   }
 
   @Override
-  public void onError(String localTxId, String signature, Throwable throwable) {
+  public void onError(String parentTxId, String signature, Throwable throwable) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(parentTxId, signature, throwable);
     }
   }
 
-  void onTimeout(String localTxId, String signature, Throwable throwable) {
+  void onTimeout(String parentTxId, String signature, Throwable throwable) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(parentTxId, signature, throwable);
     }
   }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 0dadebe..8fa3568 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -53,7 +53,7 @@ public class SagaStartAnnotationProcessorTest {
   public void sendsSagaStartedEvent() {
     when(generator.nextId()).thenReturn(globalTxId, localTxId);
 
-    sagaStartAnnotationProcessor.preIntercept();
+    sagaStartAnnotationProcessor.preIntercept(null, null);
 
     assertThat(context.globalTxId(), is(globalTxId));
     assertThat(context.localTxId(), is(globalTxId));
@@ -73,7 +73,7 @@ public class SagaStartAnnotationProcessorTest {
     context.clear();
     context.setGlobalTxId(globalTxId);
 
-    sagaStartAnnotationProcessor.postIntercept();
+    sagaStartAnnotationProcessor.postIntercept(null, null);
 
     TxEvent event = messages.get(0);
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 432b3ad..c282ecc 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -18,18 +18,23 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
@@ -40,7 +45,6 @@ public class SagaStartAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = messages::add;
   private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
@@ -48,29 +52,24 @@ public class SagaStartAspectTest {
 
   @SuppressWarnings("unchecked")
   private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
-  private final Compensable compensable = Mockito.mock(Compensable.class);
+  private final SagaStart sagaStart = Mockito.mock(SagaStart.class);
 
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
 
   @Before
   public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(globalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
     when(joinPoint.getTarget()).thenReturn(this);
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
-    when(compensable.compensationMethod()).thenReturn("doNothing");
-
-    omegaContext.setGlobalTxId(globalTxId);
-    omegaContext.setLocalTxId(localTxId);
+    omegaContext.clear();
   }
 
   @Test
   public void newGlobalTxIdInSagaStart() throws Throwable {
-    omegaContext.clear();
-    when(idGenerator.nextId()).thenReturn(globalTxId);
-
-    aspect.advise(joinPoint);
+    aspect.advise(joinPoint, sagaStart);
 
     TxEvent startedEvent = messages.get(0);
 
@@ -92,13 +91,12 @@ public class SagaStartAspectTest {
 
   @Test
   public void clearContextOnSagaStartError() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(globalTxId);
     RuntimeException oops = new RuntimeException("oops");
 
     when(joinPoint.proceed()).thenThrow(oops);
 
     try {
-      aspect.advise(joinPoint);
+      aspect.advise(joinPoint, sagaStart);
       expectFailing(RuntimeException.class);
     } catch (RuntimeException e) {
       assertThat(e, is(oops));
@@ -115,6 +113,41 @@ public class SagaStartAspectTest {
     assertThat(omegaContext.localTxId(), is(nullValue()));
   }
 
+  @Test
+  public void sendsAbortEventOnTimeout() throws Throwable {
+    CountDownLatch latch = new CountDownLatch(1);
+    when(sagaStart.timeout()).thenReturn(100);
+    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
+      latch.await();
+      assertThat(omegaContext.localTxId(), is(globalTxId));
+      return null;
+    });
+
+    CompletableFuture.runAsync(() -> {
+      try {
+        aspect.advise(joinPoint, sagaStart);
+      } catch (Throwable throwable) {
+        fail(throwable.getMessage());
+      }
+    });
+
+    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    assertThat(event.type(), is("TxAbortedEvent"));
+
+    latch.countDown();
+
+    await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null);
+
+    // no redundant ended message received
+    assertThat(messages.size(), is(2));
+  }
+
   private String doNothing() {
     return "doNothing";
   }
diff --git a/omega/omega-transaction/src/test/resources/log4j2-test.xml b/omega/omega-transaction/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/omega/omega-transaction/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.