You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/05 10:38:16 UTC

[incubator-servicecomb-saga] 01/02: SCB-183 omega callback cleanup on client down

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-183_callback_cleanup
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 99964a83301f24ebb782a23898ac42392cb3063f
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 5 17:55:59 2018 +0800

    SCB-183 omega callback cleanup on client down
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/CompositeOmegaCallback.java    | 13 +++-
 ...allback.java => SelfCleaningOmegaCallback.java} | 36 +++++++----
 .../alpha/core/CompositeOmegaCallbackTest.java     | 55 +++++++++++++++-
 .../alpha/core/SelfCleaningOmegaCallbackTest.java  | 74 ++++++++++++++++++++++
 .../saga/alpha/server/AlphaIntegrationTest.java    | 31 +++++++++
 5 files changed, 190 insertions(+), 19 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
index 86f5839..32e5102 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.Collections.emptyMap;
+
 import java.util.Map;
 
 public class CompositeOmegaCallback implements OmegaCallback {
@@ -28,7 +30,7 @@ public class CompositeOmegaCallback implements OmegaCallback {
 
   @Override
   public void compensate(TxEvent event) {
-    Map<String, OmegaCallback> serviceCallbacks = callbacks.get(event.serviceName());
+    Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
 
     if (serviceCallbacks.isEmpty()) {
       throw new AlphaException("No such omega callback found for service " + event.serviceName());
@@ -36,9 +38,14 @@ public class CompositeOmegaCallback implements OmegaCallback {
 
     OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
     if (omegaCallback == null) {
-      serviceCallbacks.values().iterator().next().compensate(event);
-    } else {
+      omegaCallback = serviceCallbacks.values().iterator().next();
+    }
+
+    try {
       omegaCallback.compensate(event);
+    } catch (Exception e) {
+      serviceCallbacks.values().remove(omegaCallback);
+      throw e;
     }
   }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallback.java
similarity index 52%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallback.java
index 86f5839..d7b984d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallback.java
@@ -17,28 +17,38 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
 
-public class CompositeOmegaCallback implements OmegaCallback {
-  private final Map<String, Map<String, OmegaCallback>> callbacks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-  public CompositeOmegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
+public class SelfCleaningOmegaCallback implements OmegaCallback {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final String instanceId;
+  private final OmegaCallback underlying;
+  private final Map<String, OmegaCallback> callbacks;
+
+  SelfCleaningOmegaCallback(String instanceId, OmegaCallback underlying, Map<String, OmegaCallback> callbacks) {
+    this.instanceId = instanceId;
+    this.underlying = underlying;
     this.callbacks = callbacks;
   }
 
   @Override
   public void compensate(TxEvent event) {
-    Map<String, OmegaCallback> serviceCallbacks = callbacks.get(event.serviceName());
-
-    if (serviceCallbacks.isEmpty()) {
-      throw new AlphaException("No such omega callback found for service " + event.serviceName());
+    try {
+      underlying.compensate(event);
+    } catch (Exception e) {
+      callbacks.remove(instanceId);
+      log.error("Removed omega callback with instance id [{}] due to connection disruption", instanceId, e);
+      throw e;
     }
+  }
 
-    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
-    if (omegaCallback == null) {
-      serviceCallbacks.values().iterator().next().compensate(event);
-    } else {
-      omegaCallback.compensate(event);
-    }
+  @Override
+  public void disconnect() {
+    underlying.disconnect();
   }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
index 0c24549..fd38061 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -19,8 +19,12 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
@@ -65,7 +69,7 @@ public class CompositeOmegaCallbackTest {
 
   @Test
   public void compensateCorrespondingOmegaInstanceOnly() throws Exception {
-    TxEvent event = eventOf(serviceName2, instanceId2One, EventType.TxStartedEvent);
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
 
     compositeOmegaCallback.compensate(event);
 
@@ -73,12 +77,15 @@ public class CompositeOmegaCallbackTest {
     verify(callback1Two, never()).compensate(event);
     verify(callback2One).compensate(event);
     verify(callback2Two, never()).compensate(event);
+
+    assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One, callback1Two));
+    assertThat(callbacks.get(serviceName2).values(), containsInAnyOrder(callback2One, callback2Two));
   }
 
   @Test
   public void compensateOtherOmegaInstance_IfTheRequestedIsUnreachable() throws Exception {
     callbacks.get(serviceName2).remove(instanceId2One);
-    TxEvent event = eventOf(serviceName2, instanceId2One, EventType.TxStartedEvent);
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
 
     compositeOmegaCallback.compensate(event);
 
@@ -86,12 +93,15 @@ public class CompositeOmegaCallbackTest {
     verify(callback1Two, never()).compensate(event);
     verify(callback2One, never()).compensate(event);
     verify(callback2Two).compensate(event);
+
+    assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One, callback1Two));
+    assertThat(callbacks.get(serviceName2).values(), containsInAnyOrder(callback2Two));
   }
 
   @Test
   public void blowsUpIfNoSuchServiceIsReachable() throws Exception {
     callbacks.get(serviceName2).clear();
-    TxEvent event = eventOf(serviceName2, instanceId2One, EventType.TxStartedEvent);
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
 
     try {
       compositeOmegaCallback.compensate(event);
@@ -104,6 +114,45 @@ public class CompositeOmegaCallbackTest {
     verify(callback1Two, never()).compensate(event);
     verify(callback2One, never()).compensate(event);
     verify(callback2Two, never()).compensate(event);
+
+    assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One, callback1Two));
+    assertThat(callbacks.get(serviceName2).isEmpty(), is(true));
+  }
+
+  @Test
+  public void blowsUpIfNoSuchServiceFound() throws Exception {
+    callbacks.remove(serviceName2);
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    try {
+      compositeOmegaCallback.compensate(event);
+      expectFailing(AlphaException.class);
+    } catch (AlphaException e) {
+      assertThat(e.getMessage(), is("No such omega callback found for service " + serviceName2));
+    }
+
+    verify(callback1One, never()).compensate(event);
+    verify(callback1Two, never()).compensate(event);
+    verify(callback2One, never()).compensate(event);
+    verify(callback2Two, never()).compensate(event);
+
+    assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One, callback1Two));
+    assertThat(callbacks.containsKey(serviceName2), is(false));
+  }
+
+  @Test
+  public void removeCallbackOnException() throws Exception {
+    doThrow(RuntimeException.class).when(callback1Two).compensate(any(TxEvent.class));
+    TxEvent event = eventOf(serviceName1, instanceId1Two, TxStartedEvent);
+
+    try {
+      compositeOmegaCallback.compensate(event);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException ignored) {
+    }
+
+    assertThat(callbacks.get(serviceName1).values(), containsInAnyOrder(callback1One));
+    assertThat(callbacks.get(serviceName2).values(), containsInAnyOrder(callback2One, callback2Two));
   }
 
   private TxEvent eventOf(String serviceName, String instanceId, EventType eventType) {
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallbackTest.java
new file mode 100644
index 0000000..d6253a8
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/SelfCleaningOmegaCallbackTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class SelfCleaningOmegaCallbackTest {
+  private final TxEvent event = Mockito.mock(TxEvent.class);
+  private final String someId = uniquify("someId");
+
+  private final Map<String, OmegaCallback> callbacks = new HashMap<>();
+  private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
+  private final SelfCleaningOmegaCallback callback = new SelfCleaningOmegaCallback(someId, underlying, callbacks);
+
+  @Before
+  public void setUp() throws Exception {
+    callbacks.put(someId, callback);
+  }
+
+  @Test
+  public void keepItselfInCallbacksWhenNormal() throws Exception {
+    callback.compensate(event);
+
+    assertThat(callbacks.get(someId), is(callback));
+  }
+
+  @Test
+  public void removeItselfFromCallbacksOnException() throws Exception {
+    doThrow(RuntimeException.class).when(underlying).compensate(any(TxEvent.class));
+
+    try {
+      callback.compensate(event);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException ignored) {
+    }
+
+    assertThat(callbacks.isEmpty(), is(true));
+  }
+
+  @Test
+  public void disconnectWithUnderlying() throws Exception {
+    callback.disconnect();
+
+    verify(underlying).disconnect();
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 2f3a3ec..21e46cd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -35,6 +36,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.servicecomb.saga.alpha.core.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
@@ -88,6 +91,9 @@ public class AlphaIntegrationTest {
   @Autowired
   private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
+  @Autowired
+  private TxConsistentService consistentService;
+
   private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
   private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver();
 
@@ -169,6 +175,18 @@ public class AlphaIntegrationTest {
   }
 
   @Test
+  public void removeCallbackOnClientDown() throws Exception {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+
+    omegaCallbacks.get(serviceName).get(instanceId).disconnect();
+
+    consistentService.handle(someTxAbortEvent(serviceName, instanceId));
+
+    await().atMost(1, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
+  }
+
+  @Test
   public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
@@ -238,6 +256,19 @@ public class AlphaIntegrationTest {
         .build();
   }
 
+  private TxEvent someTxAbortEvent(String serviceName, String instanceId) {
+    return new TxEvent(
+        serviceName,
+        instanceId,
+        new Date(),
+        globalTxId,
+        localTxId,
+        parentTxId,
+        TxAbortedEvent.name(),
+        compensationMethod,
+        payload.getBytes());
+  }
+
   private GrpcTxEvent someGrpcEvent(EventType type) {
     return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.