You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/02 02:52:09 UTC

[incubator-servicecomb-saga] branch SCB-149_service_aware_callback created (now 8d459e2)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


      at 8d459e2  SCB-149 distinguished omega callbacks by service name and instance id

This branch includes the following new commits:

     new 63ccc28  SCB-149 removed unnecessary endpoint interface
     new 6cbcf09  SCB-149 added service name and instance id to contract
     new 1fe3e9d  SCB-149 checked matching of compensation method
     new 47a1ba6  SCB-149 omega callback signature change
     new 8d459e2  SCB-149 distinguished omega callbacks by service name and instance id

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


-- 
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].

[incubator-servicecomb-saga] 01/05: SCB-149 removed unnecessary endpoint interface

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 63ccc284ef040e3469cbb5dfdc2a15c51d2b5437
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Dec 29 17:47:13 2017 +0800

    SCB-149 removed unnecessary endpoint interface
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  1 +
 integration-tests/coverage-aggregate/pom.xml       |  4 +
 .../connector/grpc/GrpcClientMessageSender.java    | 12 ++-
 .../connector/grpc/GrpcTxEventEndpointImpl.java    | 39 ----------
 .../grpc/GrpcClientMessageSenderTest.java          | 90 ----------------------
 .../saga/omega/spring/OmegaSpringConfig.java       | 27 +++----
 .../pack/contract/grpc/GrpcTxEventEndpoint.java    | 25 ------
 7 files changed, 27 insertions(+), 171 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 76ab346..42c597e 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -48,6 +48,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getCompensationMethod(),
         message.getPayloads().toByteArray()
     ));
+
     GrpcEmpty reply = GrpcEmpty.newBuilder().build();
     responseObserver.onNext(reply);
     responseObserver.onCompleted();
diff --git a/integration-tests/coverage-aggregate/pom.xml b/integration-tests/coverage-aggregate/pom.xml
index ffebdee..72a172d 100644
--- a/integration-tests/coverage-aggregate/pom.xml
+++ b/integration-tests/coverage-aggregate/pom.xml
@@ -69,6 +69,10 @@
     </dependency>
     <dependency>
       <groupId>io.servicecomb.saga</groupId>
+      <artifactId>omega-connector-grpc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.servicecomb.saga</groupId>
       <artifactId>alpha-server</artifactId>
     </dependency>
     <dependency>
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 25f6223..16f94b3 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -22,21 +22,23 @@ package io.servicecomb.saga.omega.connector.grpc;
 
 import com.google.protobuf.ByteString;
 
+import io.grpc.ManagedChannel;
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.MessageSerializer;
 import io.servicecomb.saga.omega.transaction.TxEvent;
 import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
