You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/02 02:52:11 UTC

[incubator-servicecomb-saga] 02/05: SCB-149 added service name and instance id to contract

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 6cbcf093ba3e0b05391f6a682420d03ee78ae6b2
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Dec 29 19:25:18 2017 +0800

    SCB-149 added service name and instance id to contract
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../io/servicecomb/saga/alpha/core/TxEvent.java    | 17 ++++++-
 .../saga/alpha/core/TxConsistentServiceTest.java   |  7 ++-
 alpha/alpha-server/pom.xml                         |  4 ++
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  2 +
 .../saga/alpha/server/TxEventEnvelope.java         | 15 +++++-
 .../alpha/server/TxEventEnvelopeRepository.java    |  2 +-
 .../src/main/resources/schema-mysql.sql            |  2 +
 .../saga/alpha/server/AlphaIntegrationTest.java    | 12 ++++-
 .../saga/integration/pack/tests/PackIT.java        | 15 +++++-
 .../integration/pack/tests/TxEventEnvelope.java    | 10 ++++
 .../connector/grpc/GrpcClientMessageSender.java    |  7 ++-
 .../saga/omega/context/ServiceConfig.java          | 54 ++++++++--------------
 .../saga/omega/spring/OmegaSpringConfig.java       | 10 +++-
 .../src/main/proto/GrpcTxEvent.proto               |  2 +
 14 files changed, 114 insertions(+), 45 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
