You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/05 08:46:40 UTC
[incubator-servicecomb-saga] 03/03: SCB-167 added disconnection call to properly close compensation stream
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit fdce17bb7890c5332617f641c81b0f5e3509f335
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 5 15:32:30 2018 +0800
SCB-167 added disconnection call to properly close compensation stream
Signed-off-by: seanyinx <se...@huawei.com>
---
.../servicecomb/saga/alpha/core/OmegaCallback.java | 3 +
.../saga/alpha/server/GrpcOmegaCallback.java | 13 ++--
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 16 +++++
.../saga/alpha/server/AlphaIntegrationTest.java | 82 ++++++++++++++++++++--
integration-tests/pack-tests/pom.xml | 5 ++
.../pack/tests/GreetingApplication.java | 9 +++
.../integration/pack/tests/GreetingController.java | 17 ++++-
.../integration/pack/tests/GreetingService.java | 31 ++++++--
.../saga/integration/pack/tests/PackIT.java | 71 ++++++++++++++++++-
.../integration/pack/tests/TxEventEnvelope.java | 16 +++++
.../connector/grpc/GrpcClientMessageSender.java | 14 +++-
.../grpc/GrpcCompensateStreamObserver.java | 7 +-
.../saga/omega/spring/OmegaSpringConfig.java | 5 +-
.../saga/omega/transaction/MessageSender.java | 3 +
.../src/main/proto/GrpcTxEvent.proto | 1 +
15 files changed, 267 insertions(+), 26 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index bc51238..f60a44d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -19,4 +19,7 @@ package org.apache.servicecomb.saga.alpha.core;
public interface OmegaCallback {
void compensate(TxEvent event);
+
+ default void disconnect() {
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index c576552..5a95281 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -28,18 +28,14 @@ import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
-public class GrpcOmegaCallback implements OmegaCallback {
+class GrpcOmegaCallback implements OmegaCallback {
private final StreamObserver<GrpcCompensateCommand> observer;
- public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
+ GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
this.observer = observer;
}
- StreamObserver<GrpcCompensateCommand> observer() {
- return observer;
- }
-
@Override
public void compensate(TxEvent event) {
GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
@@ -51,4 +47,9 @@ public class GrpcOmegaCallback implements OmegaCallback {
.build();
observer.onNext(command);
}
+
+ @Override
+ public void disconnect() {
+ observer.onCompleted();
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 1e34f21..fc5f608 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,6 +20,8 @@
package org.apache.servicecomb.saga.alpha.server;
+import static java.util.Collections.emptyMap;
+
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -55,6 +57,20 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
.computeIfAbsent(request.getInstanceId(), key -> new GrpcOmegaCallback(responseObserver));
}
+ // TODO: 2018/1/5 connect is async but disconnect is sync, so the callback may not yet be registered when onDisconnected is called
+ @Override
+ public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+ OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap())
+ .remove(request.getInstanceId());
+
+ if (callback != null) {
+ callback.disconnect();
+ }
+
+ responseObserver.onNext(GrpcAck.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+
@Override
public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
txConsistentService.handle(new TxEvent(
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a266ed2..2f3a3ec 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -24,14 +24,17 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.servicecomb.saga.alpha.core.EventType;
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
@@ -39,6 +42,7 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
import org.hamcrest.core.Is;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -81,8 +85,11 @@ public class AlphaIntegrationTest {
@Autowired
private TxEventEnvelopeRepository eventRepo;
+ @Autowired
+ private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
- private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
+ private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver();
@AfterClass
public static void tearDown() throws Exception {
@@ -95,6 +102,11 @@ public class AlphaIntegrationTest {
receivedCommands.clear();
}
+ @After
+ public void after() throws Exception {
+ blockingStub.onDisconnected(serviceConfig);
+ }
+
@Test
public void persistsEvent() {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
@@ -117,6 +129,46 @@ public class AlphaIntegrationTest {
}
@Test
+ public void closeStreamOnDisconnected() {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+ await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+
+ assertThat(
+ omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()),
+ is(notNullValue()));
+
+ blockingStub.onDisconnected(serviceConfig);
+ assertThat(
+ omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()),
+ is(false));
+
+ assertThat(compensateResponseObserver.isCompleted(), is(true));
+ }
+
+ @Test
+ public void closeStreamOfDisconnectedClientOnly() {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+
+ GrpcServiceConfig anotherServiceConfig = someServiceConfig();
+ CompensateStreamObserver anotherResponseObserver = new CompensateStreamObserver();
+ TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
+
+ await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
+
+ blockingStub.onDisconnected(serviceConfig);
+
+ assertThat(
+ omegaCallbacks.get(anotherServiceConfig.getServiceName()).containsKey(anotherServiceConfig.getInstanceId()),
+ is(true));
+
+ assertThat(anotherResponseObserver.isCompleted(), is(false));
+
+ TxEventServiceGrpc.newBlockingStub(clientChannel).onDisconnected(anotherServiceConfig);
+ }
+
+ @Test
public void doNotCompensateDuplicateTxOnFailure() {
// duplicate events with same content but different timestamp
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
@@ -161,8 +213,12 @@ public class AlphaIntegrationTest {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
- blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
+ // simulates connection from another service with different globalTxId
+ GrpcServiceConfig anotherServiceConfig = someServiceConfig();
+ TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensateStreamObserver());
+
+ TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
+ anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
await().atMost(1, SECONDS).until(() -> eventRepo.count() == 2);
@@ -171,10 +227,15 @@ public class AlphaIntegrationTest {
assertThat(receivedCommands.size(), is(1));
assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
- assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
- assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
- assertThat(receivedCommands.get(0).getCompensateMethod(), is(compensationMethod));
- assertThat(receivedCommands.get(0).getPayloads().toByteArray(), is(payload.getBytes()));
+
+ anotherBlockingStub.onDisconnected(anotherServiceConfig);
+ }
+
+ private GrpcServiceConfig someServiceConfig() {
+ return GrpcServiceConfig.newBuilder()
+ .setServiceName(uniquify("serviceName"))
+ .setInstanceId(uniquify("instanceId"))
+ .build();
}
private GrpcTxEvent someGrpcEvent(EventType type) {
@@ -214,6 +275,8 @@ public class AlphaIntegrationTest {
}
private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+ private boolean completed = false;
+
@Override
public void onNext(GrpcCompensateCommand command) {
// intercept received command
@@ -226,6 +289,11 @@ public class AlphaIntegrationTest {
@Override
public void onCompleted() {
+ completed = true;
+ }
+
+ boolean isCompleted() {
+ return completed;
}
}
}
diff --git a/integration-tests/pack-tests/pom.xml b/integration-tests/pack-tests/pom.xml
index 944f45d..fb81a0e 100644
--- a/integration-tests/pack-tests/pom.xml
+++ b/integration-tests/pack-tests/pom.xml
@@ -94,6 +94,11 @@
<artifactId>groovy-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingApplication.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingApplication.java
index a9461dd..9d1b23a 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingApplication.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingApplication.java
@@ -17,9 +17,13 @@
package org.apache.servicecomb.saga.integration.pack.tests;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import org.apache.servicecomb.saga.omega.spring.EnableOmega;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
@EnableOmega
@SpringBootApplication
@@ -27,4 +31,9 @@ public class GreetingApplication {
public static void main(String[] args) {
SpringApplication.run(GreetingApplication.class, args);
}
+
+ @Bean
+ Queue<String> compensated() {
+ return new ConcurrentLinkedQueue<>();
+ }
}
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 90fd4ac..12356ee 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -28,6 +28,7 @@ import org.springframework.web.client.RestTemplate;
@Controller
@RequestMapping("/")
public class GreetingController {
+ static final String TRESPASSER = "trespasser";
private final GreetingService greetingService;
private final RestTemplate restTemplate;
@@ -41,13 +42,25 @@ public class GreetingController {
@GetMapping("/greet")
ResponseEntity<String> greet(@RequestParam String name) {
String greetings = greetingService.greet(name);
- String bonjour = restTemplate.getForObject("http://localhost:8080/bonjour?name={name}", String.class, name);
- return ResponseEntity.ok(greetings + "; " + bonjour);
+ if (!TRESPASSER.equals(name)) {
+ String bonjour = restTemplate.getForObject("http://localhost:8080/bonjour?name={name}", String.class, name);
+
+ return ResponseEntity.ok(greetings + "; " + bonjour);
+ }
+
+ String rude = restTemplate.getForObject("http://localhost:8080/rude?name={name}", String.class, name);
+
+ return ResponseEntity.ok(greetings + "; " + rude);
}
@GetMapping("/bonjour")
ResponseEntity<String> bonjour(@RequestParam String name) {
return ResponseEntity.ok(greetingService.bonjour(name));
}
+
+ @GetMapping("/rude")
+ ResponseEntity<String> rude(@RequestParam String name) {
+ return ResponseEntity.ok(greetingService.beingRude(name));
+ }
}
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
index ef8519f..69a86f6 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
@@ -17,19 +17,28 @@
package org.apache.servicecomb.saga.integration.pack.tests;
-import org.springframework.stereotype.Service;
+import java.util.Queue;
import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
@Service
-public class GreetingService {
+class GreetingService {
+ private final Queue<String> compensated;
+
+ @Autowired
+ GreetingService(Queue<String> compensated) {
+ this.compensated = compensated;
+ }
+
@Compensable(compensationMethod = "goodbye")
String greet(String name) {
return "Greetings, " + name;
}
String goodbye(String name) {
- return "Goodbye, " + name;
+ return appendMessage("Goodbye, " + name);
}
@Compensable(compensationMethod = "auRevoir")
@@ -38,6 +47,20 @@ public class GreetingService {
}
String auRevoir(String name) {
- return "Au revoir, " + name;
+ return appendMessage("Au revoir, " + name);
+ }
+
+ @Compensable(compensationMethod = "apologize")
+ String beingRude(String name) {
+ throw new IllegalStateException("You know where the door is, " + name);
+ }
+
+ String apologize(String name) {
+ return appendMessage("My bad, please take the window instead, " + name);
+ }
+
+ private String appendMessage(String message) {
+ compensated.add(message);
+ return message;
}
}
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index 50bbd4e..66454b7 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -17,17 +17,24 @@
package org.apache.servicecomb.saga.integration.pack.tests;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.integration.pack.tests.GreetingController.TRESPASSER;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.springframework.http.HttpMethod.GET;
+import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;
import static org.springframework.http.HttpStatus.OK;
import java.util.List;
+import java.util.Queue;
import java.util.UUID;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -55,6 +62,14 @@ public class PackIT {
@Autowired
private TxEventEnvelopeRepository repository;
+ @Autowired
+ private Queue<String> compensatedMessages;
+
+ @After
+ public void tearDown() throws Exception {
+ repository.deleteAll();
+ }
+
@Test
public void updatesTxStateToAlpha() throws Exception {
HttpHeaders headers = new HttpHeaders();
@@ -74,24 +89,76 @@ public class PackIT {
assertThat(envelopes.size(), is(4));
assertThat(envelopes.get(0).type(), is("TxStartedEvent"));
+ assertThat(envelopes.get(0).localTxId(), is(notNullValue()));
assertThat(envelopes.get(0).parentTxId(), is(nullValue()));
assertThat(envelopes.get(0).serviceName(), is(serviceName));
assertThat(envelopes.get(0).instanceId(), is(notNullValue()));
assertThat(envelopes.get(1).type(), is("TxEndedEvent"));
+ assertThat(envelopes.get(1).localTxId(), is(envelopes.get(0).localTxId()));
assertThat(envelopes.get(1).parentTxId(), is(nullValue()));
assertThat(envelopes.get(1).serviceName(), is(serviceName));
- assertThat(envelopes.get(1).instanceId(), is(notNullValue()));
+ assertThat(envelopes.get(1).instanceId(), is(envelopes.get(0).instanceId()));
assertThat(envelopes.get(2).type(), is("TxStartedEvent"));
+ assertThat(envelopes.get(2).localTxId(), is(notNullValue()));
assertThat(envelopes.get(2).parentTxId(), is(envelopes.get(0).localTxId()));
assertThat(envelopes.get(2).serviceName(), is(serviceName));
assertThat(envelopes.get(2).instanceId(), is(notNullValue()));
assertThat(envelopes.get(3).type(), is("TxEndedEvent"));
+ assertThat(envelopes.get(3).localTxId(), is(envelopes.get(2).localTxId()));
+ assertThat(envelopes.get(3).parentTxId(), is(envelopes.get(0).localTxId()));
+ assertThat(envelopes.get(3).serviceName(), is(serviceName));
+ assertThat(envelopes.get(3).instanceId(), is(envelopes.get(2).instanceId()));
+
+ assertThat(compensatedMessages.isEmpty(), is(true));
+ }
+
+ @Test
+ public void compensatesFailedGlobalTransaction() throws Exception {
+ HttpHeaders headers = new HttpHeaders();
+
+ headers.set(OmegaContext.GLOBAL_TX_ID_KEY, globalTxId);
+
+ ResponseEntity<String> entity = restTemplate.exchange("/greet?name={name}",
+ GET,
+ new HttpEntity<>(headers),
+ String.class,
+ TRESPASSER);
+
+ assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
+
+ await().atMost(2, SECONDS).until(() -> repository.count() == 6);
+
+ List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+ assertThat(envelopes.size(), is(6));
+
+ assertThat(envelopes.get(0).type(), is("TxStartedEvent"));
+ assertThat(envelopes.get(1).type(), is("TxEndedEvent"));
+ assertThat(envelopes.get(2).type(), is("TxStartedEvent"));
+
+ assertThat(envelopes.get(3).type(), is("TxAbortedEvent"));
+ assertThat(envelopes.get(3).localTxId(), is(envelopes.get(2).localTxId()));
assertThat(envelopes.get(3).parentTxId(), is(envelopes.get(0).localTxId()));
assertThat(envelopes.get(3).serviceName(), is(serviceName));
- assertThat(envelopes.get(3).instanceId(), is(notNullValue()));
+ assertThat(envelopes.get(3).instanceId(), is(envelopes.get(2).instanceId()));
+
+ assertThat(envelopes.get(4).type(), is("TxCompensatedEvent"));
+ assertThat(envelopes.get(4).localTxId(), is(envelopes.get(0).localTxId()));
+ assertThat(envelopes.get(4).parentTxId(), is(nullValue()));
+ assertThat(envelopes.get(4).serviceName(), is(serviceName));
+ assertThat(envelopes.get(4).instanceId(), is(envelopes.get(0).instanceId()));
+
+ assertThat(envelopes.get(5).type(), is("TxCompensatedEvent"));
+ assertThat(envelopes.get(5).localTxId(), is(envelopes.get(2).localTxId()));
+ assertThat(envelopes.get(5).parentTxId(), is(envelopes.get(0).localTxId()));
+ assertThat(envelopes.get(5).serviceName(), is(serviceName));
+ assertThat(envelopes.get(5).instanceId(), is(envelopes.get(2).instanceId()));
+
+ assertThat(compensatedMessages, contains(
+ "Goodbye, " + TRESPASSER,
+ "My bad, please take the window instead, " + TRESPASSER));
}
}
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
index 25ef1e3..d99c2b6 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.saga.integration.pack.tests;
+import java.util.Arrays;
import java.util.Date;
import javax.persistence.Entity;
@@ -64,4 +65,19 @@ class TxEventEnvelope {
public byte[] payloads() {
return payloads;
}
+
+ @Override
+ public String toString() {
+ return "TxEventEnvelope{" +
+ "surrogateId=" + surrogateId +
+ ", serviceName='" + serviceName + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", creationTime=" + creationTime +
+ ", globalTxId='" + globalTxId + '\'' +
+ ", localTxId='" + localTxId + '\'' +
+ ", parentTxId='" + parentTxId + '\'' +
+ ", type='" + type + '\'' +
+ ", payloads=" + Arrays.toString(payloads) +
+ '}';
+ }
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index ef07536..b2e837c 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -21,6 +21,7 @@
package org.apache.servicecomb.saga.omega.connector.grpc;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
@@ -46,13 +47,16 @@ public class GrpcClientMessageSender implements MessageSender {
private final GrpcCompensateStreamObserver compensateStreamObserver;
private final GrpcServiceConfig serviceConfig;
- public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
+ public GrpcClientMessageSender(ManagedChannel channel,
+ MessageSerializer serializer,
+ MessageDeserializer deserializer,
+ ServiceConfig serviceConfig,
MessageHandler handler) {
this.asyncEventService = TxEventServiceGrpc.newStub(channel);
this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
this.serializer = serializer;
- this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler);
+ this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, deserializer);
this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
}
@@ -62,6 +66,11 @@ public class GrpcClientMessageSender implements MessageSender {
}
@Override
+ public void onDisconnected() {
+ blockingEventService.onDisconnected(serviceConfig);
+ }
+
+ @Override
public void send(TxEvent event) {
blockingEventService.onTxEvent(convertEvent(event));
}
@@ -76,6 +85,7 @@ public class GrpcClientMessageSender implements MessageSender {
.setGlobalTxId(event.globalTxId())
.setLocalTxId(event.localTxId())
.setType(event.type())
+ .setCompensationMethod(event.compensationMethod())
.setPayloads(payloads);
if (event.parentTxId() != null) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 7f6b85c..7a845eb 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -22,6 +22,7 @@ package org.apache.servicecomb.saga.omega.connector.grpc;
import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.slf4j.Logger;
@@ -34,9 +35,11 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MessageHandler messageHandler;
+ private final MessageDeserializer deserializer;
- GrpcCompensateStreamObserver(MessageHandler messageHandler) {
+ GrpcCompensateStreamObserver(MessageHandler messageHandler, MessageDeserializer deserializer) {
this.messageHandler = messageHandler;
+ this.deserializer = deserializer;
}
@Override
@@ -49,7 +52,7 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
command.getLocalTxId(),
command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
command.getCompensateMethod(),
- command.getPayloads());
+ deserializer.deserialize(command.getPayloads().toByteArray()));
}
@Override
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 460d314..9e0ebb6 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -47,6 +47,7 @@ class OmegaSpringConfig {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final List<ManagedChannel> channels = new ArrayList<>();
+ private final List<MessageSender> senders = new ArrayList<>();
@Bean
IdGenerator<String> idGenerator() {
@@ -65,6 +66,7 @@ class OmegaSpringConfig {
@PreDestroy
void close() {
+ senders.forEach(MessageSender::onDisconnected);
channels.forEach(ManagedChannel::shutdown);
}
@@ -74,8 +76,9 @@ class OmegaSpringConfig {
// TODO: 2017/12/26 connect to the one with lowest latency
for (String address : addresses) {
try {
- MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig, handler);
+ MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), new NativeMessageFormat(), serviceConfig, handler);
sender.onConnected();
+ senders.add(sender);
return sender;
} catch (Exception e) {
log.error("Unable to connect to alpha at {}", address, e);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 8bc5260..d37c5b8 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -21,5 +21,8 @@ public interface MessageSender {
default void onConnected() {
}
+ default void onDisconnected() {
+ }
+
void send(TxEvent event);
}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 7ffb654..9522b6f 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -24,6 +24,7 @@ option java_outer_classname = "TxEventProto";
service TxEventService {
rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {}
rpc OnTxEvent (GrpcTxEvent) returns (GrpcAck) {}
+ rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck){}
}
message GrpcServiceConfig {
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.