You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:49:16 UTC

[incubator-servicecomb-saga] 06/09: SCB-212 replaced timeout impl with atomic to avoid locking

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 32a226b51e2f5be6ec3d9a09448c48b1b7ce3fe9
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 22:21:58 2018 +0800

    SCB-212 replaced timeout impl with atomic to avoid locking
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../transaction/OmegaTxTimeoutException.java}      |  10 +-
 .../omega/transaction/TimeAwareInterceptor.java    |  25 ++---
 .../saga/omega/transaction/TransactionAspect.java  |  23 ++++-
 .../transaction/TimeAwareInterceptorTest.java      | 113 ++++++++++++++++++++-
 4 files changed, 150 insertions(+), 21 deletions(-)

diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
similarity index 86%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
index fb2ee1d..eb820d6 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
@@ -17,8 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static org.junit.Assert.*;
-
-public class TimeAwareInterceptorTest {
-
-}
\ No newline at end of file
+public class OmegaTxTimeoutException extends RuntimeException {
+  public OmegaTxTimeoutException(String cause) {
+    super(cause);
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index d633630..317f802 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -17,17 +17,15 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 class TimeAwareInterceptor implements EventAwareInterceptor {
   private final EventAwareInterceptor interceptor;
-  private final BlockingDeque<EventAwareInterceptor> interceptors = new LinkedBlockingDeque<>(2);
+  private final AtomicReference<EventAwareInterceptor> interceptorRef;
 
   TimeAwareInterceptor(EventAwareInterceptor interceptor) {
     this.interceptor = interceptor;
-    this.interceptors.offer(interceptor);
+    this.interceptorRef = new AtomicReference<>(interceptor);
   }
 
   @Override
@@ -37,18 +35,21 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
 
   @Override
   public void postIntercept(String localTxId, String signature) {
-    interceptors.offerLast(NO_OP_INTERCEPTOR);
-    interceptors.pollFirst().postIntercept(localTxId, signature);
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.postIntercept(localTxId, signature);
+    }
   }
 
   @Override
   public void onError(String localTxId, String signature, Throwable throwable) {
-    interceptors.offerLast(NO_OP_INTERCEPTOR);
-    interceptors.pollFirst().onError(localTxId, signature, throwable);
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.onError(localTxId, signature, throwable);
+    }
   }
 
-  void onTimeout(String signature, String localTxId) {
-    interceptors.offerFirst(NO_OP_INTERCEPTOR);
-    interceptors.pollLast().onError(localTxId, signature, new TimeoutException());
+  void onTimeout(String localTxId, String signature, Throwable throwable) {
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.onError(localTxId, signature, throwable);
+    }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 9bee829..d3e091c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -59,9 +59,8 @@ public class TransactionAspect {
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
     interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
-    if (compensable.timeout() > 0) {
-      executor.schedule(() -> interceptor.onTimeout(signature, localTxId), compensable.timeout(), MILLISECONDS);
-    }
+
+    scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
 
     try {
       Object result = joinPoint.proceed();
@@ -77,6 +76,24 @@ public class TransactionAspect {
     }
   }
 
+  private void scheduleTimeoutTask(
+      TimeAwareInterceptor interceptor,
+      String localTxId,
+      String signature,
+      Method method,
+      int timeout) {
+
+    if (timeout > 0) {
+      executor.schedule(
+          () -> interceptor.onTimeout(
+              localTxId,
+              signature,
+              new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")),
+          timeout,
+          MILLISECONDS);
+    }
+  }
+
   private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
       throws NoSuchMethodException {
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index fb2ee1d..d26f04f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -17,8 +17,119 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static org.junit.Assert.*;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
 
 public class TimeAwareInterceptorTest {
+  private static final int runningCounts = 1000;
+
+  private final String localTxId = uniquify("localTxId");
+  private final String signature = uniquify("signature");
+
+  private final AtomicInteger postInterceptInvoked = new AtomicInteger();
+  private final AtomicInteger onErrorInvoked = new AtomicInteger();
+  private final AtomicInteger onTimeoutInvoked = new AtomicInteger();
+
+  private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
+    @Override
+    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    }
+
+    @Override
+    public void postIntercept(String parentTxId, String compensationMethod) {
+      postInterceptInvoked.incrementAndGet();
+    }
+
+    @Override
+    public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+      if (throwable instanceof OmegaTxTimeoutException) {
+        onTimeoutInvoked.incrementAndGet();
+      } else {
+        onErrorInvoked.incrementAndGet();
+      }
+    }
+  };
+
+  private final ExecutorService executorService = Executors.newFixedThreadPool(2);
+  private final RuntimeException timeoutException = new OmegaTxTimeoutException("timed out");
+
+
+  @Test(timeout = 5000)
+  public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws Exception {
+    List<Future<?>> futures = new LinkedList<>();
+
+    for (int i = 0; i < runningCounts; i++) {
+      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
+      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.postIntercept(localTxId, signature);
+      }));
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onTimeout(localTxId, signature, timeoutException);
+      }));
+    }
+
+    waitTillAllDone(futures);
+
+    assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+  }
+
+  @Test(timeout = 5000)
+  public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception {
+    RuntimeException oops = new RuntimeException("oops");
+    List<Future<?>> futures = new LinkedList<>();
+
+    for (int i = 0; i < runningCounts; i++) {
+      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
+      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onError(localTxId, signature, oops);
+      }));
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onTimeout(localTxId, signature, timeoutException);
+      }));
+    }
+
+    waitTillAllDone(futures);
+
+    assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+  }
+
+  private void waitForSignal(CyclicBarrier cyclicBarrier) {
+    try {
+      cyclicBarrier.await();
+    } catch (InterruptedException | BrokenBarrierException e) {
+      fail(e.getMessage());
+    }
+  }
 
+  private void waitTillAllDone(List<Future<?>> futures)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.