You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:49:16 UTC
[incubator-servicecomb-saga] 06/09: SCB-212 replaced timeout impl with atomic to avoid locking
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 32a226b51e2f5be6ec3d9a09448c48b1b7ce3fe9
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 22:21:58 2018 +0800
SCB-212 replaced timeout impl with atomic to avoid locking
Signed-off-by: seanyinx <se...@huawei.com>
---
.../transaction/OmegaTxTimeoutException.java} | 10 +-
.../omega/transaction/TimeAwareInterceptor.java | 25 ++---
.../saga/omega/transaction/TransactionAspect.java | 23 ++++-
.../transaction/TimeAwareInterceptorTest.java | 113 ++++++++++++++++++++-
4 files changed, 150 insertions(+), 21 deletions(-)
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
similarity index 86%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
index fb2ee1d..eb820d6 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
@@ -17,8 +17,8 @@
package org.apache.servicecomb.saga.omega.transaction;
-import static org.junit.Assert.*;
-
-public class TimeAwareInterceptorTest {
-
-}
\ No newline at end of file
+public class OmegaTxTimeoutException extends RuntimeException {
+ public OmegaTxTimeoutException(String cause) {
+ super(cause);
+ }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index d633630..317f802 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -17,17 +17,15 @@
package org.apache.servicecomb.saga.omega.transaction;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
class TimeAwareInterceptor implements EventAwareInterceptor {
private final EventAwareInterceptor interceptor;
- private final BlockingDeque<EventAwareInterceptor> interceptors = new LinkedBlockingDeque<>(2);
+ private final AtomicReference<EventAwareInterceptor> interceptorRef;
TimeAwareInterceptor(EventAwareInterceptor interceptor) {
this.interceptor = interceptor;
- this.interceptors.offer(interceptor);
+ this.interceptorRef = new AtomicReference<>(interceptor);
}
@Override
@@ -37,18 +35,21 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
@Override
public void postIntercept(String localTxId, String signature) {
- interceptors.offerLast(NO_OP_INTERCEPTOR);
- interceptors.pollFirst().postIntercept(localTxId, signature);
+ if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+ interceptor.postIntercept(localTxId, signature);
+ }
}
@Override
public void onError(String localTxId, String signature, Throwable throwable) {
- interceptors.offerLast(NO_OP_INTERCEPTOR);
- interceptors.pollFirst().onError(localTxId, signature, throwable);
+ if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+ interceptor.onError(localTxId, signature, throwable);
+ }
}
- void onTimeout(String signature, String localTxId) {
- interceptors.offerFirst(NO_OP_INTERCEPTOR);
- interceptors.pollLast().onError(localTxId, signature, new TimeoutException());
+ void onTimeout(String localTxId, String signature, Throwable throwable) {
+ if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+ interceptor.onError(localTxId, signature, throwable);
+ }
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 9bee829..d3e091c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -59,9 +59,8 @@ public class TransactionAspect {
TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
- if (compensable.timeout() > 0) {
- executor.schedule(() -> interceptor.onTimeout(signature, localTxId), compensable.timeout(), MILLISECONDS);
- }
+
+ scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
try {
Object result = joinPoint.proceed();
@@ -77,6 +76,24 @@ public class TransactionAspect {
}
}
+ private void scheduleTimeoutTask(
+ TimeAwareInterceptor interceptor,
+ String localTxId,
+ String signature,
+ Method method,
+ int timeout) {
+
+ if (timeout > 0) {
+ executor.schedule(
+ () -> interceptor.onTimeout(
+ localTxId,
+ signature,
+ new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")),
+ timeout,
+ MILLISECONDS);
+ }
+ }
+
private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
throws NoSuchMethodException {
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index fb2ee1d..d26f04f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -17,8 +17,119 @@
package org.apache.servicecomb.saga.omega.transaction;
-import static org.junit.Assert.*;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
public class TimeAwareInterceptorTest {
+ private static final int runningCounts = 1000;
+
+ private final String localTxId = uniquify("localTxId");
+ private final String signature = uniquify("signature");
+
+ private final AtomicInteger postInterceptInvoked = new AtomicInteger();
+ private final AtomicInteger onErrorInvoked = new AtomicInteger();
+ private final AtomicInteger onTimeoutInvoked = new AtomicInteger();
+
+ private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
+ @Override
+ public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ }
+
+ @Override
+ public void postIntercept(String parentTxId, String compensationMethod) {
+ postInterceptInvoked.incrementAndGet();
+ }
+
+ @Override
+ public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+ if (throwable instanceof OmegaTxTimeoutException) {
+ onTimeoutInvoked.incrementAndGet();
+ } else {
+ onErrorInvoked.incrementAndGet();
+ }
+ }
+ };
+
+ private final ExecutorService executorService = Executors.newFixedThreadPool(2);
+ private final RuntimeException timeoutException = new OmegaTxTimeoutException("timed out");
+
+
+ @Test(timeout = 5000)
+ public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws Exception {
+ List<Future<?>> futures = new LinkedList<>();
+
+ for (int i = 0; i < runningCounts; i++) {
+ TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
+ CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+
+
+ futures.add(executorService.submit(() -> {
+ waitForSignal(cyclicBarrier);
+ interceptor.postIntercept(localTxId, signature);
+ }));
+
+ futures.add(executorService.submit(() -> {
+ waitForSignal(cyclicBarrier);
+ interceptor.onTimeout(localTxId, signature, timeoutException);
+ }));
+ }
+
+ waitTillAllDone(futures);
+
+ assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+ }
+
+ @Test(timeout = 5000)
+ public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception {
+ RuntimeException oops = new RuntimeException("oops");
+ List<Future<?>> futures = new LinkedList<>();
+
+ for (int i = 0; i < runningCounts; i++) {
+ TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
+ CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+
+
+ futures.add(executorService.submit(() -> {
+ waitForSignal(cyclicBarrier);
+ interceptor.onError(localTxId, signature, oops);
+ }));
+
+ futures.add(executorService.submit(() -> {
+ waitForSignal(cyclicBarrier);
+ interceptor.onTimeout(localTxId, signature, timeoutException);
+ }));
+ }
+
+ waitTillAllDone(futures);
+
+ assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+ }
+
+ private void waitForSignal(CyclicBarrier cyclicBarrier) {
+ try {
+ cyclicBarrier.await();
+ } catch (InterruptedException | BrokenBarrierException e) {
+ fail(e.getMessage());
+ }
+ }
+ private void waitTillAllDone(List<Future<?>> futures)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ }
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.