You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:49 UTC

[incubator-servicecomb-saga] 07/07: SCB-138 observer maintains its own services instead of using an extra map

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit b862131fee700572daa8a5c99af9dafe2ba42749
Author: Eric Lee <da...@huawei.com>
AuthorDate: Thu Jan 4 15:37:30 2018 +0800

    SCB-138 observer maintains its own services instead of using an extra map
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 20 ++++----------------
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  9 ++-------
 .../alpha/server/GrpcTxEventStreamObserver.java    | 22 ++++++++++++----------
 .../server/GrpcTxEventStreamObserverTest.java      | 21 ++++++++-------------
 4 files changed, 26 insertions(+), 46 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index c8eabdc..f970ecb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -17,7 +17,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,13 +30,10 @@ import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import io.grpc.stub.StreamObserver;
-
 @Configuration
 class AlphaConfig {
   private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
@@ -45,18 +41,12 @@ class AlphaConfig {
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
 
-  // TODO: 2018/01/03 optimize reverse visit of the map instead of using another map, namely omegaCallbacksReverse
   @Bean
   Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
     return new ConcurrentHashMap<>();
   }
 
   @Bean
-  Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse() {
-    return new ConcurrentHashMap<>();
-  }
-
-  @Bean
   OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
     return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks));
   }
@@ -70,22 +60,20 @@ class AlphaConfig {
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
       TxEventRepository eventRepository,
       OmegaCallback omegaCallback,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
-    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks, omegaCallbacksReverse);
+    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
 
     return consistentService;
   }
 
   private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     return new GrpcStartable(port,
-        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
+        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
   }
 
   @PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 27c524e..0b84217 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,7 +20,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -37,18 +36,14 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
-
   GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     this.txConsistentService = txConsistentService;
     this.omegaCallbacks = omegaCallbacks;
-    this.omegaCallbacksReverse = omegaCallbacksReverse;
   }
 
   @Override
   public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
-    return new GrpcTxEventStreamObserver(omegaCallbacks, omegaCallbacksReverse, txConsistentService, responseObserver);
+    return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 07fe093..108df14 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -24,9 +24,9 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
 import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Collection;
 import java.util.Date;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.grpc.stub.StreamObserver;
+import io.netty.util.internal.ConcurrentSet;
 
 class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
@@ -45,17 +46,15 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
+  private final Set<SimpleImmutableEntry<String, String>> serviceEntries = new ConcurrentSet<>();
 
   private final TxConsistentService txConsistentService;
 
   private final StreamObserver<GrpcCompensateCommand> responseObserver;
 
   GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse,
       TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
     this.omegaCallbacks = omegaCallbacks;
-    this.omegaCallbacksReverse = omegaCallbacksReverse;
     this.txConsistentService = txConsistentService;
     this.responseObserver = responseObserver;
   }
@@ -69,7 +68,7 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
       Map<String, OmegaCallback> instanceCallback = omegaCallbacks
           .computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
       instanceCallback.computeIfAbsent(instanceId, v -> new GrpcOmegaCallback(responseObserver));
-      omegaCallbacksReverse.computeIfAbsent(responseObserver, v -> new SimpleImmutableEntry<>(serviceName, instanceId));
+      serviceEntries.add(new SimpleImmutableEntry<>(serviceName, instanceId));
     }
 
     // store received event
@@ -102,13 +101,16 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   }
 
   private void removeInvalidCallback() {
-    Collection<SimpleImmutableEntry<String, String>> services = omegaCallbacksReverse.values();
-    for (SimpleImmutableEntry<String, String> pair : services) {
-      Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(pair.getKey());
+    for (SimpleImmutableEntry<String, String> entry : serviceEntries) {
+      Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(entry.getKey());
       if (instanceCallback != null) {
-        instanceCallback.remove(pair.getValue());
+        instanceCallback.remove(entry.getValue());
       }
     }
-    omegaCallbacksReverse.remove(responseObserver);
+    serviceEntries.clear();
+  }
+
+  Set<SimpleImmutableEntry<String, String>> serviceEntries() {
+    return serviceEntries;
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
index 2fb3593..6270f42 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -25,6 +25,7 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 
@@ -36,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.alpha.core.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,14 +46,12 @@ import io.grpc.stub.StreamObserver;
 public class GrpcTxEventStreamObserverTest {
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
-
   private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
 
   private final StreamObserver responseObserver = mock(StreamObserver.class);
 
-  private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks,
-      omegaCallbacksReverse, txConsistentService, responseObserver);
+  private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService,
+      responseObserver);
 
   private final String serviceName = "service a";
 
@@ -68,7 +66,6 @@ public class GrpcTxEventStreamObserverTest {
   @Before
   public void setUp() throws Exception {
     omegaCallbacks.clear();
-    omegaCallbacksReverse.clear();
   }
 
   @Test
@@ -81,10 +78,8 @@ public class GrpcTxEventStreamObserverTest {
     assertThat(callback, is(notNullValue()));
     assertThat(((GrpcOmegaCallback) callback).observer(), is(responseObserver));
 
-    assertThat(omegaCallbacksReverse.size(), is(1));
-    assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
-    assertThat(omegaCallbacksReverse.get(responseObserver).getKey(), is(serviceName));
-    assertThat(omegaCallbacksReverse.get(responseObserver).getValue(), is(instanceId));
+    assertThat(observer.serviceEntries().size(), is(1));
+    assertThat(observer.serviceEntries(), hasItem(new SimpleImmutableEntry<>(serviceName, instanceId)));
   }
 
   @Test
@@ -93,7 +88,7 @@ public class GrpcTxEventStreamObserverTest {
     observer.onNext(startedEvent);
 
     assertThat(omegaCallbacks.size(), is(1));
-    assertThat(omegaCallbacksReverse.size(), is(1));
+    assertThat(observer.serviceEntries().size(), is(1));
   }
 
   @Test
@@ -108,11 +103,11 @@ public class GrpcTxEventStreamObserverTest {
   public void removeOmegaCallbacksOnComplete() {
     observer.onNext(startedEvent);
     assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(false));
-    assertThat(omegaCallbacksReverse.size(), is(1));
+    assertThat(observer.serviceEntries().size(), is(1));
 
     observer.onCompleted();
     assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(true));
-    assertThat(omegaCallbacksReverse.isEmpty(), is(true));
+    assertThat(observer.serviceEntries().isEmpty(), is(true));
   }
 
   private GrpcTxEvent eventOf(String serviceName, String instanceId, EventType type) {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.