You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:46 UTC
[incubator-servicecomb-saga] 04/07: SCB-138 remove omegaCallback
when grpc client disconnects
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 6be42657232e0b58c7325b392323b7ebd8487b33
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 3 18:59:13 2018 +0800
SCB-138 remove omegaCallback when grpc client disconnects
Signed-off-by: Eric Lee <da...@huawei.com>
---
alpha/alpha-server/pom.xml | 12 ++
.../servicecomb/saga/alpha/server/AlphaConfig.java | 28 +++--
.../saga/alpha/server/GrpcOmegaCallback.java | 26 +++--
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 8 +-
.../alpha/server/GrpcTxEventStreamObserver.java | 45 +++++++-
.../saga/alpha/server/AlphaIntegrationTest.java | 25 ++---
.../server/GrpcTxEventStreamObserverTest.java | 122 +++++++++++++++++++++
7 files changed, 227 insertions(+), 39 deletions(-)
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index cc4115d..7ecd396 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -88,6 +88,18 @@
<groupId>com.github.seanyinx</groupId>
<artifactId>unit-scaffolding</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index e33ba99..7353fa4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -24,16 +24,18 @@ import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import io.grpc.stub.StreamObserver;
@Configuration
class AlphaConfig {
@@ -42,13 +44,18 @@ class AlphaConfig {
@Value("${alpha.compensation.retry.delay:3000}")
private int delay;
- // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138
+ // TODO: 2018/01/03 optimize reverse visit of the map instead of using another map, namely omegaCallbacksReverse
@Bean
Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
return new ConcurrentHashMap<>();
}
@Bean
+ Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse() {
+ return new ConcurrentHashMap<>();
+ }
+
+ @Bean
OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks));
}
@@ -62,19 +69,22 @@ class AlphaConfig {
TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
TxEventRepository eventRepository,
OmegaCallback omegaCallback,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+ Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
- ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
+ ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks, omegaCallbacksReverse);
new Thread(startable::start).start();
return consistentService;
}
private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
- return new GrpcStartable(port, new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+ Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+ return new GrpcStartable(port,
+ new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
}
@PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 43d4ac4..c9cda3a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -20,23 +20,18 @@
package org.apache.servicecomb.saga.alpha.server;
+import java.util.Objects;
+
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
-import java.lang.invoke.MethodHandles;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
public class GrpcOmegaCallback implements OmegaCallback {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
private final StreamObserver<GrpcCompensateCommand> observer;
public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
@@ -54,4 +49,21 @@ public class GrpcOmegaCallback implements OmegaCallback {
.build();
observer.onNext(command);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GrpcOmegaCallback that = (GrpcOmegaCallback) o;
+ return Objects.equals(observer, that.observer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(observer);
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 0b84217..d8d1cea 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -36,14 +36,18 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+ private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+
GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+ Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
this.txConsistentService = txConsistentService;
this.omegaCallbacks = omegaCallbacks;
+ this.omegaCallbacksReverse = omegaCallbacksReverse;
}
@Override
public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
- return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
+ return new GrpcTxEventStreamObserver(omegaCallbacks, omegaCallbacksReverse, txConsistentService, responseObserver);
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 16ee788..6a2ae8e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -23,8 +23,12 @@ package org.apache.servicecomb.saga.alpha.server;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import java.lang.invoke.MethodHandles;
+import java.util.Collection;
import java.util.Date;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -43,13 +47,17 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+ private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+
private final TxConsistentService txConsistentService;
private final StreamObserver<GrpcCompensateCommand> responseObserver;
GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+ Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse,
TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
this.omegaCallbacks = omegaCallbacks;
+ this.omegaCallbacksReverse = omegaCallbacksReverse;
this.txConsistentService = txConsistentService;
this.responseObserver = responseObserver;
}
@@ -57,15 +65,21 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
@Override
public void onNext(GrpcTxEvent message) {
// register a callback on started event
+ String serviceName = message.getServiceName();
+ String instanceId = message.getInstanceId();
if (message.getType().equals(TxStartedEvent.name())) {
- omegaCallbacks.computeIfAbsent(message.getServiceName(), (key) -> new ConcurrentHashMap<>())
- .put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+ Map<String, OmegaCallback> instanceCallback = omegaCallbacks
+ .computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
+ instanceCallback.putIfAbsent(instanceId, new GrpcOmegaCallback(responseObserver));
+ Map<String, String> serviceInstanceId = omegaCallbacksReverse
+ .computeIfAbsent(responseObserver, v -> new ConcurrentHashMap<>());
+ serviceInstanceId.putIfAbsent(serviceName, instanceId);
}
// store received event
txConsistentService.handle(new TxEvent(
- message.getServiceName(),
- message.getInstanceId(),
+ serviceName,
+ instanceId,
new Date(message.getTimestamp()),
message.getGlobalTxId(),
message.getLocalTxId(),
@@ -79,11 +93,32 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
@Override
public void onError(Throwable t) {
LOG.error("failed to process grpc message.", t);
- responseObserver.onCompleted();
+ onCompleted();
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
+ removeInvalidCallback();
+ }
+
+ private void removeInvalidCallback() {
+ Collection<Map<String, String>> services = omegaCallbacksReverse.values();
+ for (Map<String, String> service : services) {
+ Set<String> removedServices = new HashSet<>();
+ for (Entry<String, String> entry : service.entrySet()) {
+ String serviceName = entry.getKey();
+ String instanceId = entry.getValue();
+ Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(serviceName);
+ if (instanceCallback != null) {
+ instanceCallback.remove(instanceId);
+ removedServices.add(serviceName);
+ }
+ }
+ for (String removedService : removedServices) {
+ service.remove(removedService);
+ }
+ }
+ omegaCallbacksReverse.remove(responseObserver);
}
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 5eb1d1b..ca4b7bd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -73,9 +73,6 @@ public class AlphaIntegrationTest {
@Autowired
private TxEventEnvelopeRepository eventRepo;
- // use an empty response observer as we don't need the response in client side
- private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new EmptyStreamObserver();
-
private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
@@ -85,18 +82,20 @@ public class AlphaIntegrationTest {
}
@Before
- public void before() throws Exception {
+ public void before() {
eventRepo.deleteAll();
receivedCommands.clear();
}
@Test
- public void persistsEvent() throws Exception {
- StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
+ public void persistsEvent() {
+ StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
requestObserver.onNext(someGrpcEvent(TxStartedEvent));
// use the asynchronous stub need to wait for some time
await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+ assertThat(receivedCommands.size(), is(0));
+
TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
assertThat(envelope.serviceName(), is(serviceName));
@@ -110,7 +109,7 @@ public class AlphaIntegrationTest {
}
@Test
- public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+ public void doNotCompensateDuplicateTxOnFailure() {
// duplicate events with same content but different timestamp
StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
@@ -171,9 +170,11 @@ public class AlphaIntegrationTest {
.build();
}
- private static class EmptyStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+ private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
@Override
public void onNext(GrpcCompensateCommand command) {
+ // intercept received command
+ receivedCommands.add(command);
}
@Override
@@ -184,12 +185,4 @@ public class AlphaIntegrationTest {
public void onCompleted() {
}
}
-
- private static class CompensateStreamObserver extends EmptyStreamObserver {
- @Override
- public void onNext(GrpcCompensateCommand command) {
- // intercept received command
- receivedCommands.add(command);
- }
- }
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
new file mode 100644
index 0000000..a40498e
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.servicecomb.saga.alpha.core.EventType;
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcTxEventStreamObserverTest {
+ private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
+
+ private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
+
+ private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
+
+ private final StreamObserver responseObserver = mock(StreamObserver.class);
+
+ private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks,
+ omegaCallbacksReverse, txConsistentService, responseObserver);
+
+ private final String serviceName = "service a";
+
+ private final String instanceId = "instance a";
+
+ private final GrpcTxEvent startedEvent = eventOf(serviceName, instanceId, TxStartedEvent);
+
+ private final GrpcTxEvent abortedEvent = eventOf(serviceName, instanceId, TxAbortedEvent);
+
+ private final GrpcTxEvent endedEvent = eventOf(serviceName, instanceId, TxEndedEvent);
+
+ @Before
+ public void setUp() throws Exception {
+ omegaCallbacks.clear();
+ omegaCallbacksReverse.clear();
+ }
+
+ @Test
+ public void updateOmegaCallbacksOnStartedEvent() {
+ observer.onNext(startedEvent);
+
+ assertThat(omegaCallbacks.size(), is(1));
+ assertThat(omegaCallbacks.getOrDefault(serviceName, null), is(notNullValue()));
+ assertThat(omegaCallbacks.get(serviceName).getOrDefault(instanceId, null),
+ is(new GrpcOmegaCallback(responseObserver)));
+
+ assertThat(omegaCallbacksReverse.size(), is(1));
+ assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
+ assertThat(omegaCallbacksReverse.get(responseObserver).getOrDefault(serviceName, null), is(instanceId));
+ }
+
+ @Test
+ public void duplicateEventsOnlyHoldsOneOmegaCallback() {
+ observer.onNext(startedEvent);
+ observer.onNext(startedEvent);
+
+ assertThat(omegaCallbacks.size(), is(1));
+ assertThat(omegaCallbacksReverse.size(), is(1));
+ }
+
+ @Test
+ public void omegaCallbacksNotChangeOnOtherEvents() {
+ observer.onNext(abortedEvent);
+ observer.onNext(endedEvent);
+
+ assertThat(omegaCallbacks.isEmpty(), is(true));
+ }
+
+ @Test
+ public void removeOmegaCallbacksOnComplete() {
+ observer.onNext(startedEvent);
+ assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(false));
+ assertThat(omegaCallbacksReverse.size(), is(1));
+
+ observer.onCompleted();
+ assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(true));
+ assertThat(omegaCallbacksReverse.isEmpty(), is(true));
+ }
+
+ private GrpcTxEvent eventOf(String serviceName, String instanceId, EventType type) {
+ return GrpcTxEvent.newBuilder()
+ .setServiceName(serviceName)
+ .setInstanceId(instanceId)
+ .setType(type.name())
+ .build();
+ }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.