You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/02 02:52:11 UTC
[incubator-servicecomb-saga] 02/05: SCB-149 added service name and
instance id to contract
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 6cbcf093ba3e0b05391f6a682420d03ee78ae6b2
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Dec 29 19:25:18 2017 +0800
SCB-149 added service name and instance id to contract
Signed-off-by: seanyinx <se...@huawei.com>
---
.../io/servicecomb/saga/alpha/core/TxEvent.java | 17 ++++++-
.../saga/alpha/core/TxConsistentServiceTest.java | 7 ++-
alpha/alpha-server/pom.xml | 4 ++
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 2 +
.../saga/alpha/server/TxEventEnvelope.java | 15 +++++-
.../alpha/server/TxEventEnvelopeRepository.java | 2 +-
.../src/main/resources/schema-mysql.sql | 2 +
.../saga/alpha/server/AlphaIntegrationTest.java | 12 ++++-
.../saga/integration/pack/tests/PackIT.java | 15 +++++-
.../integration/pack/tests/TxEventEnvelope.java | 10 ++++
.../connector/grpc/GrpcClientMessageSender.java | 7 ++-
.../saga/omega/context/ServiceConfig.java | 54 ++++++++--------------
.../saga/omega/spring/OmegaSpringConfig.java | 10 +++-
.../src/main/proto/GrpcTxEvent.proto | 2 +
14 files changed, 114 insertions(+), 45 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
index da46db8..6781cb5 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
@@ -20,6 +20,8 @@ package io.servicecomb.saga.alpha.core;
import java.util.Date;
public class TxEvent {
+ private String serviceName;
+ private String instanceId;
private Date creationTime;
private String globalTxId;
private String localTxId;
@@ -31,13 +33,18 @@ public class TxEvent {
private TxEvent() {
}
- public TxEvent(Date creationTime,
+ public TxEvent(
+ String serviceName,
+ String instanceId,
+ Date creationTime,
String globalTxId,
String localTxId,
String parentTxId,
String type,
String compensationMethod,
byte[] payloads) {
+ this.serviceName = serviceName;
+ this.instanceId = instanceId;
this.creationTime = creationTime;
this.globalTxId = globalTxId;
this.localTxId = localTxId;
@@ -47,6 +54,14 @@ public class TxEvent {
this.payloads = payloads;
}
+ public String serviceName() {
+ return serviceName;
+ }
+
+ public String instanceId() {
+ return instanceId;
+ }
+
public Date creationTime() {
return creationTime;
}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 13a2674..9d9332d 100644
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -17,6 +17,7 @@
package io.servicecomb.saga.alpha.core;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static io.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
import static io.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
@@ -59,6 +60,8 @@ public class TxConsistentServiceTest {
private final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
+ private final String serviceName = uniquify("serviceName");
+ private final String instanceId = uniquify("instanceId");
private final String compensationMethod = getClass().getCanonicalName();
private final List<CompensationContext> compensationContexts = new ArrayList<>();
@@ -107,11 +110,11 @@ public class TxConsistentServiceTest {
}
private TxEvent newEvent(EventType eventType) {
- return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
+ return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
}
private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId) {
- return new TxEvent(new Date(),
+ return new TxEvent(serviceName, instanceId, new Date(),
globalTxId,
localTxId,
UUID.randomUUID().toString(),
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 3af09c2..32647e6 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -84,6 +84,10 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.seanyinx</groupId>
+ <artifactId>unit-scaffolding</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 42c597e..3c23a79 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -40,6 +40,8 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
@Override
public void reportEvent(GrpcTxEvent message, StreamObserver<GrpcEmpty> responseObserver) {
txConsistentService.handle(new TxEvent(
+ message.getServiceName(),
+ message.getInstanceId(),
new Date(message.getTimestamp()),
message.getGlobalTxId(),
message.getLocalTxId(),
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index fa282b4..06a44dc 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -42,13 +42,24 @@ class TxEventEnvelope {
this.event = event;
}
- public TxEventEnvelope(String globalTxId,
+ public TxEventEnvelope(
+ String serviceName,
+ String instanceId,
+ String globalTxId,
String localTxId,
String parentTxId,
String type,
String compensationMethod,
byte[] payloads) {
- this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+ this.event = new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+ }
+
+ String serviceName() {
+ return event.serviceName();
+ }
+
+ String instanceId() {
+ return event.instanceId();
}
public long creationTime() {
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 5f929a1..3c35cba 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -26,7 +26,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long
TxEventEnvelope findByEventGlobalTxId(String globalTxId);
@Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope("
- + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+ + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+ ") FROM TxEventEnvelope t "
+ "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index 2940804..bd98c2a 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -1,5 +1,7 @@
CREATE TABLE IF NOT EXISTS `tx_event_envelope` (
`surrogate_id` bigint NOT NULL AUTO_INCREMENT,
+ `service_name` varchar(16) NOT NULL,
+ `instance_id` varchar(36) NOT NULL,
`creation_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
`global_tx_id` varchar(36) NOT NULL,
`local_tx_id` varchar(36) NOT NULL,
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 56a57eb..271977e 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -17,6 +17,7 @@
package io.servicecomb.saga.alpha.server;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
@@ -70,6 +71,8 @@ public class AlphaIntegrationTest {
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
private final String compensationMethod = getClass().getCanonicalName();
+ private final String serviceName = uniquify("serviceName");
+ private final String instanceId = uniquify("instanceId");
@Autowired
private TxEventEnvelopeRepository eventRepo;
@@ -88,6 +91,8 @@ public class AlphaIntegrationTest {
TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
+ assertThat(envelope.serviceName(), is(serviceName));
+ assertThat(envelope.instanceId(), is(instanceId));
assertThat(envelope.globalTxId(), is(globalTxId));
assertThat(envelope.localTxId(), is(localTxId));
assertThat(envelope.parentTxId(), is(parentTxId));
@@ -118,6 +123,8 @@ public class AlphaIntegrationTest {
private GrpcTxEvent someGrpcEvent(EventType type) {
return GrpcTxEvent.newBuilder()
+ .setServiceName(serviceName)
+ .setInstanceId(instanceId)
.setTimestamp(System.currentTimeMillis())
.setGlobalTxId(this.globalTxId)
.setLocalTxId(this.localTxId)
@@ -133,7 +140,10 @@ public class AlphaIntegrationTest {
}
private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads) {
- return new TxEventEnvelope(new TxEvent(new Date(),
+ return new TxEventEnvelope(new TxEvent(
+ serviceName,
+ instanceId,
+ new Date(),
globalTxId,
localTxId,
parentTxId,
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
index 334e60c..fb330c8 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -18,6 +18,7 @@
package io.servicecomb.saga.integration.pack.tests;
import static io.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -42,8 +43,9 @@ import io.servicecomb.saga.omega.context.OmegaContext;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = GreetingApplication.class, webEnvironment = WebEnvironment.DEFINED_PORT,
- properties = {"server.port=8080"})
+ properties = {"server.port=8080", "spring.application.name=greeting-service"})
public class PackIT {
+ private static final String serviceName = "greeting-service";
private final String globalTxId = UUID.randomUUID().toString();
@Autowired
@@ -75,12 +77,23 @@ public class PackIT {
assertThat(envelopes.size(), is(4));
assertThat(envelopes.get(0).type(), is("TxStartedEvent"));
assertThat(envelopes.get(0).parentTxId(), is(nullValue()));
+ assertThat(envelopes.get(0).serviceName(), is(serviceName));
+ assertThat(envelopes.get(0).instanceId(), is(notNullValue()));
+
assertThat(envelopes.get(1).type(), is("TxEndedEvent"));
assertThat(envelopes.get(1).parentTxId(), is(nullValue()));
+ assertThat(envelopes.get(1).serviceName(), is(serviceName));
+ assertThat(envelopes.get(1).instanceId(), is(notNullValue()));
+
assertThat(envelopes.get(2).type(), is("TxStartedEvent"));
assertThat(envelopes.get(2).parentTxId(), is(envelopes.get(0).localTxId()));
+ assertThat(envelopes.get(2).serviceName(), is(serviceName));
+ assertThat(envelopes.get(2).instanceId(), is(notNullValue()));
+
assertThat(envelopes.get(3).type(), is("TxEndedEvent"));
assertThat(envelopes.get(3).parentTxId(), is(envelopes.get(0).localTxId()));
+ assertThat(envelopes.get(3).serviceName(), is(serviceName));
+ assertThat(envelopes.get(3).instanceId(), is(notNullValue()));
}
}
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
index ae7a302..206088d 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
+++ b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
@@ -29,6 +29,8 @@ class TxEventEnvelope {
@GeneratedValue
private long surrogateId;
+ private String serviceName;
+ private String instanceId;
private Date creationTime;
private String globalTxId;
private String localTxId;
@@ -39,6 +41,14 @@ class TxEventEnvelope {
private TxEventEnvelope() {
}
+ String serviceName() {
+ return serviceName;
+ }
+
+ String instanceId() {
+ return instanceId;
+ }
+
String localTxId() {
return localTxId;
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 16f94b3..2c56247 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -23,6 +23,7 @@ package io.servicecomb.saga.omega.connector.grpc;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
+import io.servicecomb.saga.omega.context.ServiceConfig;
import io.servicecomb.saga.omega.transaction.MessageSender;
import io.servicecomb.saga.omega.transaction.MessageSerializer;
import io.servicecomb.saga.omega.transaction.TxEvent;
@@ -36,10 +37,12 @@ public class GrpcClientMessageSender implements MessageSender {
private final TxEventServiceBlockingStub eventService;
private final MessageSerializer serializer;
+ private final ServiceConfig serviceConfig;
- public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer) {
+ public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer, ServiceConfig serviceConfig) {
this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
this.serializer = serializer;
+ this.serviceConfig = serviceConfig;
}
@Override
@@ -51,6 +54,8 @@ public class GrpcClientMessageSender implements MessageSender {
ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads()));
Builder builder = GrpcTxEvent.newBuilder()
+ .setServiceName(serviceConfig.serviceName())
+ .setInstanceId(serviceConfig.instanceId())
.setTimestamp(event.timestamp())
.setGlobalTxId(event.globalTxId())
.setLocalTxId(event.localTxId())
diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
similarity index 56%
copy from integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
copy to omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
index ae7a302..53671ad 100644
--- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java
@@ -15,43 +15,29 @@
* limitations under the License.
*/
-package io.servicecomb.saga.integration.pack.tests;
-
-import java.util.Date;
-
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.Id;
-
-@Entity
-class TxEventEnvelope {
- @Id
- @GeneratedValue
- private long surrogateId;
-
- private Date creationTime;
- private String globalTxId;
- private String localTxId;
- private String parentTxId;
- private String type;
- private byte[] payloads;
-
- private TxEventEnvelope() {
- }
-
- String localTxId() {
- return localTxId;
- }
-
- String parentTxId() {
- return parentTxId;
+package io.servicecomb.saga.omega.context;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ServiceConfig {
+ private final String serviceName;
+ private final String instanceId;
+
+ public ServiceConfig(String serviceName) {
+ this.serviceName = serviceName;
+ try {
+ instanceId = serviceName + "-" + InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ throw new IllegalStateException(e);
+ }
}
- String type() {
- return type;
+ public String serviceName() {
+ return serviceName;
}
- public byte[] payloads() {
- return payloads;
+ public String instanceId() {
+ return instanceId;
}
}
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 73e2212..28181f4 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -35,6 +35,7 @@ import io.grpc.ManagedChannelBuilder;
import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
import io.servicecomb.saga.omega.context.IdGenerator;
import io.servicecomb.saga.omega.context.OmegaContext;
+import io.servicecomb.saga.omega.context.ServiceConfig;
import io.servicecomb.saga.omega.context.UniqueIdGenerator;
import io.servicecomb.saga.omega.format.NativeMessageFormat;
import io.servicecomb.saga.omega.transaction.MessageSender;
@@ -55,17 +56,22 @@ class OmegaSpringConfig {
return new OmegaContext(idGenerator);
}
+ @Bean
+ ServiceConfig serviceConfig(@Value("${spring.application.name}") String serviceName) {
+ return new ServiceConfig(serviceName);
+ }
+
@PreDestroy
void close() {
channels.forEach(ManagedChannel::shutdown);
}
@Bean
- MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses) {
+ MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig) {
// TODO: 2017/12/26 connect to the one with lowest latency
for (String address : addresses) {
try {
- return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat());
+ return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig);
} catch (Exception e) {
log.error("Unable to connect to alpha at {}", address, e);
}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index f6ebf74..743df4d 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -33,6 +33,8 @@ message GrpcTxEvent {
string type = 5;
string compensationMethod = 6;
bytes payloads = 7;
+ string serviceName = 8;
+ string instanceId = 9;
}
message GrpcEmpty {}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.