You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/02/08 08:49:57 UTC
[incubator-servicecomb-saga] 02/06: SCB-239 remove timeout from
omega
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 46096fe2f0f0d3d03fb256ed2fd5f221c0b9e851
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 23 16:09:18 2018 +0800
SCB-239 remove timeout from omega
Signed-off-by: Eric Lee <da...@huawei.com>
---
...eInterceptor.java => OnceAwareInterceptor.java} | 4 +-
.../saga/omega/transaction/SagaStartAspect.java | 25 +--------
.../saga/omega/transaction/TransactionAspect.java | 29 +----------
...ptorTest.java => OnceAwareInterceptorTest.java} | 59 ++++------------------
.../omega/transaction/SagaStartAspectTest.java | 44 ----------------
.../omega/transaction/TransactionAspectTest.java | 48 ------------------
6 files changed, 17 insertions(+), 192 deletions(-)
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
similarity index 94%
rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
rename to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
index 9de26d2..2ab7d12 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
@@ -19,12 +19,12 @@ package org.apache.servicecomb.saga.omega.transaction;
import java.util.concurrent.atomic.AtomicReference;
-class TimeAwareInterceptor implements EventAwareInterceptor {
+class OnceAwareInterceptor implements EventAwareInterceptor {
private final EventAwareInterceptor interceptor;
private final AtomicReference<EventAwareInterceptor> interceptorRef;
private Throwable throwable = null;
- TimeAwareInterceptor(EventAwareInterceptor interceptor) {
+ OnceAwareInterceptor(EventAwareInterceptor interceptor) {
this.interceptor = interceptor;
this.interceptorRef = new AtomicReference<>(interceptor);
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index db8e3a0..7328fef 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -17,12 +17,8 @@
package org.apache.servicecomb.saga.omega.transaction;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
@@ -38,7 +34,7 @@ public class SagaStartAspect {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
private final OmegaContext context;
public SagaStartAspect(MessageSender sender, OmegaContext context) {
@@ -51,11 +47,10 @@ public class SagaStartAspect {
initializeOmegaContext();
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
- TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor);
+ OnceAwareInterceptor interceptor = new OnceAwareInterceptor(sagaStartAnnotationProcessor);
interceptor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout());
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
- scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
try {
Object result = joinPoint.proceed();
@@ -74,20 +69,4 @@ public class SagaStartAspect {
private void initializeOmegaContext() {
context.setLocalTxId(context.newGlobalTxId());
}
-
- private void scheduleTimeoutTask(
- TimeAwareInterceptor interceptor,
- Method method,
- int timeout) {
-
- if (timeout > 0) {
- executor.schedule(
- () -> interceptor.onTimeout(
- context.globalTxId(),
- method.toString(),
- new OmegaTxTimeoutException("Saga " + method.toString() + " timed out")),
- timeout,
- MILLISECONDS);
- }
- }
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 718d1fd..090fe2e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -17,12 +17,8 @@
package org.apache.servicecomb.saga.omega.transaction;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import javax.transaction.InvalidTransactionException;
@@ -40,7 +36,7 @@ public class TransactionAspect {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OmegaContext context;
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
private final CompensableInterceptor interceptor;
public TransactionAspect(MessageSender sender, OmegaContext context) {
@@ -58,7 +54,7 @@ public class TransactionAspect {
String localTxId = context.localTxId();
context.newLocalTxId();
- TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
+ OnceAwareInterceptor interceptor = new OnceAwareInterceptor(this.interceptor);
AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs());
if (response.aborted()) {
String abortedLocalTxId = context.localTxId();
@@ -68,9 +64,6 @@ public class TransactionAspect {
}
LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
- // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha
- scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
-
try {
Object result = joinPoint.proceed();
interceptor.postIntercept(localTxId, signature);
@@ -85,24 +78,6 @@ public class TransactionAspect {
}
}
- private void scheduleTimeoutTask(
- TimeAwareInterceptor interceptor,
- String localTxId,
- String signature,
- Method method,
- int timeout) {
-
- if (timeout > 0) {
- executor.schedule(
- () -> interceptor.onTimeout(
- localTxId,
- signature,
- new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")),
- timeout,
- MILLISECONDS);
- }
- }
-
private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
throws NoSuchMethodException {
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
similarity index 62%
rename from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
rename to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
index 0f2d2eb..0a87491 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
@@ -20,12 +20,9 @@ package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -34,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-public class TimeAwareInterceptorTest {
+public class OnceAwareInterceptorTest {
private static final int runningCounts = 1000;
private final String localTxId = uniquify("localTxId");
@@ -66,70 +63,36 @@ public class TimeAwareInterceptorTest {
};
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
- private final RuntimeException timeoutException = new OmegaTxTimeoutException("timed out");
-
- @Test(timeout = 5000)
- public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws Exception {
+ @Test
+ public void invokePostIntercept() throws Exception {
List<Future<?>> futures = new LinkedList<>();
for (int i = 0; i < runningCounts; i++) {
- TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
- CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
- ExpectedException exception = ExpectedException.none();
-
- futures.add(executorService.submit(() -> {
- try {
- waitForSignal(cyclicBarrier);
- interceptor.postIntercept(localTxId, signature);
- } catch (Throwable throwable) {
- exception.expect(OmegaTxTimeoutException.class);
- }
- }));
-
- futures.add(executorService.submit(() -> {
- waitForSignal(cyclicBarrier);
- interceptor.onTimeout(localTxId, signature, timeoutException);
- }));
+ OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying);
+
+ futures.add(executorService.submit(() -> interceptor.postIntercept(localTxId, signature)));
}
waitTillAllDone(futures);
- assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+ assertThat(postInterceptInvoked.get(), is(runningCounts));
}
- @Test(timeout = 5000)
+ @Test
public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception {
RuntimeException oops = new RuntimeException("oops");
List<Future<?>> futures = new LinkedList<>();
for (int i = 0; i < runningCounts; i++) {
- TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
- CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
-
-
- futures.add(executorService.submit(() -> {
- waitForSignal(cyclicBarrier);
- interceptor.onError(localTxId, signature, oops);
- }));
+ OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying);
- futures.add(executorService.submit(() -> {
- waitForSignal(cyclicBarrier);
- interceptor.onTimeout(localTxId, signature, timeoutException);
- }));
+ futures.add(executorService.submit(() -> interceptor.onError(localTxId, signature, oops)));
}
waitTillAllDone(futures);
- assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
- }
-
- private void waitForSignal(CyclicBarrier cyclicBarrier) {
- try {
- cyclicBarrier.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- fail(e.getMessage());
- }
+ assertThat(onErrorInvoked.get(), is(runningCounts));
}
private void waitTillAllDone(List<Future<?>> futures)
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 1bc2b28..77d40ef 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -18,20 +18,14 @@
package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.omega.context.IdGenerator;
@@ -62,8 +56,6 @@ public class SagaStartAspectTest {
private final OmegaContext omegaContext = new OmegaContext(idGenerator);
private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
@Before
public void setUp() throws Exception {
when(idGenerator.nextId()).thenReturn(globalTxId);
@@ -120,42 +112,6 @@ public class SagaStartAspectTest {
assertThat(omegaContext.localTxId(), is(nullValue()));
}
- @Test
- public void sendsAbortEventOnTimeout() throws Throwable {
- CountDownLatch latch = new CountDownLatch(1);
- when(sagaStart.timeout()).thenReturn(100);
- when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
- latch.await();
- assertThat(omegaContext.localTxId(), is(globalTxId));
- return null;
- });
-
- ExpectedException exception = ExpectedException.none();
- executor.execute(() -> {
- try {
- aspect.advise(joinPoint, sagaStart);
- } catch (Throwable throwable) {
- exception.expect(OmegaTxTimeoutException.class);
- }
- });
-
- await().atMost(1, SECONDS).until(() -> messages.size() == 2);
-
- TxEvent event = messages.get(1);
-
- assertThat(event.globalTxId(), is(globalTxId));
- assertThat(event.localTxId(), is(globalTxId));
- assertThat(event.parentTxId(), is(nullValue()));
- assertThat(event.type(), is(EventType.TxAbortedEvent));
-
- latch.countDown();
-
- await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null);
-
- // no redundant ended message received
- assertThat(messages.size(), is(2));
- }
-
private String doNothing() {
return "doNothing";
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 8689a1e..31d148f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -18,11 +18,8 @@
package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.awaitility.Awaitility.await;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -32,9 +29,6 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import javax.transaction.InvalidTransactionException;
@@ -69,8 +63,6 @@ public class TransactionAspectTest {
private final OmegaContext omegaContext = new OmegaContext(idGenerator);
private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
@Before
public void setUp() throws Exception {
when(idGenerator.nextId()).thenReturn(newLocalTxId);
@@ -131,46 +123,6 @@ public class TransactionAspectTest {
}
@Test
- public void sendsAbortEventOnTimeout() throws Throwable {
- CountDownLatch latch = new CountDownLatch(1);
- when(compensable.timeout()).thenReturn(100);
- when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
- latch.await();
- assertThat(omegaContext.localTxId(), is(newLocalTxId));
- return null;
- });
-
- ExpectedException exception = ExpectedException.none();
- executor.execute(() -> {
- try {
- // need to setup the thread local for it
- omegaContext.setGlobalTxId(globalTxId);
- omegaContext.setLocalTxId(localTxId);
-
- aspect.advise(joinPoint, compensable);
- } catch (Throwable throwable) {
- exception.expect(OmegaTxTimeoutException.class);
- }
- });
-
- await().atMost(1, SECONDS).until(() -> messages.size() == 2);
-
- TxEvent event = messages.get(1);
-
- assertThat(event.globalTxId(), is(globalTxId));
- assertThat(event.localTxId(), is(newLocalTxId));
- assertThat(event.parentTxId(), is(localTxId));
- assertThat(event.type(), is(EventType.TxAbortedEvent));
-
- latch.countDown();
-
- await().atMost(1, SECONDS).until(() -> localTxId.equals(omegaContext.localTxId()));
-
- // no redundant ended message received
- assertThat(messages.size(), is(2));
- }
-
- @Test
public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
MessageSender sender = mock(MessageSender.class);
when(sender.send(any())).thenReturn(new AlphaResponse(true));
--
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.