You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:49 UTC
[incubator-servicecomb-saga] 07/07: SCB-138 observer maintains its
own services instead of using an extra map
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit b862131fee700572daa8a5c99af9dafe2ba42749
Author: Eric Lee <da...@huawei.com>
AuthorDate: Thu Jan 4 15:37:30 2018 +0800
SCB-138 observer maintains its own services instead of using an extra map
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../servicecomb/saga/alpha/server/AlphaConfig.java | 20 ++++----------------
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 9 ++-------
.../alpha/server/GrpcTxEventStreamObserver.java | 22 ++++++++++++----------
.../server/GrpcTxEventStreamObserverTest.java | 21 ++++++++-------------
4 files changed, 26 insertions(+), 46 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index c8eabdc..f970ecb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -17,7 +17,6 @@
package org.apache.servicecomb.saga.alpha.server;
-import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,13 +30,10 @@ import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import io.grpc.stub.StreamObserver;
-
@Configuration
class AlphaConfig {
private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
@@ -45,18 +41,12 @@ class AlphaConfig {
@Value("${alpha.compensation.retry.delay:3000}")
private int delay;
- // TODO: 2018/01/03 optimize reverse visit of the map instead of using another map, namely omegaCallbacksReverse
@Bean
Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
return new ConcurrentHashMap<>();
}
@Bean
- Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse() {
- return new ConcurrentHashMap<>();
- }
-
- @Bean
OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks));
}
@@ -70,22 +60,20 @@ class AlphaConfig {
TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
TxEventRepository eventRepository,
OmegaCallback omegaCallback,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks,
- Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
- ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks, omegaCallbacksReverse);
+ ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
new Thread(startable::start).start();
return consistentService;
}
private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks,
- Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
return new GrpcStartable(port,
- new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
+ new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
}
@PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 27c524e..0b84217 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,7 +20,6 @@
package org.apache.servicecomb.saga.alpha.server;
-import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -37,18 +36,14 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
- private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
-
GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks,
- Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
this.txConsistentService = txConsistentService;
this.omegaCallbacks = omegaCallbacks;
- this.omegaCallbacksReverse = omegaCallbacksReverse;
}
@Override
public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
- return new GrpcTxEventStreamObserver(omegaCallbacks, omegaCallbacksReverse, txConsistentService, responseObserver);
+ return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 07fe093..108df14 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -24,9 +24,9 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import java.lang.invoke.MethodHandles;
import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Collection;
import java.util.Date;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.grpc.stub.StreamObserver;
+import io.netty.util.internal.ConcurrentSet;
class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
@@ -45,17 +46,15 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
- private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
+ private final Set<SimpleImmutableEntry<String, String>> serviceEntries = new ConcurrentSet<>();
private final TxConsistentService txConsistentService;
private final StreamObserver<GrpcCompensateCommand> responseObserver;
GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
- Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse,
TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
this.omegaCallbacks = omegaCallbacks;
- this.omegaCallbacksReverse = omegaCallbacksReverse;
this.txConsistentService = txConsistentService;
this.responseObserver = responseObserver;
}
@@ -69,7 +68,7 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
Map<String, OmegaCallback> instanceCallback = omegaCallbacks
.computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
instanceCallback.computeIfAbsent(instanceId, v -> new GrpcOmegaCallback(responseObserver));
- omegaCallbacksReverse.computeIfAbsent(responseObserver, v -> new SimpleImmutableEntry<>(serviceName, instanceId));
+ serviceEntries.add(new SimpleImmutableEntry<>(serviceName, instanceId));
}
// store received event
@@ -102,13 +101,16 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
}
private void removeInvalidCallback() {
- Collection<SimpleImmutableEntry<String, String>> services = omegaCallbacksReverse.values();
- for (SimpleImmutableEntry<String, String> pair : services) {
- Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(pair.getKey());
+ for (SimpleImmutableEntry<String, String> entry : serviceEntries) {
+ Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(entry.getKey());
if (instanceCallback != null) {
- instanceCallback.remove(pair.getValue());
+ instanceCallback.remove(entry.getValue());
}
}
- omegaCallbacksReverse.remove(responseObserver);
+ serviceEntries.clear();
+ }
+
+ Set<SimpleImmutableEntry<String, String>> serviceEntries() {
+ return serviceEntries;
}
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
index 2fb3593..6270f42 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -25,6 +25,7 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
@@ -36,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.saga.alpha.core.EventType;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
import org.junit.Before;
import org.junit.Test;
@@ -46,14 +46,12 @@ import io.grpc.stub.StreamObserver;
public class GrpcTxEventStreamObserverTest {
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
- private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
-
private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
private final StreamObserver responseObserver = mock(StreamObserver.class);
- private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks,
- omegaCallbacksReverse, txConsistentService, responseObserver);
+ private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService,
+ responseObserver);
private final String serviceName = "service a";
@@ -68,7 +66,6 @@ public class GrpcTxEventStreamObserverTest {
@Before
public void setUp() throws Exception {
omegaCallbacks.clear();
- omegaCallbacksReverse.clear();
}
@Test
@@ -81,10 +78,8 @@ public class GrpcTxEventStreamObserverTest {
assertThat(callback, is(notNullValue()));
assertThat(((GrpcOmegaCallback) callback).observer(), is(responseObserver));
- assertThat(omegaCallbacksReverse.size(), is(1));
- assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
- assertThat(omegaCallbacksReverse.get(responseObserver).getKey(), is(serviceName));
- assertThat(omegaCallbacksReverse.get(responseObserver).getValue(), is(instanceId));
+ assertThat(observer.serviceEntries().size(), is(1));
+ assertThat(observer.serviceEntries(), hasItem(new SimpleImmutableEntry<>(serviceName, instanceId)));
}
@Test
@@ -93,7 +88,7 @@ public class GrpcTxEventStreamObserverTest {
observer.onNext(startedEvent);
assertThat(omegaCallbacks.size(), is(1));
- assertThat(omegaCallbacksReverse.size(), is(1));
+ assertThat(observer.serviceEntries().size(), is(1));
}
@Test
@@ -108,11 +103,11 @@ public class GrpcTxEventStreamObserverTest {
public void removeOmegaCallbacksOnComplete() {
observer.onNext(startedEvent);
assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(false));
- assertThat(omegaCallbacksReverse.size(), is(1));
+ assertThat(observer.serviceEntries().size(), is(1));
observer.onCompleted();
assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(true));
- assertThat(omegaCallbacksReverse.isEmpty(), is(true));
+ assertThat(observer.serviceEntries().isEmpty(), is(true));
}
private GrpcTxEvent eventOf(String serviceName, String instanceId, EventType type) {
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.