You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/05 10:38:16 UTC
[incubator-servicecomb-saga] 01/02: SCB-183 omega callback cleanup on client down
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-183_callback_cleanup
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 99964a83301f24ebb782a23898ac42392cb3063f
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 5 17:55:59 2018 +0800
SCB-183 omega callback cleanup on client down
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/core/CompositeOmegaCallback.java | 13 +++-
...allback.java => SelfCleaningOmegaCallback.java} | 36 +++++++----
.../alpha/core/CompositeOmegaCallbackTest.java | 55 +++++++++++++++-
.../alpha/core/SelfCleaningOmegaCallbackTest.java | 74 ++++++++++++++++++++++
.../saga/alpha/server/AlphaIntegrationTest.java | 31 +++++++++
5 files changed, 190 insertions(+), 19 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
index 86f5839..32e5102 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -17,6 +17,8 @@
package org.apache.servicecomb.saga.alpha.core;
+import static java.util.Collections.emptyMap;
+
import java.util.Map;
public class CompositeOmegaCallback implements OmegaCallback {
@@ -28,7 +30,7 @@ public class CompositeOmegaCallback implements OmegaCallback {
@Override
public void compensate(TxEvent event) {
- Map<String, OmegaCallback> serviceCallbacks = callbacks.get(event.serviceName());
+ Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
if (serviceCallbacks.isEmpty()) {
throw new AlphaException("No such omega callback found for service " + event.serviceName());
@@ -36,9 +38,14 @@ public class CompositeOmegaCallback implements OmegaCallback {
OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
if (omegaCallback == null) {
- serviceCallbacks.values().iterator().next().compensate(event);
- } else {
+ omegaCallback = serviceCallbacks.values().iterator().next();
+ }
+
+ try {
omegaCallback.compensate(event);
+ } catch (Exception e) {
+ serviceCallbacks.values().remove(omegaCallback);
+ throw e;
}
}
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallback.java
similarity index 52%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallback.java
index 86f5839..d7b984d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallback.java
@@ -17,28 +17,38 @@
package org.apache.servicecomb.saga.alpha.core;
+import java.lang.invoke.MethodHandles;
import java.util.Map;
-public class CompositeOmegaCallback implements OmegaCallback {
- private final Map<String, Map<String, OmegaCallback>> callbacks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- public CompositeOmegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
+public class SelfCleaningOmegaCallback implements OmegaCallback {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final String instanceId;
+ private final OmegaCallback underlying;
+ private final Map<String, OmegaCallback> callbacks;
+
+ SelfCleaningOmegaCallback(String instanceId, OmegaCallback underlying, Map<String, OmegaCallback> callbacks) {
+ this.instanceId = instanceId;
+ this.underlying = underlying;
this.callbacks = callbacks;
}
@Override
public void compensate(TxEvent event) {
- Map<String, OmegaCallback> serviceCallbacks = callbacks.get(event.serviceName());
-
- if (serviceCallbacks.isEmpty()) {
- throw new AlphaException("No such omega callback found for service " + event.serviceName());
+ try {
+ underlying.compensate(event);
+ } catch (Exception e) {
+ callbacks.remove(instanceId);
+ log.error("Removed omega callback with instance id [{}] due to connection disruption", instanceId, e);
+ throw e;
}
+ }
- OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
- if (omegaCallback == null) {
- serviceCallbacks.values().iterator().next().compensate(event);
- } else {
- omegaCallback.compensate(event);
- }
+ @Override
+ public void disconnect() {
+ underlying.disconnect();
}
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
index 0c24549..fd38061 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -19,8 +19,12 @@ package org.apache.servicecomb.saga.alpha.core;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -65,7 +69,7 @@ public class CompositeOmegaCallbackTest {
@Test
public void compensateCorrespondingOmegaInstanceOnly() throws Exception {
- TxEvent event = eventOf(serviceName2, instanceId2One, EventType.TxStartedEvent);
+ TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
compositeOmegaCallback.compensate(event);
@@ -73,12 +77,15 @@ public class CompositeOmegaCallbackTest {
verify(callback1Two, never()).compensate(event);
verify(callback2One).compensate(event);
verify(callback2Two, never()).compensate(event);
+
+ assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One, callback1Two));
+ assertThat(callbacks.get(serviceName2).values(), containsInAnyOrder(callback2One, callback2Two));
}
@Test
public void compensateOtherOmegaInstance_IfTheRequestedIsUnreachable() throws Exception {
callbacks.get(serviceName2).remove(instanceId2One);
- TxEvent event = eventOf(serviceName2, instanceId2One, EventType.TxStartedEvent);
+ TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
compositeOmegaCallback.compensate(event);
@@ -86,12 +93,15 @@ public class CompositeOmegaCallbackTest {
verify(callback1Two, never()).compensate(event);
verify(callback2One, never()).compensate(event);
verify(callback2Two).compensate(event);
+
+ assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One, callback1Two));
+ assertThat(callbacks.get(serviceName2).values(), containsInAnyOrder(callback2Two));
}
@Test
public void blowsUpIfNoSuchServiceIsReachable() throws Exception {
callbacks.get(serviceName2).clear();
- TxEvent event = eventOf(serviceName2, instanceId2One, EventType.TxStartedEvent);
+ TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
try {
compositeOmegaCallback.compensate(event);
@@ -104,6 +114,45 @@ public class CompositeOmegaCallbackTest {
verify(callback1Two, never()).compensate(event);
verify(callback2One, never()).compensate(event);
verify(callback2Two, never()).compensate(event);
+
+ assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One, callback1Two));
+ assertThat(callbacks.get(serviceName2).isEmpty(), is(true));
+ }
+
+ @Test
+ public void blowsUpIfNoSuchServiceFound() throws Exception {
+ callbacks.remove(serviceName2);
+ TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+ try {
+ compositeOmegaCallback.compensate(event);
+ expectFailing(AlphaException.class);
+ } catch (AlphaException e) {
+ assertThat(e.getMessage(), is("No such omega callback found for service " + serviceName2));
+ }
+
+ verify(callback1One, never()).compensate(event);
+ verify(callback1Two, never()).compensate(event);
+ verify(callback2One, never()).compensate(event);
+ verify(callback2Two, never()).compensate(event);
+
+ assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One, callback1Two));
+ assertThat(callbacks.containsKey(serviceName2), is(false));
+ }
+
+ @Test
+ public void removeCallbackOnException() throws Exception {
+ doThrow(RuntimeException.class).when(callback1Two).compensate(any(TxEvent.class));
+ TxEvent event = eventOf(serviceName1, instanceId1Two, TxStartedEvent);
+
+ try {
+ compositeOmegaCallback.compensate(event);
+ expectFailing(RuntimeException.class);
+ } catch (RuntimeException ignored) {
+ }
+
+ assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One));
+ assertThat(callbacks.get(serviceName2).values(), containsInAnyOrder(callback2One, callback2Two));
}
private TxEvent eventOf(String serviceName, String instanceId, EventType eventType) {
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallbackTest.java
new file mode 100644
index 0000000..d6253a8
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallbackTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class SelfCleaningOmegaCallbackTest {
+ private final TxEvent event = Mockito.mock(TxEvent.class);
+ private final String someId = uniquify("someId");
+
+ private final Map<String, OmegaCallback> callbacks = new HashMap<>();
+ private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
+ private final SelfCleaningOmegaCallback callback = new SelfCleaningOmegaCallback(someId, underlying, callbacks);
+
+ @Before
+ public void setUp() throws Exception {
+ callbacks.put(someId, callback);
+ }
+
+ @Test
+ public void keepItselfInCallbacksWhenNormal() throws Exception {
+ callback.compensate(event);
+
+ assertThat(callbacks.get(someId), is(callback));
+ }
+
+ @Test
+ public void removeItselfFromCallbacksOnException() throws Exception {
+ doThrow(RuntimeException.class).when(underlying).compensate(any(TxEvent.class));
+
+ try {
+ callback.compensate(event);
+ expectFailing(RuntimeException.class);
+ } catch (RuntimeException ignored) {
+ }
+
+ assertThat(callbacks.isEmpty(), is(true));
+ }
+
+ @Test
+ public void disconnectWithUnderlying() throws Exception {
+ callback.disconnect();
+
+ verify(underlying).disconnect();
+ }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 2f3a3ec..21e46cd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -35,6 +36,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.servicecomb.saga.alpha.core.EventType;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
@@ -88,6 +91,9 @@ public class AlphaIntegrationTest {
@Autowired
private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+ @Autowired
+ private TxConsistentService consistentService;
+
private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver();
@@ -169,6 +175,18 @@ public class AlphaIntegrationTest {
}
@Test
+ public void removeCallbackOnClientDown() throws Exception {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+
+ omegaCallbacks.get(serviceName).get(instanceId).disconnect();
+
+ consistentService.handle(someTxAbortEvent(serviceName, instanceId));
+
+ await().atMost(1, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
+ }
+
+ @Test
public void doNotCompensateDuplicateTxOnFailure() {
// duplicate events with same content but different timestamp
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
@@ -238,6 +256,19 @@ public class AlphaIntegrationTest {
.build();
}
+ private TxEvent someTxAbortEvent(String serviceName, String instanceId) {
+ return new TxEvent(
+ serviceName,
+ instanceId,
+ new Date(),
+ globalTxId,
+ localTxId,
+ parentTxId,
+ TxAbortedEvent.name(),
+ compensationMethod,
+ payload.getBytes());
+ }
+
private GrpcTxEvent someGrpcEvent(EventType type) {
return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.