You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:48 UTC
[incubator-servicecomb-saga] 06/07: SCB-138 use
SimpleImmutableEntry instead of map to record relationship of response
observer and service
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 78b1dd73cf5be288b74cf492a37e3fd3e9b658a0
Author: Eric Lee <da...@huawei.com>
AuthorDate: Thu Jan 4 11:34:38 2018 +0800
SCB-138 use SimpleImmutableEntry instead of map to record relationship of response observer and service
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../servicecomb/saga/alpha/server/AlphaConfig.java | 7 +++--
.../saga/alpha/server/GrpcOmegaCallback.java | 23 +++------------
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 5 ++--
.../alpha/server/GrpcTxEventStreamObserver.java | 33 +++++++---------------
.../saga/alpha/server/AlphaIntegrationTest.java | 6 ++--
.../server/GrpcTxEventStreamObserverTest.java | 11 +++++---
.../connector/grpc/GrpcClientMessageSender.java | 2 +-
.../grpc/GrpcCompensateStreamObserver.java | 2 +-
8 files changed, 34 insertions(+), 55 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 7353fa4..c8eabdc 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.saga.alpha.server;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -51,7 +52,7 @@ class AlphaConfig {
}
@Bean
- Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse() {
+ Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse() {
return new ConcurrentHashMap<>();
}
@@ -70,7 +71,7 @@ class AlphaConfig {
TxEventRepository eventRepository,
OmegaCallback omegaCallback,
Map<String, Map<String, OmegaCallback>> omegaCallbacks,
- Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+ Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
@@ -82,7 +83,7 @@ class AlphaConfig {
private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
Map<String, Map<String, OmegaCallback>> omegaCallbacks,
- Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+ Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
return new GrpcStartable(port,
new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index c9cda3a..c576552 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -20,8 +20,6 @@
package org.apache.servicecomb.saga.alpha.server;
-import java.util.Objects;
-
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -38,6 +36,10 @@ public class GrpcOmegaCallback implements OmegaCallback {
this.observer = observer;
}
+ StreamObserver<GrpcCompensateCommand> observer() {
+ return observer;
+ }
+
@Override
public void compensate(TxEvent event) {
GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
@@ -49,21 +51,4 @@ public class GrpcOmegaCallback implements OmegaCallback {
.build();
observer.onNext(command);
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- GrpcOmegaCallback that = (GrpcOmegaCallback) o;
- return Objects.equals(observer, that.observer);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(observer);
- }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index d8d1cea..27c524e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,6 +20,7 @@
package org.apache.servicecomb.saga.alpha.server;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -36,11 +37,11 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
- private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+ private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
Map<String, Map<String, OmegaCallback>> omegaCallbacks,
- Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+ Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
this.txConsistentService = txConsistentService;
this.omegaCallbacks = omegaCallbacks;
this.omegaCallbacksReverse = omegaCallbacksReverse;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 623355c..07fe093 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -23,12 +23,10 @@ package org.apache.servicecomb.saga.alpha.server;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import java.lang.invoke.MethodHandles;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collection;
import java.util.Date;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -47,14 +45,14 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
- private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+ private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
private final TxConsistentService txConsistentService;
private final StreamObserver<GrpcCompensateCommand> responseObserver;
GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
- Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse,
+ Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse,
TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
this.omegaCallbacks = omegaCallbacks;
this.omegaCallbacksReverse = omegaCallbacksReverse;
@@ -70,10 +68,8 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
if (message.getType().equals(TxStartedEvent.name())) {
Map<String, OmegaCallback> instanceCallback = omegaCallbacks
.computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
- instanceCallback.putIfAbsent(instanceId, new GrpcOmegaCallback(responseObserver));
- Map<String, String> serviceInstanceId = omegaCallbacksReverse
- .computeIfAbsent(responseObserver, v -> new ConcurrentHashMap<>());
- serviceInstanceId.putIfAbsent(serviceName, instanceId);
+ instanceCallback.computeIfAbsent(instanceId, v -> new GrpcOmegaCallback(responseObserver));
+ omegaCallbacksReverse.computeIfAbsent(responseObserver, v -> new SimpleImmutableEntry<>(serviceName, instanceId));
}
// store received event
@@ -106,20 +102,11 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
}
private void removeInvalidCallback() {
- Collection<Map<String, String>> services = omegaCallbacksReverse.values();
- for (Map<String, String> service : services) {
- Set<String> removedServices = new HashSet<>();
- for (Entry<String, String> entry : service.entrySet()) {
- String serviceName = entry.getKey();
- String instanceId = entry.getValue();
- Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(serviceName);
- if (instanceCallback != null) {
- instanceCallback.remove(instanceId);
- removedServices.add(serviceName);
- }
- }
- for (String removedService : removedServices) {
- service.remove(removedService);
+ Collection<SimpleImmutableEntry<String, String>> services = omegaCallbacksReverse.values();
+ for (SimpleImmutableEntry<String, String> pair : services) {
+ Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(pair.getKey());
+ if (instanceCallback != null) {
+ instanceCallback.remove(pair.getValue());
}
}
omegaCallbacksReverse.remove(responseObserver);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 55c095e..a406acd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -94,7 +94,7 @@ public class AlphaIntegrationTest {
// use the asynchronous stub need to wait for some time
await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
- assertThat(receivedCommands.size(), is(0));
+ assertThat(receivedCommands.isEmpty(), is(true));
TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
@@ -122,8 +122,8 @@ public class AlphaIntegrationTest {
requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method b"));
requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
-
await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+
assertThat(receivedCommands, containsInAnyOrder(
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
.setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
@@ -137,8 +137,10 @@ public class AlphaIntegrationTest {
StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
requestObserver.onNext(someGrpcEvent(TxStartedEvent));
await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+
requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
index a40498e..2fb3593 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +46,7 @@ import io.grpc.stub.StreamObserver;
public class GrpcTxEventStreamObserverTest {
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
- private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
+ private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
@@ -76,12 +77,14 @@ public class GrpcTxEventStreamObserverTest {
assertThat(omegaCallbacks.size(), is(1));
assertThat(omegaCallbacks.getOrDefault(serviceName, null), is(notNullValue()));
- assertThat(omegaCallbacks.get(serviceName).getOrDefault(instanceId, null),
- is(new GrpcOmegaCallback(responseObserver)));
+ OmegaCallback callback = omegaCallbacks.get(serviceName).getOrDefault(instanceId, null);
+ assertThat(callback, is(notNullValue()));
+ assertThat(((GrpcOmegaCallback) callback).observer(), is(responseObserver));
assertThat(omegaCallbacksReverse.size(), is(1));
assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
- assertThat(omegaCallbacksReverse.get(responseObserver).getOrDefault(serviceName, null), is(instanceId));
+ assertThat(omegaCallbacksReverse.get(responseObserver).getKey(), is(serviceName));
+ assertThat(omegaCallbacksReverse.get(responseObserver).getValue(), is(instanceId));
}
@Test
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 482e03a..ac522b0 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -42,7 +42,7 @@ public class GrpcClientMessageSender implements MessageSender {
private final MessageSerializer serializer;
private final ServiceConfig serviceConfig;
- private StreamObserver<GrpcTxEvent> requestObserver;
+ private final StreamObserver<GrpcTxEvent> requestObserver;
public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
MessageHandler handler) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index d5c757d..1429da6 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -41,7 +41,7 @@ public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensa
@Override
public void onNext(GrpcCompensateCommand command) {
- LOG.info("receive compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
+ LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
messageHandler.onReceive(
command.getGlobalTxId(),
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.