+import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
+import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
 
 public class GrpcClientMessageSender implements MessageSender {
 
-  private final GrpcTxEventEndpoint eventService;
+  private final TxEventServiceBlockingStub eventService;
 
   private final MessageSerializer serializer;
 
-  public GrpcClientMessageSender(GrpcTxEventEndpoint eventService, MessageSerializer serializer) {
-    this.eventService = eventService;
+  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer) {
+    this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
     this.serializer = serializer;
   }
 
@@ -47,12 +49,14 @@ public class GrpcClientMessageSender implements MessageSender {
 
   private GrpcTxEvent convertEvent(TxEvent event) {
     ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads()));
+
     Builder builder = GrpcTxEvent.newBuilder()
         .setTimestamp(event.timestamp())
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
         .setType(event.type())
         .setPayloads(payloads);
+
     if (event.parentTxId() != null) {
       builder.setParentTxId(event.parentTxId());
     }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java
deleted file mode 100644
index b3f2b26..0000000
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.omega.connector.grpc;
-
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
-
-public class GrpcTxEventEndpointImpl implements GrpcTxEventEndpoint {
-
-  private final TxEventServiceBlockingStub stub;
-
-  public GrpcTxEventEndpointImpl(TxEventServiceBlockingStub stub) {
-    this.stub = stub;
-  }
-
-  @Override
-  public void reportEvent(GrpcTxEvent event) {
-    stub.reportEvent(event);
-  }
-}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java
deleted file mode 100644
index ca4f034..0000000
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.omega.connector.grpc;
-
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.junit.Test;
-
-import io.servicecomb.saga.omega.transaction.MessageSerializer;
-import io.servicecomb.saga.omega.transaction.TxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
-
-public class GrpcClientMessageSenderTest {
-  private final String globalTxId = uniquify("global tx id");
-
-  private final String localTxId = uniquify("local tx id");
-
-  private final String parentTxId = uniquify("parent tx id");
-
-  private final String payload1 = uniquify("payload1");
-
-  private final String payload2 = uniquify("payload2");
-
-  private GrpcTxEvent grpcTxEvent;
-
-  private final MessageSerializer serializer = new MessageSerializer() {
-    @Override
-    public byte[] serialize(TxEvent event) {
-      return serialize(event.payloads());
-    }
-
-    @Override
-    public byte[] serialize(Object[] objects) {
-      try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
-        for (Object o : objects) {
-          stream.write(o.toString().getBytes());
-        }
-        return stream.toByteArray();
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-  };
-
-
-  private final GrpcTxEventEndpoint eventService = new GrpcTxEventEndpoint() {
-    @Override
-    public void reportEvent(GrpcTxEvent event) {
-      grpcTxEvent = event;
-    }
-  };
-
-  private final GrpcClientMessageSender messageSender = new GrpcClientMessageSender(eventService, serializer);
-
-  @Test
-  public void sendSerializedEvent() throws Exception {
-    TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, payload1, payload2);
-
-    messageSender.send(event);
-
-    assertThat(grpcTxEvent.getGlobalTxId(), is(event.globalTxId()));
-    assertThat(grpcTxEvent.getLocalTxId(), is(event.localTxId()));
-    assertThat(grpcTxEvent.getParentTxId(), is(event.parentTxId()));
-    assertThat(grpcTxEvent.getPayloads().toByteArray(), is(serializer.serialize(event)));
-  }
-}
\ No newline at end of file
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 9b749fd..73e2212 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -18,7 +18,9 @@
 package io.servicecomb.saga.omega.spring;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import javax.annotation.PreDestroy;
 
@@ -31,21 +33,17 @@ import org.springframework.context.annotation.Configuration;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
-import io.servicecomb.saga.omega.connector.grpc.GrpcTxEventEndpointImpl;
 import io.servicecomb.saga.omega.context.IdGenerator;
 import io.servicecomb.saga.omega.context.OmegaContext;
 import io.servicecomb.saga.omega.context.UniqueIdGenerator;
 import io.servicecomb.saga.omega.format.NativeMessageFormat;
 import io.servicecomb.saga.omega.transaction.MessageSender;
-import io.servicecomb.saga.omega.transaction.MessageSerializer;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
 
 @Configuration
 class OmegaSpringConfig {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private ManagedChannel clientChannel;
+  private final List<ManagedChannel> channels = new ArrayList<>();
 
   @Bean
   IdGenerator<String> idGenerator() {
@@ -59,7 +57,7 @@ class OmegaSpringConfig {
 
   @PreDestroy
   void close() {
-    clientChannel.shutdown();
+    channels.forEach(ManagedChannel::shutdown);
   }
 
   @Bean
@@ -67,8 +65,7 @@ class OmegaSpringConfig {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        String[] pair = address.split(":");
-        return createMessageSender(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat());
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
@@ -78,10 +75,14 @@ class OmegaSpringConfig {
         "None of the alpha cluster is reachable: " + Arrays.toString(addresses));
   }
 
-  private GrpcClientMessageSender createMessageSender(String host, int port, MessageSerializer serializer) {
-    clientChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
-    TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel);
-    GrpcTxEventEndpointImpl eventService = new GrpcTxEventEndpointImpl(stub);
-    return new GrpcClientMessageSender(eventService, serializer);
+  private ManagedChannel grpcChannel(String address) {
+    String[] pair = address.split(":");
+
+    ManagedChannel channel = ManagedChannelBuilder.forAddress(pair[0], Integer.parseInt(pair[1]))
+        .usePlaintext(true)
+        .build();
+
+    channels.add(channel);
+    return channel;
   }
 }
diff --git a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java b/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java
deleted file mode 100644
index 32a3b6b..0000000
--- a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.pack.contract.grpc;
-
-public interface GrpcTxEventEndpoint {
-  void reportEvent(GrpcTxEvent message);
-}

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 04/05: SCB-149 omega callback signature change

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 47a1ba63e49b9ab7b9e1f041ebbf08daf6261c03
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 2 09:27:49 2018 +0800

    SCB-149 omega callback signature change
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java   | 2 +-
 .../main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java | 2 +-
 .../java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java  | 4 ++--
 .../src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java   | 4 +++-
 .../java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java   | 4 ++--
 5 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
index 5ebfb72..166994d 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -18,5 +18,5 @@
 package io.servicecomb.saga.alpha.core;
 
 public interface OmegaCallback {
-  void compensate(String globalTxId, String localTxId, String compensationMethod, byte[] message);
+  void compensate(TxEvent event);
 }
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index 34218bf..5dcb7bc 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -49,6 +49,6 @@ public class TxConsistentService {
   // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
   private void compensate(TxEvent event) {
     List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
-    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.localTxId(), evt.compensationMethod(), evt.payloads()));
+    events.forEach(omegaCallback::compensate);
   }
 }
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 635690b..76b3099 100644
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -66,8 +66,8 @@ public class TxConsistentServiceTest {
   private final String compensationMethod = getClass().getCanonicalName();
   private final List<CompensationContext> compensationContexts = new ArrayList<>();
 
-  private final OmegaCallback omegaCallback = (globalTxId, localTxId, compensationMethod, payloads) ->
-      compensationContexts.add(new CompensationContext(globalTxId, localTxId, compensationMethod, payloads));
+  private final OmegaCallback omegaCallback = event ->
+      compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
 
   private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index eca48bc..55105d4 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -23,6 +23,7 @@ import org.springframework.context.annotation.Configuration;
 
 import io.servicecomb.saga.alpha.core.OmegaCallback;
 import io.servicecomb.saga.alpha.core.TxConsistentService;
+import io.servicecomb.saga.alpha.core.TxEvent;
 import io.servicecomb.saga.alpha.core.TxEventRepository;
 
 @Configuration
@@ -31,7 +32,7 @@ class AlphaConfig {
   @Bean
   OmegaCallback omegaCallback() {
     // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138
-    return (globalTxId, localTxId, compensationMethod, message) -> {};
+    return event -> {};
   }
   
   @Bean
@@ -47,6 +48,7 @@ class AlphaConfig {
     return eventRepository;
   }
 
+  // TODO: 2017/12/29 how to match callback with service instance? send some msg on startup?
   private ServerStartable buildGrpc(int port, OmegaCallback omegaCallback, TxEventRepository eventRepository) {
     return new GrpcStartable(
         port,
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 05ab8d2..2b83b97 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -163,8 +163,8 @@ public class AlphaIntegrationTest {
 
     @Bean
     OmegaCallback omegaCallback() {
-      return (globalTxId, localTxId, compensationMethod, payloads) ->
-          compensationContexts.add(new CompensationContext(globalTxId, localTxId, compensationMethod, payloads));
+      return event ->
+          compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
     }
   }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 02/05: SCB-149 added service name and instance id to contract

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 6cbcf093ba3e0b05391f6a682420d03ee78ae6b2
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Dec 29 19:25:18 2017 +0800

    SCB-149 added service name and instance id to contract
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../io/servicecomb/saga/alpha/core/TxEvent.java    | 17 ++++++-
 .../saga/alpha/core/TxConsistentServiceTest.java   |  7 ++-
 alpha/alpha-server/pom.xml                         |  4 ++
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  2 +
 .../saga/alpha/server/TxEventEnvelope.java         | 15 +++++-
 .../alpha/server/TxEventEnvelopeRepository.java    |  2 +-
 .../src/main/resources/schema-mysql.sql            |  2 +
 .../saga/alpha/server/AlphaIntegrationTest.java    | 12 ++++-
 .../saga/integration/pack/tests/PackIT.java        | 15 +++++-
 .../integration/pack/tests/TxEventEnvelope.java    | 10 ++++
 .../connector/grpc/GrpcClientMessageSender.java    |  7 ++-
 .../saga/omega/context/ServiceConfig.java          | 54 ++++++++--------------
 .../saga/omega/spring/OmegaSpringConfig.java       | 10 +++-
 .../src/main/proto/GrpcTxEvent.proto               |  2 +
 14 files changed, 114 insertions(+), 45 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
index da46db8..6781cb5 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
@@ -20,6 +20,8 @@ package io.servicecomb.saga.alpha.core;
 import java.util.Date;
 
 public class TxEvent {
+  private String serviceName;
+  private String instanceId;
   private Date creationTime;
   private String globalTxId;
   private String localTxId;
@@ -31,13 +33,18 @@ public class TxEvent {
   private TxEvent() {
   }
 
-  public TxEvent(Date creationTime,
+  public TxEvent(
+      String serviceName,
+      String instanceId,
+      Date creationTime,
       String globalTxId,
       String localTxId,
       String parentTxId,
       String type,
       String compensationMethod,
       byte[] payloads) {
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
     this.creationTime = creationTime;
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
@@ -47,6 +54,14 @@ public class TxEvent {
     this.payloads = payloads;
   }
 
+  public String serviceName() {
+    return serviceName;
+  }
+
+  public String instanceId() {
+    return instanceId;
+  }
+
   public Date creationTime() {
     return creationTime;
   }
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 13a2674..9d9332d 100644
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -17,6 +17,7 @@
 
 package io.servicecomb.saga.alpha.core;
 
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static io.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
@@ -59,6 +60,8 @@ public class TxConsistentServiceTest {
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
 
   private final String compensationMethod = getClass().getCanonicalName();
   private final List<CompensationContext> compensationContexts = new ArrayList<>();
@@ -107,11 +110,11 @@ public class TxConsistentServiceTest {
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
+    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
 
   private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId) {
-    return new TxEvent(new Date(),
+    return new TxEvent(serviceName, instanceId, new Date(),
         globalTxId,
         localTxId,
         UUID.randomUUID().toString(),
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 3af09c2..32647e6 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -84,6 +84,10 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.github.seanyinx</groupId>
+      <artifactId>unit-scaffolding</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 42c597e..3c23a79 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -40,6 +40,8 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
   @Override
   public void reportEvent(GrpcTxEvent message, StreamObserver<GrpcEmpty> responseObserver) {
     txConsistentService.handle(new TxEvent(
+        message.getServiceName(),
+        message.getInstanceId(),
         new Date(message.getTimestamp()),
         message.getGlobalTxId(),
         message.getLocalTxId(),
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index fa282b4..06a44dc 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -42,13 +42,24 @@ class TxEventEnvelope {
     this.event = event;
   }
 
-  public TxEventEnvelope(String globalTxId,
+  public TxEventEnvelope(
+      String serviceName,
+      String instanceId,
+      String globalTxId,
       String localTxId,
       String parentTxId,
       String type,
       String compensationMethod,
       byte[] payloads) {
-    this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+    this.event = new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+  }
+
+  String serviceName() {
+    return event.serviceName();
+  }
+
+  String instanceId() {
+    return event.instanceId();
   }
 
   public long creationTime() {
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 5f929a1..3c35cba 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -26,7 +26,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long
   TxEventEnvelope findByEventGlobalTxId(String globalTxId);
 
   @Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope("
-      + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+      + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
       + ") FROM TxEventEnvelope t "
       + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
   List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index 2940804..bd98c2a 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -1,5 +1,7 @@
 CREATE TABLE IF NOT EXISTS `tx_event_envelope` (
   `surrogate_id` bigint NOT NULL AUTO_INCREMENT,
+  `service_name` varchar(16) NOT NULL,
+  `instance_id` varchar(36) NOT NULL,
   `creation_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
   `global_tx_id` varchar(36) NOT NULL,
   `local_tx_id` varchar(36) NOT NULL,
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 56a57eb..271977e 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -17,6 +17,7 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
@@ -70,6 +71,8 @@ public class AlphaIntegrationTest {
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
   private final String compensationMethod = getClass().getCanonicalName();
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
 
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
@@ -88,6 +91,8 @@ public class AlphaIntegrationTest {
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
+    assertThat(envelope.serviceName(), is(serviceName));
+    assertThat(envelope.instanceId(), is(instanceId));
     assertThat(envelope.globalTxId(), is(globalTxId));
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
@@ -118,6 +123,8 @@ public class AlphaIntegrationTest {
 
   private GrpcTxEvent someGrpcEvent(EventType type) {
     return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
         .setTimestamp(System.currentTimeMillis())
         .setGlobalTxId(this.globalTxId)
         .setLocalTxId(this.localTxId)
@@ -133,7 +140,10 @@ public class AlphaIntegrationTest {
   }
 
   private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads) {
-    return new TxEventEnvelope(new TxEvent(new Date(),
+    return new TxEventEnvelope(new TxEvent(
+        serviceName,
+        instanceId,
+        new Date(),
         globalTxId,
         localTxId,
         parentTxId,
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
index 334e60c..fb330c8 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -18,6 +18,7 @@
 package io.servicecomb.saga.integration.pack.tests;
 
 import static io.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -42,8 +43,9 @@ import io.servicecomb.saga.omega.context.OmegaContext;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = GreetingApplication.class, webEnvironment = WebEnvironment.DEFINED_PORT,
-    properties = {"server.port=8080"})
+    properties = {"server.port=8080", "spring.application.name=greeting-service"})
 public class PackIT {
+  private static final String serviceName = "greeting-service";
   private final String globalTxId = UUID.randomUUID().toString();
 
   @Autowired
@@ -75,12 +77,23 @@ public class PackIT {
     assertThat(envelopes.size(), is(4));
     assertThat(envelopes.get(0).type(), is("TxStartedEvent"));
     assertThat(envelopes.get(0).parentTxId(), is(nullValue()));
+    assertThat(envelopes.get(0).serviceName(), is(serviceName));
+    assertThat(envelopes.get(0).instanceId(), is(notNullValue()));
+
     assertThat(envelopes.get(1).type(), is("TxEndedEvent"));
     assertThat(envelopes.get(1).parentTxId(), is(nullValue()));
+    assertThat(envelopes.get(1).serviceName(), is(serviceName));
+    assertThat(envelopes.get(1).instanceId(), is(notNullValue()));
+
 
     assertThat(envelopes.get(2).type(), is("TxStartedEvent"));
     assertThat(envelopes.get(2).parentTxId(), is(envelopes.get(0).localTxId()));
+    assertThat(envelopes.get(2).serviceName(), is(serviceName));
+    assertThat(envelopes.get(2).instanceId(), is(notNullValue()));
+
     assertThat(envelopes.get(3).type(), is("TxEndedEvent"));
     assertThat(envelopes.get(3).parentTxId(), is(envelopes.get(0).localTxId()));
+    assertThat(envelopes.get(3).serviceName(), is(serviceName));
+    assertThat(envelopes.get(3).instanceId(), is(notNullValue()));
   }
 }
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
index ae7a302..206088d 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
+++ b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
@@ -29,6 +29,8 @@ class TxEventEnvelope {
   @GeneratedValue
   private long surrogateId;
 
+  private String serviceName;
+  private String instanceId;
   private Date creationTime;
   private String globalTxId;
   private String localTxId;
@@ -39,6 +41,14 @@ class TxEventEnvelope {
   private TxEventEnvelope() {
   }
 
+  String serviceName() {
+    return serviceName;
+  }
+
+  String instanceId() {
+    return instanceId;
+  }
+
   String localTxId() {
     return localTxId;
   }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 16f94b3..2c56247 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -23,6 +23,7 @@ package io.servicecomb.saga.omega.connector.grpc;
 import com.google.protobuf.ByteString;
 
 import io.grpc.ManagedChannel;
+import io.servicecomb.saga.omega.context.ServiceConfig;
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.MessageSerializer;
 import io.servicecomb.saga.omega.transaction.TxEvent;
@@ -36,10 +37,12 @@ public class GrpcClientMessageSender implements MessageSender {
   private final TxEventServiceBlockingStub eventService;
 
   private final MessageSerializer serializer;
+  private final ServiceConfig serviceConfig;
 
-  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer) {
+  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer, ServiceConfig serviceConfig) {
     this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
     this.serializer = serializer;
+    this.serviceConfig = serviceConfig;
   }
 
   @Override
@@ -51,6 +54,8 @@ public class GrpcClientMessageSender implements MessageSender {
     ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads()));
 
     Builder builder = GrpcTxEvent.newBuilder()
+        .setServiceName(serviceConfig.serviceName())
+        .setInstanceId(serviceConfig.instanceId())
         .setTimestamp(event.timestamp())
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
similarity index 56%
copy from integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
copy to omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
index ae7a302..53671ad 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
@@ -15,43 +15,29 @@
  * limitations under the License.
  */
 
-package io.servicecomb.saga.integration.pack.tests;
-
-import java.util.Date;
-
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.Id;
-
-@Entity
-class TxEventEnvelope {
-  @Id
-  @GeneratedValue
-  private long surrogateId;
-
-  private Date creationTime;
-  private String globalTxId;
-  private String localTxId;
-  private String parentTxId;
-  private String type;
-  private byte[] payloads;
-
-  private TxEventEnvelope() {
-  }
-
-  String localTxId() {
-    return localTxId;
-  }
-
-  String parentTxId() {
-    return parentTxId;
+package io.servicecomb.saga.omega.context;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ServiceConfig {
+  private final String serviceName;
+  private final String instanceId;
+
+  public ServiceConfig(String serviceName) {
+    this.serviceName = serviceName;
+    try {
+      instanceId = serviceName + "-" + InetAddress.getLocalHost().getHostAddress();
+    } catch (UnknownHostException e) {
+      throw new IllegalStateException(e);
+    }
   }
 
-  String type() {
-    return type;
+  public String serviceName() {
+    return serviceName;
   }
 
-  public byte[] payloads() {
-    return payloads;
+  public String instanceId() {
+    return instanceId;
   }
 }
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 73e2212..28181f4 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -35,6 +35,7 @@ import io.grpc.ManagedChannelBuilder;
 import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
 import io.servicecomb.saga.omega.context.IdGenerator;
 import io.servicecomb.saga.omega.context.OmegaContext;
+import io.servicecomb.saga.omega.context.ServiceConfig;
 import io.servicecomb.saga.omega.context.UniqueIdGenerator;
 import io.servicecomb.saga.omega.format.NativeMessageFormat;
 import io.servicecomb.saga.omega.transaction.MessageSender;
@@ -55,17 +56,22 @@ class OmegaSpringConfig {
     return new OmegaContext(idGenerator);
   }
 
+  @Bean
+  ServiceConfig serviceConfig(@Value("${spring.application.name}") String serviceName) {
+    return new ServiceConfig(serviceName);
+  }
+
   @PreDestroy
   void close() {
     channels.forEach(ManagedChannel::shutdown);
   }
 
   @Bean
-  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses) {
+  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig) {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat());
+        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig);
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index f6ebf74..743df4d 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -33,6 +33,8 @@ message GrpcTxEvent {
   string type = 5;
   string compensationMethod = 6;
   bytes payloads = 7;
+  string serviceName = 8;
+  string instanceId = 9;
 }
 
 message GrpcEmpty {}

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 03/05: SCB-149 checked matching of compensation method

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 1fe3e9ddb8bbeb53462c69d2ab0c1b83113ac692
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 2 09:11:36 2018 +0800

    SCB-149 checked matching of compensation method
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java         |  2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java     | 14 +++++++-------
 .../saga/alpha/server/AlphaIntegrationTest.java      | 20 ++++++++++----------
 3 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index 15f5099..34218bf 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -49,6 +49,6 @@ public class TxConsistentService {
   // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
   private void compensate(TxEvent event) {
     List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
-    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.localTxId(), event.compensationMethod(), evt.payloads()));
+    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.localTxId(), evt.compensationMethod(), evt.payloads()));
   }
 }
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 9d9332d..635690b 100644
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -91,12 +91,12 @@ public class TxConsistentServiceTest {
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
     String localTxId1 = UUID.randomUUID().toString();
-    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1));
-    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1));
+    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
 
     String localTxId2 = UUID.randomUUID().toString();
-    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2));
-    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2));
+    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
 
     TxEvent abortEvent = newEvent(TxAbortedEvent);
 
@@ -104,8 +104,8 @@ public class TxConsistentServiceTest {
 
     await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
     assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, localTxId1, compensationMethod, "service a".getBytes()),
-        new CompensationContext(globalTxId, localTxId2, compensationMethod, "service b".getBytes())
+        new CompensationContext(globalTxId, localTxId1, "method a", "service a".getBytes()),
+        new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
     ));
   }
 
@@ -113,7 +113,7 @@ public class TxConsistentServiceTest {
     return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
 
-  private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId) {
+  private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId, String compensationMethod) {
     return new TxEvent(serviceName, instanceId, new Date(),
         globalTxId,
         localTxId,
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 271977e..05ab8d2 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -104,20 +104,20 @@ public class AlphaIntegrationTest {
   @Test
   public void doNotCompensateDuplicateTxOnFailure() throws Exception {
     // duplicate events with same content but different timestamp
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0], "method b"));
 
     stub.reportEvent(someGrpcEvent(TxAbortedEvent));
 
     await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
     assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, this.localTxId, compensationMethod, "service a".getBytes()),
-        new CompensationContext(globalTxId, localTxId1, compensationMethod, "service b".getBytes())
+        new CompensationContext(globalTxId, this.localTxId, "method a", "service a".getBytes()),
+        new CompensationContext(globalTxId, localTxId1, "method b", "service b".getBytes())
     ));
   }
 
@@ -135,11 +135,11 @@ public class AlphaIntegrationTest {
         .build();
   }
 
-  private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads) {
-    return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads);
+  private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads, String compensationMethod) {
+    return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
   }
 
-  private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads) {
+  private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
     return new TxEventEnvelope(new TxEvent(
         serviceName,
         instanceId,

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 05/05: SCB-149 distinguished omega callbacks by service name and instance id

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 8d459e2691a41ba6f035efa3baaf71f6a8979bda
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 2 10:49:49 2018 +0800

    SCB-149 distinguished omega callbacks by service name and instance id
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 alpha/alpha-core/pom.xml                           |   9 ++
 .../saga/alpha/core/AlphaException.java            |  24 ++++
 .../saga/alpha/core/CompositeOmegaCallback.java    |  44 ++++++++
 .../saga/alpha/core/RetryOmegaCallback.java        |  71 ++++++++++++
 .../alpha/core/CompositeOmegaCallbackTest.java     | 122 +++++++++++++++++++++
 .../saga/alpha/core/RetryOmegaCallbackTest.java    |  79 +++++++++++++
 .../alpha-core/src/test/resources/log4j2-test.xml  |  30 +++++
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  21 +++-
 8 files changed, 395 insertions(+), 5 deletions(-)

diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml
index 92f22f6..93c810f 100644
--- a/alpha/alpha-core/pom.xml
+++ b/alpha/alpha-core/pom.xml
@@ -29,6 +29,11 @@
   <artifactId>alpha-core</artifactId>
   <dependencies>
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
@@ -44,6 +49,10 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
   </dependencies>
 
 
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java
new file mode 100644
index 0000000..a5eb3c4
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+public class AlphaException extends RuntimeException {
+  public AlphaException(String cause) {
+    super(cause);
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
new file mode 100644
index 0000000..e5c4b12
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import java.util.Map;
+
+public class CompositeOmegaCallback implements OmegaCallback {
+  private final Map<String, Map<String, OmegaCallback>> callbacks;
+
+  public CompositeOmegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
+    this.callbacks = callbacks;
+  }
+
+  @Override
+  public void compensate(TxEvent event) {
+    Map<String, OmegaCallback> serviceCallbacks = callbacks.get(event.serviceName());
+
+    if (serviceCallbacks.isEmpty()) {
+      throw new AlphaException("No such omega callback found for service " + event.serviceName());
+    }
+
+    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
+    if (omegaCallback == null) {
+      serviceCallbacks.values().iterator().next().compensate(event);
+    } else {
+      omegaCallback.compensate(event);
+    }
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
new file mode 100644
index 0000000..6f1a7dd
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RetryOmegaCallback implements OmegaCallback {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final String ERROR_MESSAGE = "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]";
+
+  private final OmegaCallback underlying;
+  private final int delay;
+
+  public RetryOmegaCallback(OmegaCallback underlying, int delay) {
+    this.underlying = underlying;
+    this.delay = delay;
+  }
+
+  @Override
+  public void compensate(TxEvent event) {
+    boolean success = false;
+    do {
+      try {
+        underlying.compensate(event);
+        success = true;
+      } catch (Exception e) {
+        logError(ERROR_MESSAGE, event, e);
+        sleep(event);
+      }
+    } while (!success && !Thread.currentThread().isInterrupted());
+  }
+
+  private void sleep(TxEvent event) {
+    try {
+      TimeUnit.MILLISECONDS.sleep(delay);
+    } catch (InterruptedException e) {
+      logError(ERROR_MESSAGE + " due to interruption", event, e);
+
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void logError(String message, TxEvent event, Exception e) {
+    log.error(message,
+        event.serviceName(),
+        event.instanceId(),
+        event.compensationMethod(),
+        event.globalTxId(),
+        event.localTxId(),
+        e);
+  }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
new file mode 100644
index 0000000..38e04a3
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class CompositeOmegaCallbackTest {
+
+  private final OmegaCallback callback1One = Mockito.mock(OmegaCallback.class);
+  private final OmegaCallback callback1Two = Mockito.mock(OmegaCallback.class);
+
+  private final OmegaCallback callback2One = Mockito.mock(OmegaCallback.class);
+  private final OmegaCallback callback2Two = Mockito.mock(OmegaCallback.class);
+
+  private final String serviceName1 = uniquify("serviceName1");
+  private final String instanceId1One = uniquify("instanceId1One");
+  private final String instanceId1Two = uniquify("instanceId1Two");
+
+  private final String serviceName2 = uniquify("serviceName2");
+  private final String instanceId2One = uniquify("instanceId2One");
+  private final String instanceId2Two = uniquify("instanceId2Two");
+
+  private final Map<String, Map<String, OmegaCallback>> callbacks = new ConcurrentHashMap<>();
+  private final CompositeOmegaCallback compositeOmegaCallback = new CompositeOmegaCallback(callbacks);
+
+  @Before
+  public void setUp() throws Exception {
+    callbacks.put(serviceName1, new ConcurrentHashMap<>());
+    callbacks.get(serviceName1).put(instanceId1One, callback1One);
+    callbacks.get(serviceName1).put(instanceId1Two, callback1Two);
+
+    callbacks.put(serviceName2, new ConcurrentHashMap<>());
+    callbacks.get(serviceName2).put(instanceId2One, callback2One);
+    callbacks.get(serviceName2).put(instanceId2Two, callback2Two);
+  }
+
+  @Test
+  public void compensateCorrespondingOmegaInstanceOnly() throws Exception {
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    compositeOmegaCallback.compensate(event);
+
+    verify(callback1One, never()).compensate(event);
+    verify(callback1Two, never()).compensate(event);
+    verify(callback2One).compensate(event);
+    verify(callback2Two, never()).compensate(event);
+  }
+
+  @Test
+  public void compensateOtherOmegaInstance_IfTheRequestedIsUnreachable() throws Exception {
+    callbacks.get(serviceName2).remove(instanceId2One);
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    compositeOmegaCallback.compensate(event);
+
+    verify(callback1One, never()).compensate(event);
+    verify(callback1Two, never()).compensate(event);
+    verify(callback2One, never()).compensate(event);
+    verify(callback2Two).compensate(event);
+  }
+
+  @Test
+  public void blowsUpIfNoSuchServiceIsReachable() throws Exception {
+    callbacks.get(serviceName2).clear();
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    try {
+      compositeOmegaCallback.compensate(event);
+      expectFailing(AlphaException.class);
+    } catch (AlphaException e) {
+      assertThat(e.getMessage(), is("No such omega callback found for service " + serviceName2));
+    }
+
+    verify(callback1One, never()).compensate(event);
+    verify(callback1Two, never()).compensate(event);
+    verify(callback2One, never()).compensate(event);
+    verify(callback2Two, never()).compensate(event);
+  }
+
+  private TxEvent eventOf(String serviceName, String instanceId, EventType eventType) {
+    return new TxEvent(
+        serviceName,
+        instanceId,
+        new Date(),
+        uniquify("globalTxId"),
+        uniquify("localTxId"),
+        UUID.randomUUID().toString(),
+        eventType.name(),
+        getClass().getCanonicalName(),
+        uniquify("blah").getBytes());
+  }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
new file mode 100644
index 0000000..27cc16f
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Date;
+import java.util.UUID;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RetryOmegaCallbackTest {
+  private final int delay = 100;
+  private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
+  private final RetryOmegaCallback callback = new RetryOmegaCallback(underlying, delay);
+
+  @Test
+  public void retryOnFailure() throws Exception {
+    TxEvent event = someEvent();
+
+    doThrow(AlphaException.class)
+        .doThrow(AlphaException.class)
+        .doNothing()
+        .when(underlying)
+        .compensate(event);
+
+    callback.compensate(event);
+
+    verify(underlying, times(3)).compensate(event);
+  }
+
+  @Test
+  public void exitOnInterruption() throws Exception {
+    TxEvent event = someEvent();
+
+    doThrow(AlphaException.class).when(underlying).compensate(event);
+
+    Thread thread = new Thread(() -> callback.compensate(event));
+    thread.start();
+
+    Thread.sleep(300);
+    thread.interrupt();
+
+    verify(underlying, atMost(4)).compensate(event);
+  }
+
+  private TxEvent someEvent() {
+    return new TxEvent(
+        uniquify("serviceName"),
+        uniquify("instanceId"),
+        new Date(),
+        uniquify("globalTxId"),
+        uniquify("localTxId"),
+        UUID.randomUUID().toString(),
+        EventType.TxStartedEvent.name(),
+        getClass().getCanonicalName(),
+        uniquify("blah").getBytes());
+  }
+}
diff --git a/alpha/alpha-core/src/test/resources/log4j2-test.xml b/alpha/alpha-core/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/alpha/alpha-core/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index 55105d4..f44c951 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -17,22 +17,34 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import io.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.RetryOmegaCallback;
 import io.servicecomb.saga.alpha.core.TxConsistentService;
-import io.servicecomb.saga.alpha.core.TxEvent;
 import io.servicecomb.saga.alpha.core.TxEventRepository;
 
 @Configuration
 class AlphaConfig {
 
+  // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138
   @Bean
-  OmegaCallback omegaCallback() {
-    // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138
-    return event -> {};
+  Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
+    return new ConcurrentHashMap<>();
+  }
+
+  @Bean
+  OmegaCallback omegaCallback(
+      Map<String, Map<String, OmegaCallback>> callbacks,
+      @Value("${alpha.compensation.retry.delay:3000}") int delay) {
+
+    return new RetryOmegaCallback(new CompositeOmegaCallback(callbacks), delay);
   }
   
   @Bean
@@ -48,7 +60,6 @@ class AlphaConfig {
     return eventRepository;
   }
 
-  // TODO: 2017/12/29 how to match callback with service instance? send some msg on startup?
   private ServerStartable buildGrpc(int port, OmegaCallback omegaCallback, TxEventRepository eventRepository) {
     return new GrpcStartable(
         port,

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.