You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:46 UTC

[incubator-servicecomb-saga] 04/07: SCB-138 remove omegaCallback when grpc client disconnects

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 6be42657232e0b58c7325b392323b7ebd8487b33
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 3 18:59:13 2018 +0800

    SCB-138 remove omegaCallback when grpc client disconnects
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 alpha/alpha-server/pom.xml                         |  12 ++
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  28 +++--
 .../saga/alpha/server/GrpcOmegaCallback.java       |  26 +++--
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |   8 +-
 .../alpha/server/GrpcTxEventStreamObserver.java    |  45 +++++++-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  25 ++---
 .../server/GrpcTxEventStreamObserverTest.java      | 122 +++++++++++++++++++++
 7 files changed, 227 insertions(+), 39 deletions(-)

diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index cc4115d..7ecd396 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -88,6 +88,18 @@
       <groupId>com.github.seanyinx</groupId>
       <artifactId>unit-scaffolding</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index e33ba99..7353fa4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -24,16 +24,18 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.PostConstruct;
 
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import io.grpc.stub.StreamObserver;
 
 @Configuration
 class AlphaConfig {
@@ -42,13 +44,18 @@ class AlphaConfig {
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
 
-  // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138
+  // TODO: 2018/01/03 optimize reverse visit of the map instead of using another map, namely omegaCallbacksReverse
   @Bean
   Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
     return new ConcurrentHashMap<>();
   }
 
   @Bean
+  Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse() {
+    return new ConcurrentHashMap<>();
+  }
+
+  @Bean
   OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
     return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks));
   }
