You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2017/12/25 10:36:29 UTC

[incubator-servicecomb-saga] 05/14: SCB-96 encapsulated args into event

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 716c434073c91532625b47f7cf77efb5e7ffd6c8
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Dec 22 19:00:54 2017 +0800

    SCB-96 encapsulated args into event
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../spring/TransactionAspectConfig.java            |  9 ++++++--
 .../spring/TransactionInterceptionTest.java        | 27 +++++++++++++++++-----
 .../saga/omega/transaction/MessageSerializer.java  |  2 +-
 .../transaction/PreTransactionInterceptor.java     |  4 ++--
 .../saga/omega/transaction/TransactionAspect.java  |  9 +++++---
 .../{MessageSerializer.java => TxEvent.java}       | 20 ++++++++++++++--
 ...{MessageSerializer.java => TxStartedEvent.java} | 12 ++++++++--
 .../transaction/PreTransactionInterceptorTest.java | 19 ++++++++++-----
 8 files changed, 78 insertions(+), 24 deletions(-)

diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 683aa85..60671d2 100644
--- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -21,6 +21,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
 
+import io.servicecomb.saga.omega.context.OmegaContext;
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.MessageSerializer;
 import io.servicecomb.saga.omega.transaction.TransactionAspect;
@@ -28,9 +29,13 @@ import io.servicecomb.saga.omega.transaction.TransactionAspect;
 @Configuration
 @EnableAspectJAutoProxy
 class TransactionAspectConfig {
+  @Bean
+  OmegaContext omegaContext() {
+    return new OmegaContext();
+  }
 
   @Bean
-  TransactionAspect transactionAspect(MessageSerializer serializer, MessageSender sender) {
-    return new TransactionAspect(serializer, sender);
+  TransactionAspect transactionAspect(MessageSerializer serializer, MessageSender sender, OmegaContext context) {
+    return new TransactionAspect(serializer, sender, context);
   }
 }
diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 04996ff..e0afebb 100644
--- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -17,6 +17,7 @@
 
 package io.servicecomb.saga.omega.transaction.spring;
 
+import static com.seanyinx.github.unit.scaffolding.Randomness.nextId;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
@@ -24,6 +25,7 @@ import static org.junit.Assert.assertThat;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -32,6 +34,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
+import io.servicecomb.saga.omega.context.OmegaContext;
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.MessageSerializer;
 import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
@@ -39,6 +42,7 @@ import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 public class TransactionInterceptionTest {
+  private final long txId = nextId();
   private final String username = uniquify("username");
   private final String email = uniquify("email");
 
@@ -48,11 +52,19 @@ public class TransactionInterceptionTest {
   @Autowired
   private TransactionalUserService userService;
 
+  @Autowired
+  private OmegaContext omegaContext;
+
+  @Before
+  public void setUp() throws Exception {
+    omegaContext.setTxId(txId);
+  }
+
   @Test
   public void sendsUserToRemote_BeforeTransaction() throws Exception {
     userService.add(new User(username, email));
 
-    assertThat(messages, contains((username + ":" + email).getBytes()));
+    assertThat(messages, contains(serialize(txId, "TxStartedEvent", username, email)));
   }
 
   @Configuration
@@ -71,14 +83,17 @@ public class TransactionInterceptionTest {
 
     @Bean
     MessageSerializer serializer() {
-      return messages -> {
-        if (messages[0] instanceof User) {
-          User user = ((User) messages[0]);
-          return (user.username() + ":" + user.email()).getBytes();
+      return event -> {
+        if (event.payloads()[0] instanceof User) {
+          User user = ((User) event.payloads()[0]);
+          return serialize(event.txId(), event.type(), user.username(), user.email());
         }
-        throw new IllegalArgumentException("Expected instance of User, but was " + messages.getClass());
+        throw new IllegalArgumentException("Expected instance of User, but was " + event.getClass());
       };
     }
   }
 
+  private static byte[] serialize(long txId, String eventType, String username, String email) {
+    return (txId + ":" + eventType + ":" + username + ":" + email).getBytes();
+  }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java
index 1a88f13..b1eb7fd 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java
@@ -18,5 +18,5 @@
 package io.servicecomb.saga.omega.transaction;
 
 public interface MessageSerializer {
-  byte[] serialize(Object[] message);
+  byte[] serialize(TxEvent event);
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
index 089ba32..1934280 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
@@ -26,7 +26,7 @@ class PreTransactionInterceptor {
     this.serializer = serializer;
   }
 
-  void intercept(Object... message) {
-    sender.send(serializer.serialize(message));
+  void intercept(long txId, Object... message) {
+    sender.send(serializer.serialize(new TxStartedEvent(txId, message)));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
index 54f6d58..1701b98 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -27,13 +27,16 @@ import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.servicecomb.saga.omega.context.OmegaContext;
+
 @Aspect
 public class TransactionAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final PreTransactionInterceptor preTransactionInterceptor;
+  private final OmegaContext context;
 
-
-  public TransactionAspect(MessageSerializer serializer, MessageSender sender) {
+  public TransactionAspect(MessageSerializer serializer, MessageSender sender, OmegaContext context) {
+    this.context = context;
     this.preTransactionInterceptor = new PreTransactionInterceptor(sender, serializer);
   }
 
@@ -42,7 +45,7 @@ public class TransactionAspect {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     LOG.debug("Intercepting transactional method {}", method.toString());
 
-    preTransactionInterceptor.intercept(joinPoint.getArgs());
+    preTransactionInterceptor.intercept(context.txId(), joinPoint.getArgs());
     return joinPoint.proceed();
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
similarity index 72%
copy from omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java
copy to omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
index 1a88f13..5347c0d 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
@@ -17,6 +17,22 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-public interface MessageSerializer {
-  byte[] serialize(Object[] message);
+public abstract class TxEvent {
+  private final long txId;
+  private final Object[] payloads;
+
+  TxEvent(Object[] payloads, long txId) {
+    this.payloads = payloads;
+    this.txId = txId;
+  }
+
+  public long txId() {
+    return txId;
+  }
+
+  public Object[] payloads() {
+    return payloads;
+  }
+
+  public abstract String type();
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
similarity index 80%
copy from omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java
copy to omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
index 1a88f13..2839afa 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -17,6 +17,14 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-public interface MessageSerializer {
-  byte[] serialize(Object[] message);
+class TxStartedEvent extends TxEvent {
+
+  TxStartedEvent(long txId, Object[] payloads) {
+    super(payloads, txId);
+  }
+
+  @Override
+  public String type() {
+    return this.getClass().getSimpleName();
+  }
 }
diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
index fd7414e..b20cd3c 100644
--- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
@@ -17,6 +17,7 @@
 
 package io.servicecomb.saga.omega.transaction;
 
+import static com.seanyinx.github.unit.scaffolding.Randomness.nextId;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
@@ -28,22 +29,28 @@ import org.junit.Test;
 
 public class PreTransactionInterceptorTest {
   private final List<byte[]> messages = new ArrayList<>();
+  private final long txId = nextId();
 
   private final MessageSender sender = messages::add;
-  private final MessageSerializer serializer = messages -> {
-    if (messages[0] instanceof String) {
-      return ((String) messages[0]).getBytes();
+  private final MessageSerializer serializer = event -> {
+    if (event.payloads()[0] instanceof String) {
+      String message = (String) event.payloads()[0];
+      return serialize(txId, message);
     }
-    throw new IllegalArgumentException("Expected instance of String, but was " + messages.getClass());
+    throw new IllegalArgumentException("Expected instance of String, but was " + event.getClass());
   };
 
   private final String message = uniquify("message");
   private final PreTransactionInterceptor interceptor = new PreTransactionInterceptor(sender, serializer);
 
+  private byte[] serialize(long txId, String message) {
+    return (txId + ":" + message).getBytes();
+  }
+
   @Test
   public void sendsSerializedMessage() throws Exception {
-    interceptor.intercept(message);
+    interceptor.intercept(txId, message);
 
-    assertThat(messages, contains(message.getBytes()));
+    assertThat(messages, contains(serialize(txId, message)));
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.