index da46db8..6781cb5 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
@@ -20,6 +20,8 @@ package io.servicecomb.saga.alpha.core;
 import java.util.Date;
 
 public class TxEvent {
+  private String serviceName;
+  private String instanceId;
   private Date creationTime;
   private String globalTxId;
   private String localTxId;
@@ -31,13 +33,18 @@ public class TxEvent {
   private TxEvent() {
   }
 
-  public TxEvent(Date creationTime,
+  public TxEvent(
+      String serviceName,
+      String instanceId,
+      Date creationTime,
       String globalTxId,
       String localTxId,
       String parentTxId,
       String type,
       String compensationMethod,
       byte[] payloads) {
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
     this.creationTime = creationTime;
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
@@ -47,6 +54,14 @@ public class TxEvent {
     this.payloads = payloads;
   }
 
+  public String serviceName() {
+    return serviceName;
+  }
+
+  public String instanceId() {
+    return instanceId;
+  }
+
   public Date creationTime() {
     return creationTime;
   }
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 13a2674..9d9332d 100644
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -17,6 +17,7 @@
 
 package io.servicecomb.saga.alpha.core;
 
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static io.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
@@ -59,6 +60,8 @@ public class TxConsistentServiceTest {
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
 
   private final String compensationMethod = getClass().getCanonicalName();
   private final List<CompensationContext> compensationContexts = new ArrayList<>();
@@ -107,11 +110,11 @@ public class TxConsistentServiceTest {
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
+    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
 
   private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId) {
-    return new TxEvent(new Date(),
+    return new TxEvent(serviceName, instanceId, new Date(),
         globalTxId,
         localTxId,
         UUID.randomUUID().toString(),
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 3af09c2..32647e6 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -84,6 +84,10 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.github.seanyinx</groupId>
+      <artifactId>unit-scaffolding</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 42c597e..3c23a79 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -40,6 +40,8 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
   @Override
   public void reportEvent(GrpcTxEvent message, StreamObserver<GrpcEmpty> responseObserver) {
     txConsistentService.handle(new TxEvent(
+        message.getServiceName(),
+        message.getInstanceId(),
         new Date(message.getTimestamp()),
         message.getGlobalTxId(),
         message.getLocalTxId(),
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index fa282b4..06a44dc 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -42,13 +42,24 @@ class TxEventEnvelope {
     this.event = event;
   }
 
-  public TxEventEnvelope(String globalTxId,
+  public TxEventEnvelope(
+      String serviceName,
+      String instanceId,
+      String globalTxId,
       String localTxId,
       String parentTxId,
       String type,
       String compensationMethod,
       byte[] payloads) {
-    this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+    this.event = new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+  }
+
+  String serviceName() {
+    return event.serviceName();
+  }
+
+  String instanceId() {
+    return event.instanceId();
   }
 
   public long creationTime() {
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 5f929a1..3c35cba 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -26,7 +26,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long
   TxEventEnvelope findByEventGlobalTxId(String globalTxId);
 
   @Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope("
-      + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+      + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
       + ") FROM TxEventEnvelope t "
       + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
   List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index 2940804..bd98c2a 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -1,5 +1,7 @@
 CREATE TABLE IF NOT EXISTS `tx_event_envelope` (
   `surrogate_id` bigint NOT NULL AUTO_INCREMENT,
+  `service_name` varchar(16) NOT NULL,
+  `instance_id` varchar(36) NOT NULL,
   `creation_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
   `global_tx_id` varchar(36) NOT NULL,
   `local_tx_id` varchar(36) NOT NULL,
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 56a57eb..271977e 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -17,6 +17,7 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
@@ -70,6 +71,8 @@ public class AlphaIntegrationTest {
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
   private final String compensationMethod = getClass().getCanonicalName();
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
 
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
@@ -88,6 +91,8 @@ public class AlphaIntegrationTest {
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
+    assertThat(envelope.serviceName(), is(serviceName));
+    assertThat(envelope.instanceId(), is(instanceId));
     assertThat(envelope.globalTxId(), is(globalTxId));
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
@@ -118,6 +123,8 @@ public class AlphaIntegrationTest {
 
   private GrpcTxEvent someGrpcEvent(EventType type) {
     return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
         .setTimestamp(System.currentTimeMillis())
         .setGlobalTxId(this.globalTxId)
         .setLocalTxId(this.localTxId)
@@ -133,7 +140,10 @@ public class AlphaIntegrationTest {
   }
 
   private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads) {
-    return new TxEventEnvelope(new TxEvent(new Date(),
+    return new TxEventEnvelope(new TxEvent(
+        serviceName,
+        instanceId,
+        new Date(),
         globalTxId,
         localTxId,
         parentTxId,
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
index 334e60c..fb330c8 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -18,6 +18,7 @@
 package io.servicecomb.saga.integration.pack.tests;
 
 import static io.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -42,8 +43,9 @@ import io.servicecomb.saga.omega.context.OmegaContext;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = GreetingApplication.class, webEnvironment = WebEnvironment.DEFINED_PORT,
-    properties = {"server.port=8080"})
+    properties = {"server.port=8080", "spring.application.name=greeting-service"})
 public class PackIT {
+  private static final String serviceName = "greeting-service";
   private final String globalTxId = UUID.randomUUID().toString();
 
   @Autowired
@@ -75,12 +77,23 @@ public class PackIT {
     assertThat(envelopes.size(), is(4));
     assertThat(envelopes.get(0).type(), is("TxStartedEvent"));
     assertThat(envelopes.get(0).parentTxId(), is(nullValue()));
+    assertThat(envelopes.get(0).serviceName(), is(serviceName));
+    assertThat(envelopes.get(0).instanceId(), is(notNullValue()));
+
     assertThat(envelopes.get(1).type(), is("TxEndedEvent"));
     assertThat(envelopes.get(1).parentTxId(), is(nullValue()));
+    assertThat(envelopes.get(1).serviceName(), is(serviceName));
+    assertThat(envelopes.get(1).instanceId(), is(notNullValue()));
+
 
     assertThat(envelopes.get(2).type(), is("TxStartedEvent"));
     assertThat(envelopes.get(2).parentTxId(), is(envelopes.get(0).localTxId()));
+    assertThat(envelopes.get(2).serviceName(), is(serviceName));
+    assertThat(envelopes.get(2).instanceId(), is(notNullValue()));
+
     assertThat(envelopes.get(3).type(), is("TxEndedEvent"));
     assertThat(envelopes.get(3).parentTxId(), is(envelopes.get(0).localTxId()));
+    assertThat(envelopes.get(3).serviceName(), is(serviceName));
+    assertThat(envelopes.get(3).instanceId(), is(notNullValue()));
   }
 }
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
index ae7a302..206088d 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
+++ b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
@@ -29,6 +29,8 @@ class TxEventEnvelope {
   @GeneratedValue
   private long surrogateId;
 
+  private String serviceName;
+  private String instanceId;
   private Date creationTime;
   private String globalTxId;
   private String localTxId;
@@ -39,6 +41,14 @@ class TxEventEnvelope {
   private TxEventEnvelope() {
   }
 
+  String serviceName() {
+    return serviceName;
+  }
+
+  String instanceId() {
+    return instanceId;
+  }
+
   String localTxId() {
     return localTxId;
   }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 16f94b3..2c56247 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -23,6 +23,7 @@ package io.servicecomb.saga.omega.connector.grpc;
 import com.google.protobuf.ByteString;
 
 import io.grpc.ManagedChannel;
+import io.servicecomb.saga.omega.context.ServiceConfig;
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.MessageSerializer;
 import io.servicecomb.saga.omega.transaction.TxEvent;
@@ -36,10 +37,12 @@ public class GrpcClientMessageSender implements MessageSender {
   private final TxEventServiceBlockingStub eventService;
 
   private final MessageSerializer serializer;
+  private final ServiceConfig serviceConfig;
 
-  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer) {
+  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer, ServiceConfig serviceConfig) {
     this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
     this.serializer = serializer;
+    this.serviceConfig = serviceConfig;
   }
 
   @Override
@@ -51,6 +54,8 @@ public class GrpcClientMessageSender implements MessageSender {
     ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads()));
 
     Builder builder = GrpcTxEvent.newBuilder()
+        .setServiceName(serviceConfig.serviceName())
+        .setInstanceId(serviceConfig.instanceId())
         .setTimestamp(event.timestamp())
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
similarity index 56%
copy from integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
copy to omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
index ae7a302..53671ad 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
@@ -15,43 +15,29 @@
  * limitations under the License.
  */
 
-package io.servicecomb.saga.integration.pack.tests;
-
-import java.util.Date;
-
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.Id;
-
-@Entity
-class TxEventEnvelope {
-  @Id
-  @GeneratedValue
-  private long surrogateId;
-
-  private Date creationTime;
-  private String globalTxId;
-  private String localTxId;
-  private String parentTxId;
-  private String type;
-  private byte[] payloads;
-
-  private TxEventEnvelope() {
-  }
-
-  String localTxId() {
-    return localTxId;
-  }
-
-  String parentTxId() {
-    return parentTxId;
+package io.servicecomb.saga.omega.context;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ServiceConfig {
+  private final String serviceName;
+  private final String instanceId;
+
+  public ServiceConfig(String serviceName) {
+    this.serviceName = serviceName;
+    try {
+      instanceId = serviceName + "-" + InetAddress.getLocalHost().getHostAddress();
+    } catch (UnknownHostException e) {
+      throw new IllegalStateException(e);
+    }
   }
 
-  String type() {
-    return type;
+  public String serviceName() {
+    return serviceName;
   }
 
-  public byte[] payloads() {
-    return payloads;
+  public String instanceId() {
+    return instanceId;
   }
 }
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 73e2212..28181f4 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -35,6 +35,7 @@ import io.grpc.ManagedChannelBuilder;
 import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
 import io.servicecomb.saga.omega.context.IdGenerator;
 import io.servicecomb.saga.omega.context.OmegaContext;
+import io.servicecomb.saga.omega.context.ServiceConfig;
 import io.servicecomb.saga.omega.context.UniqueIdGenerator;
 import io.servicecomb.saga.omega.format.NativeMessageFormat;
 import io.servicecomb.saga.omega.transaction.MessageSender;
@@ -55,17 +56,22 @@ class OmegaSpringConfig {
     return new OmegaContext(idGenerator);
   }
 
+  @Bean
+  ServiceConfig serviceConfig(@Value("${spring.application.name}") String serviceName) {
+    return new ServiceConfig(serviceName);
+  }
+
   @PreDestroy
   void close() {
     channels.forEach(ManagedChannel::shutdown);
   }
 
   @Bean
-  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses) {
+  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig) {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat());
+        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig);
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index f6ebf74..743df4d 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -33,6 +33,8 @@ message GrpcTxEvent {
   string type = 5;
   string compensationMethod = 6;
   bytes payloads = 7;
+  string serviceName = 8;
+  string instanceId = 9;
 }
 
 message GrpcEmpty {}

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.