You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/02 10:50:31 UTC
[incubator-servicecomb-saga] 06/06: SCB-149 pushed failed
compensations to a scheduled task queue
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 49235b50798956ffdd1a65feb503cf9c1bfb7baa
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 2 16:29:43 2018 +0800
SCB-149 pushed failed compensations to a scheduled task queue
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/core/PendingTaskRunner.java | 45 ++++++++++++
...egaCallback.java => PushBackOmegaCallback.java} | 38 ++++-------
.../saga/alpha/core/TxConsistentService.java | 1 -
.../saga/alpha/core/PendingTaskRunnerTest.java | 60 ++++++++++++++++
.../saga/alpha/core/PushBackOmegaCallbackTest.java | 65 ++++++++++++++++++
.../saga/alpha/core/RetryOmegaCallbackTest.java | 79 ----------------------
.../servicecomb/saga/alpha/core/TxEventMaker.java | 36 ++++++++++
.../servicecomb/saga/alpha/server/AlphaConfig.java | 23 +++++--
8 files changed, 235 insertions(+), 112 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java
new file mode 100644
index 0000000..c9a06fa
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class PendingTaskRunner {
+ private final BlockingQueue<Runnable> pendingTasks;
+ private final int delay;
+
+ public PendingTaskRunner(BlockingQueue<Runnable> pendingTasks, int delay) {
+ this.pendingTasks = pendingTasks;
+ this.delay = delay;
+ }
+
+ public Future<?> run() {
+ return Executors.newSingleThreadScheduledExecutor()
+ .scheduleWithFixedDelay(() -> {
+ try {
+ pendingTasks.take().run();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }, 0, delay, MILLISECONDS);
+ }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
similarity index 58%
rename from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
rename to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 6f1a7dd..8403af0 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -18,49 +18,35 @@
package io.servicecomb.saga.alpha.core;
import java.lang.invoke.MethodHandles;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RetryOmegaCallback implements OmegaCallback {
+public class PushBackOmegaCallback implements OmegaCallback {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final String ERROR_MESSAGE = "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]";
+ private final BlockingQueue<Runnable> pendingCompensations;
private final OmegaCallback underlying;
- private final int delay;
- public RetryOmegaCallback(OmegaCallback underlying, int delay) {
+ public PushBackOmegaCallback(BlockingQueue<Runnable> pendingCompensations, OmegaCallback underlying) {
+ this.pendingCompensations = pendingCompensations;
this.underlying = underlying;
- this.delay = delay;
}
@Override
public void compensate(TxEvent event) {
- boolean success = false;
- do {
- try {
- underlying.compensate(event);
- success = true;
- } catch (Exception e) {
- logError(ERROR_MESSAGE, event, e);
- sleep(event);
- }
- } while (!success && !Thread.currentThread().isInterrupted());
- }
-
- private void sleep(TxEvent event) {
try {
- TimeUnit.MILLISECONDS.sleep(delay);
- } catch (InterruptedException e) {
- logError(ERROR_MESSAGE + " due to interruption", event, e);
-
- Thread.currentThread().interrupt();
+ underlying.compensate(event);
+ } catch (Exception e) {
+ logError(event, e);
+ pendingCompensations.offer(() -> compensate(event));
}
}
- private void logError(String message, TxEvent event, Exception e) {
- log.error(message,
+ private void logError(TxEvent event, Exception e) {
+ log.error(
+ "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
event.serviceName(),
event.instanceId(),
event.compensationMethod(),
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index 5dcb7bc..6fd9193 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -46,7 +46,6 @@ public class TxConsistentService {
CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
}
- // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
private void compensate(TxEvent event) {
List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
events.forEach(omegaCallback::compensate);
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java
new file mode 100644
index 0000000..d806eec
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.Test;
+
+public class PendingTaskRunnerTest {
+ private final List<String> messages = new ArrayList<>();
+ private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
+ private final PendingTaskRunner taskRunner = new PendingTaskRunner(runnables, 10);
+
+ @Test
+ public void burnsAllTasksInQueue() throws Exception {
+ runnables.offer(() -> messages.add("hello"));
+ runnables.offer(() -> messages.add("world"));
+
+ taskRunner.run();
+
+ await().atMost(500, MILLISECONDS).until(runnables::isEmpty);
+
+ assertThat(messages, contains("hello", "world"));
+ }
+
+ @Test
+ public void exitOnInterruption() throws Exception {
+ taskRunner.run().cancel(true);
+
+ runnables.offer(() -> messages.add("hello"));
+ Thread.sleep(300);
+
+ assertThat(runnables.isEmpty(), is(false));
+ assertThat(messages.isEmpty(), is(true));
+ }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
new file mode 100644
index 0000000..f53624c
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static io.servicecomb.saga.alpha.core.TxEventMaker.someEvent;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PushBackOmegaCallbackTest {
+ private static final Runnable NO_OP_RUNNABLE = () -> {
+ };
+
+ private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
+ private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
+ private final PushBackOmegaCallback pushBack = new PushBackOmegaCallback(runnables, underlying);
+
+ @Before
+ public void setUp() throws Exception {
+ runnables.offer(NO_OP_RUNNABLE);
+ }
+
+ @Test
+ public void pushFailedCallbackToEndOfQueue() throws Exception {
+ TxEvent event = someEvent();
+ doThrow(AlphaException.class).doThrow(AlphaException.class).doNothing().when(underlying).compensate(event);
+
+ pushBack.compensate(event);
+
+ assertThat(runnables.size(), is(2));
+ assertThat(runnables.poll(), is(NO_OP_RUNNABLE));
+
+ // failed again and pushed back itself to queue
+ runnables.poll().run();
+ assertThat(runnables.size(), is(1));
+
+ runnables.poll().run();
+
+ verify(underlying, times(3)).compensate(event);
+ }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
deleted file mode 100644
index 27cc16f..0000000
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.servicecomb.saga.alpha.core;
-
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static org.mockito.Mockito.atMost;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.Date;
-import java.util.UUID;
-
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class RetryOmegaCallbackTest {
- private final int delay = 100;
- private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
- private final RetryOmegaCallback callback = new RetryOmegaCallback(underlying, delay);
-
- @Test
- public void retryOnFailure() throws Exception {
- TxEvent event = someEvent();
-
- doThrow(AlphaException.class)
- .doThrow(AlphaException.class)
- .doNothing()
- .when(underlying)
- .compensate(event);
-
- callback.compensate(event);
-
- verify(underlying, times(3)).compensate(event);
- }
-
- @Test
- public void exitOnInterruption() throws Exception {
- TxEvent event = someEvent();
-
- doThrow(AlphaException.class).when(underlying).compensate(event);
-
- Thread thread = new Thread(() -> callback.compensate(event));
- thread.start();
-
- Thread.sleep(300);
- thread.interrupt();
-
- verify(underlying, atMost(4)).compensate(event);
- }
-
- private TxEvent someEvent() {
- return new TxEvent(
- uniquify("serviceName"),
- uniquify("instanceId"),
- new Date(),
- uniquify("globalTxId"),
- uniquify("localTxId"),
- UUID.randomUUID().toString(),
- EventType.TxStartedEvent.name(),
- getClass().getCanonicalName(),
- uniquify("blah").getBytes());
- }
-}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java
new file mode 100644
index 0000000..77ef920
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+
+import java.util.Date;
+import java.util.UUID;
+
+class TxEventMaker {
+ static TxEvent someEvent() {
+ return new TxEvent(
+ uniquify("serviceName"),
+ uniquify("instanceId"),
+ new Date(),
+ uniquify("globalTxId"),
+ uniquify("localTxId"),
+ UUID.randomUUID().toString(),
+ EventType.TxStartedEvent.name(),
+ TxEventMaker.class.getCanonicalName(),
+ uniquify("blah").getBytes());
+ }
+}
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index f44c951..3685894 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -18,7 +18,11 @@
package io.servicecomb.saga.alpha.server;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@@ -26,12 +30,17 @@ import org.springframework.context.annotation.Configuration;
import io.servicecomb.saga.alpha.core.CompositeOmegaCallback;
import io.servicecomb.saga.alpha.core.OmegaCallback;
-import io.servicecomb.saga.alpha.core.RetryOmegaCallback;
+import io.servicecomb.saga.alpha.core.PendingTaskRunner;
+import io.servicecomb.saga.alpha.core.PushBackOmegaCallback;
import io.servicecomb.saga.alpha.core.TxConsistentService;
import io.servicecomb.saga.alpha.core.TxEventRepository;
@Configuration
class AlphaConfig {
+ private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
+
+ @Value("${alpha.compensation.retry.delay:3000}")
+ private int delay;
// TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138
@Bean
@@ -40,11 +49,8 @@ class AlphaConfig {
}
@Bean
- OmegaCallback omegaCallback(
- Map<String, Map<String, OmegaCallback>> callbacks,
- @Value("${alpha.compensation.retry.delay:3000}") int delay) {
-
- return new RetryOmegaCallback(new CompositeOmegaCallback(callbacks), delay);
+ OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
+ return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks));
}
@Bean
@@ -68,4 +74,9 @@ class AlphaConfig {
eventRepository,
omegaCallback)));
}
+
+ @PostConstruct
+ void init() {
+ new PendingTaskRunner(pendingCompensations, delay).run();
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.