@@ -62,19 +69,22 @@ class AlphaConfig {
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
       TxEventRepository eventRepository,
       OmegaCallback omegaCallback,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
-    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
+    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks, omegaCallbacksReverse);
     new Thread(startable::start).start();
 
     return consistentService;
   }
 
   private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-    return new GrpcStartable(port, new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+    return new GrpcStartable(port,
+        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
   }
 
   @PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 43d4ac4..c9cda3a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -20,23 +20,18 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import java.util.Objects;
+
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 
-import java.lang.invoke.MethodHandles;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.protobuf.ByteString;
 
 import io.grpc.stub.StreamObserver;
 
 public class GrpcOmegaCallback implements OmegaCallback {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   private final StreamObserver<GrpcCompensateCommand> observer;
 
   public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
@@ -54,4 +49,21 @@ public class GrpcOmegaCallback implements OmegaCallback {
         .build();
     observer.onNext(command);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    GrpcOmegaCallback that = (GrpcOmegaCallback) o;
+    return Objects.equals(observer, that.observer);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(observer);
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 0b84217..d8d1cea 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -36,14 +36,18 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
+  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+
   GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
     this.txConsistentService = txConsistentService;
     this.omegaCallbacks = omegaCallbacks;
+    this.omegaCallbacksReverse = omegaCallbacksReverse;
   }
 
   @Override
   public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
-    return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
+    return new GrpcTxEventStreamObserver(omegaCallbacks, omegaCallbacksReverse, txConsistentService, responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 16ee788..6a2ae8e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -23,8 +23,12 @@ package org.apache.servicecomb.saga.alpha.server;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -43,13 +47,17 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
+  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+
   private final TxConsistentService txConsistentService;
 
   private final StreamObserver<GrpcCompensateCommand> responseObserver;
 
   GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse,
       TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
     this.omegaCallbacks = omegaCallbacks;
+    this.omegaCallbacksReverse = omegaCallbacksReverse;
     this.txConsistentService = txConsistentService;
     this.responseObserver = responseObserver;
   }
@@ -57,15 +65,21 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   @Override
   public void onNext(GrpcTxEvent message) {
     // register a callback on started event
+    String serviceName = message.getServiceName();
+    String instanceId = message.getInstanceId();
     if (message.getType().equals(TxStartedEvent.name())) {
-      omegaCallbacks.computeIfAbsent(message.getServiceName(), (key) -> new ConcurrentHashMap<>())
-          .put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+      Map<String, OmegaCallback> instanceCallback = omegaCallbacks
+          .computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
+      instanceCallback.putIfAbsent(instanceId, new GrpcOmegaCallback(responseObserver));
+      Map<String, String> serviceInstanceId = omegaCallbacksReverse
+          .computeIfAbsent(responseObserver, v -> new ConcurrentHashMap<>());
+      serviceInstanceId.putIfAbsent(serviceName, instanceId);
     }
 
     // store received event
     txConsistentService.handle(new TxEvent(
-        message.getServiceName(),
-        message.getInstanceId(),
+        serviceName,
+        instanceId,
         new Date(message.getTimestamp()),
         message.getGlobalTxId(),
         message.getLocalTxId(),
@@ -79,11 +93,32 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   @Override
   public void onError(Throwable t) {
     LOG.error("failed to process grpc message.", t);
-    responseObserver.onCompleted();
+    onCompleted();
   }
 
   @Override
   public void onCompleted() {
     responseObserver.onCompleted();
+    removeInvalidCallback();
+  }
+
+  private void removeInvalidCallback() {
+    Collection<Map<String, String>> services = omegaCallbacksReverse.values();
+    for (Map<String, String> service : services) {
+      Set<String> removedServices = new HashSet<>();
+      for (Entry<String, String> entry : service.entrySet()) {
+        String serviceName = entry.getKey();
+        String instanceId = entry.getValue();
+        Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(serviceName);
+        if (instanceCallback != null) {
+          instanceCallback.remove(instanceId);
+          removedServices.add(serviceName);
+        }
+      }
+      for (String removedService : removedServices) {
+        service.remove(removedService);
+      }
+    }
+    omegaCallbacksReverse.remove(responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 5eb1d1b..ca4b7bd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -73,9 +73,6 @@ public class AlphaIntegrationTest {
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
-  // use an empty response observer as we don't need the response in client side
-  private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new EmptyStreamObserver();
-
   private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
   private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
 
@@ -85,18 +82,20 @@ public class AlphaIntegrationTest {
   }
 
   @Before
-  public void before() throws Exception {
+  public void before() {
     eventRepo.deleteAll();
     receivedCommands.clear();
   }
 
   @Test
-  public void persistsEvent() throws Exception {
-    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
+  public void persistsEvent() {
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
     await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
 
+    assertThat(receivedCommands.size(), is(0));
+
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
     assertThat(envelope.serviceName(), is(serviceName));
@@ -110,7 +109,7 @@ public class AlphaIntegrationTest {
   }
 
   @Test
-  public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+  public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
     requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
@@ -171,9 +170,11 @@ public class AlphaIntegrationTest {
         .build();
   }
 
-  private static class EmptyStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+  private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
     @Override
     public void onNext(GrpcCompensateCommand command) {
+      // intercept received command
+      receivedCommands.add(command);
     }
 
     @Override
@@ -184,12 +185,4 @@ public class AlphaIntegrationTest {
     public void onCompleted() {
     }
   }
-
-  private static class CompensateStreamObserver extends EmptyStreamObserver {
-    @Override
-    public void onNext(GrpcCompensateCommand command) {
-      // intercept received command
-      receivedCommands.add(command);
-    }
-  }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
new file mode 100644
index 0000000..a40498e
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.servicecomb.saga.alpha.core.EventType;
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcTxEventStreamObserverTest {
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
+
+  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
+
+  private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
+
+  private final StreamObserver responseObserver = mock(StreamObserver.class);
+
+  private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks,
+      omegaCallbacksReverse, txConsistentService, responseObserver);
+
+  private final String serviceName = "service a";
+
+  private final String instanceId = "instance a";
+
+  private final GrpcTxEvent startedEvent = eventOf(serviceName, instanceId, TxStartedEvent);
+
+  private final GrpcTxEvent abortedEvent = eventOf(serviceName, instanceId, TxAbortedEvent);
+
+  private final GrpcTxEvent endedEvent = eventOf(serviceName, instanceId, TxEndedEvent);
+
+  @Before
+  public void setUp() throws Exception {
+    omegaCallbacks.clear();
+    omegaCallbacksReverse.clear();
+  }
+
+  @Test
+  public void updateOmegaCallbacksOnStartedEvent() {
+    observer.onNext(startedEvent);
+
+    assertThat(omegaCallbacks.size(), is(1));
+    assertThat(omegaCallbacks.getOrDefault(serviceName, null), is(notNullValue()));
+    assertThat(omegaCallbacks.get(serviceName).getOrDefault(instanceId, null),
+        is(new GrpcOmegaCallback(responseObserver)));
+
+    assertThat(omegaCallbacksReverse.size(), is(1));
+    assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
+    assertThat(omegaCallbacksReverse.get(responseObserver).getOrDefault(serviceName, null), is(instanceId));
+  }
+
+  @Test
+  public void duplicateEventsOnlyHoldsOneOmegaCallback() {
+    observer.onNext(startedEvent);
+    observer.onNext(startedEvent);
+
+    assertThat(omegaCallbacks.size(), is(1));
+    assertThat(omegaCallbacksReverse.size(), is(1));
+  }
+
+  @Test
+  public void omegaCallbacksNotChangeOnOtherEvents() {
+    observer.onNext(abortedEvent);
+    observer.onNext(endedEvent);
+
+    assertThat(omegaCallbacks.isEmpty(), is(true));
+  }
+
+  @Test
+  public void removeOmegaCallbacksOnComplete() {
+    observer.onNext(startedEvent);
+    assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(false));
+    assertThat(omegaCallbacksReverse.size(), is(1));
+
+    observer.onCompleted();
+    assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(true));
+    assertThat(omegaCallbacksReverse.isEmpty(), is(true));
+  }
+
+  private GrpcTxEvent eventOf(String serviceName, String instanceId, EventType type) {
+    return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setType(type.name())
+        .build();
+  }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.