You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:48 UTC

[incubator-servicecomb-saga] 06/07: SCB-138 use SimpleImmutableEntry instead of map to record relationship of response observer and service

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 78b1dd73cf5be288b74cf492a37e3fd3e9b658a0
Author: Eric Lee <da...@huawei.com>
AuthorDate: Thu Jan 4 11:34:38 2018 +0800

    SCB-138 use SimpleImmutableEntry instead of map to record relationship of response observer and service
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  7 +++--
 .../saga/alpha/server/GrpcOmegaCallback.java       | 23 +++------------
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  5 ++--
 .../alpha/server/GrpcTxEventStreamObserver.java    | 33 +++++++---------------
 .../saga/alpha/server/AlphaIntegrationTest.java    |  6 ++--
 .../server/GrpcTxEventStreamObserverTest.java      | 11 +++++---
 .../connector/grpc/GrpcClientMessageSender.java    |  2 +-
 .../grpc/GrpcCompensateStreamObserver.java         |  2 +-
 8 files changed, 34 insertions(+), 55 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 7353fa4..c8eabdc 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -51,7 +52,7 @@ class AlphaConfig {
   }
 
   @Bean
-  Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse() {
+  Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse() {
     return new ConcurrentHashMap<>();
   }
 
@@ -70,7 +71,7 @@ class AlphaConfig {
       TxEventRepository eventRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
@@ -82,7 +83,7 @@ class AlphaConfig {
 
   private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
     return new GrpcStartable(port,
         new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index c9cda3a..c576552 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -20,8 +20,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import java.util.Objects;
-
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -38,6 +36,10 @@ public class GrpcOmegaCallback implements OmegaCallback {
     this.observer = observer;
   }
 
+  StreamObserver<GrpcCompensateCommand> observer() {
+    return observer;
+  }
+
   @Override
   public void compensate(TxEvent event) {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
@@ -49,21 +51,4 @@ public class GrpcOmegaCallback implements OmegaCallback {
         .build();
     observer.onNext(command);
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    GrpcOmegaCallback that = (GrpcOmegaCallback) o;
-    return Objects.equals(observer, that.observer);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(observer);
-  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index d8d1cea..27c524e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,6 +20,7 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -36,11 +37,11 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
 
   GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
     this.txConsistentService = txConsistentService;
     this.omegaCallbacks = omegaCallbacks;
     this.omegaCallbacksReverse = omegaCallbacksReverse;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 623355c..07fe093 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -23,12 +23,10 @@ package org.apache.servicecomb.saga.alpha.server;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -47,14 +45,14 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
 
   private final TxConsistentService txConsistentService;
 
   private final StreamObserver<GrpcCompensateCommand> responseObserver;
 
   GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse,
+      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse,
       TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
     this.omegaCallbacks = omegaCallbacks;
     this.omegaCallbacksReverse = omegaCallbacksReverse;
@@ -70,10 +68,8 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
     if (message.getType().equals(TxStartedEvent.name())) {
       Map<String, OmegaCallback> instanceCallback = omegaCallbacks
           .computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
-      instanceCallback.putIfAbsent(instanceId, new GrpcOmegaCallback(responseObserver));
-      Map<String, String> serviceInstanceId = omegaCallbacksReverse
-          .computeIfAbsent(responseObserver, v -> new ConcurrentHashMap<>());
-      serviceInstanceId.putIfAbsent(serviceName, instanceId);
+      instanceCallback.computeIfAbsent(instanceId, v -> new GrpcOmegaCallback(responseObserver));
+      omegaCallbacksReverse.computeIfAbsent(responseObserver, v -> new SimpleImmutableEntry<>(serviceName, instanceId));
     }
 
     // store received event
@@ -106,20 +102,11 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   }
 
   private void removeInvalidCallback() {
-    Collection<Map<String, String>> services = omegaCallbacksReverse.values();
-    for (Map<String, String> service : services) {
-      Set<String> removedServices = new HashSet<>();
-      for (Entry<String, String> entry : service.entrySet()) {
-        String serviceName = entry.getKey();
-        String instanceId = entry.getValue();
-        Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(serviceName);
-        if (instanceCallback != null) {
-          instanceCallback.remove(instanceId);
-          removedServices.add(serviceName);
-        }
-      }
-      for (String removedService : removedServices) {
-        service.remove(removedService);
+    Collection<SimpleImmutableEntry<String, String>> services = omegaCallbacksReverse.values();
+    for (SimpleImmutableEntry<String, String> pair : services) {
+      Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(pair.getKey());
+      if (instanceCallback != null) {
+        instanceCallback.remove(pair.getValue());
       }
     }
     omegaCallbacksReverse.remove(responseObserver);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 55c095e..a406acd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -94,7 +94,7 @@ public class AlphaIntegrationTest {
     // use the asynchronous stub need to wait for some time
     await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
 
-    assertThat(receivedCommands.size(), is(0));
+    assertThat(receivedCommands.isEmpty(), is(true));
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
@@ -122,8 +122,8 @@ public class AlphaIntegrationTest {
     requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method b"));
 
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
-
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+
     assertThat(receivedCommands, containsInAnyOrder(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
             .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
@@ -137,8 +137,10 @@ public class AlphaIntegrationTest {
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxStartedEvent));
     await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
     assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
     assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
     assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
index a40498e..2fb3593 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +46,7 @@ import io.grpc.stub.StreamObserver;
 public class GrpcTxEventStreamObserverTest {
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
+  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
 
   private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
 
@@ -76,12 +77,14 @@ public class GrpcTxEventStreamObserverTest {
 
     assertThat(omegaCallbacks.size(), is(1));
     assertThat(omegaCallbacks.getOrDefault(serviceName, null), is(notNullValue()));
-    assertThat(omegaCallbacks.get(serviceName).getOrDefault(instanceId, null),
-        is(new GrpcOmegaCallback(responseObserver)));
+    OmegaCallback callback = omegaCallbacks.get(serviceName).getOrDefault(instanceId, null);
+    assertThat(callback, is(notNullValue()));
+    assertThat(((GrpcOmegaCallback) callback).observer(), is(responseObserver));
 
     assertThat(omegaCallbacksReverse.size(), is(1));
     assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
-    assertThat(omegaCallbacksReverse.get(responseObserver).getOrDefault(serviceName, null), is(instanceId));
+    assertThat(omegaCallbacksReverse.get(responseObserver).getKey(), is(serviceName));
+    assertThat(omegaCallbacksReverse.get(responseObserver).getValue(), is(instanceId));
   }
 
   @Test
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 482e03a..ac522b0 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -42,7 +42,7 @@ public class GrpcClientMessageSender implements MessageSender {
   private final MessageSerializer serializer;
   private final ServiceConfig serviceConfig;
 
-  private StreamObserver<GrpcTxEvent> requestObserver;
+  private final StreamObserver<GrpcTxEvent> requestObserver;
 
   public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
       MessageHandler handler) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index d5c757d..1429da6 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -41,7 +41,7 @@ public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensa
 
   @Override
   public void onNext(GrpcCompensateCommand command) {
-    LOG.info("receive compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
+    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
         command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
     messageHandler.onReceive(
         command.getGlobalTxId(),

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.