You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:30 UTC

[incubator-servicecomb-saga] 04/09: SCB-218 excluded duplicate events during command persistence

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 54e300b97f86a879a39a820bc1c81d47f9fb507e
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 18 16:59:51 2018 +0800

    SCB-218 excluded duplicate events during command persistence
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/Command.java       | 32 ++++++++++++++++++----
 .../saga/alpha/core/TxConsistentService.java       | 12 +++++++-
 .../servicecomb/saga/alpha/core/TxEvent.java       | 32 ++++++++++++++++++++--
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 11 +++++++-
 .../saga/alpha/server/SpringCommandRepository.java |  4 +--
 .../alpha/server/TxEventEnvelopeRepository.java    |  4 +--
 alpha/alpha-server/src/test/resources/schema.sql   |  2 +-
 7 files changed, 83 insertions(+), 14 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index b29988b..b5902e1 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -48,7 +48,8 @@ public class Command {
   Command() {
   }
 
-  Command(String serviceName,
+  Command(long id,
+      String serviceName,
       String instanceId,
       String globalTxId,
       String localTxId,
@@ -57,6 +58,7 @@ public class Command {
       byte[] payloads,
       String status) {
 
+    this.surrogateId = id;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
     this.globalTxId = globalTxId;
@@ -68,7 +70,8 @@ public class Command {
     this.lastModified = new Date();
   }
 
-  Command(String serviceName,
+  Command(long id,
+      String serviceName,
       String instanceId,
       String globalTxId,
       String localTxId,
@@ -76,11 +79,12 @@ public class Command {
       String compensationMethod,
       byte[] payloads) {
 
-    this(serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
+    this(id, serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
   }
 
   Command(Command command, CommandStatus status) {
-    this(command.serviceName,
+    this(command.surrogateId,
+        command.serviceName,
         command.instanceId,
         command.globalTxId,
         command.localTxId,
@@ -91,7 +95,8 @@ public class Command {
   }
 
   public Command(TxEvent event) {
-    this(event.serviceName(),
+    this(event.id(),
+        event.serviceName(),
         event.instanceId(),
         event.globalTxId(),
         event.localTxId(),
@@ -131,4 +136,21 @@ public class Command {
   String status() {
     return status;
   }
+
+  long id() {
+    return surrogateId;
+  }
+
+  @Override
+  public String toString() {
+    return "Command{" +
+        "surrogateId=" + surrogateId +
+        ", serviceName='" + serviceName + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", compensationMethod='" + compensationMethod + '\'' +
+        '}';
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 8f88735..61beea5 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -52,15 +52,25 @@ public class TxConsistentService {
   }};
 
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
-  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+  private final ScheduledExecutorService scheduler;
 
   public TxConsistentService(TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       int commandPollingInterval) {
+
+    this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
+  }
+
+  public TxConsistentService(TxEventRepository eventRepository,
+      CommandRepository commandRepository,
+      OmegaCallback omegaCallback,
+      int commandPollingInterval,
+      ScheduledExecutorService scheduler) {
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
+    this.scheduler = scheduler;
 
     scheduleCompensationCommandPolling(commandPollingInterval);
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 0ff2299..b654689 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -65,7 +65,34 @@ public class TxEvent {
       String type,
       String compensationMethod,
       byte[] payloads) {
-    this.surrogateId = -1L;
+    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+  }
+
+  public TxEvent(
+      long id,
+      String serviceName,
+      String instanceId,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
+    this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+  }
+
+  TxEvent(Long surrogateId,
+      String serviceName,
+      String instanceId,
+      Date creationTime,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
+
+    this.surrogateId = surrogateId;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
     this.creationTime = creationTime;
@@ -120,7 +147,8 @@ public class TxEvent {
   @Override
   public String toString() {
     return "TxEvent{" +
-        "serviceName='" + serviceName + '\'' +
+        "surrogateId=" + surrogateId +
+        ", serviceName='" + serviceName + '\'' +
         ", instanceId='" + instanceId + '\'' +
         ", creationTime=" + creationTime +
         ", globalTxId='" + globalTxId + '\'' +
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 9c34738..3740581 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -20,7 +20,9 @@ package org.apache.servicecomb.saga.alpha.server;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.annotation.PostConstruct;
 
@@ -65,8 +67,14 @@ class AlphaConfig {
   }
 
   @Bean
+  ScheduledExecutorService compensationScheduler() {
+    return Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
       @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
+      ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
@@ -76,7 +84,8 @@ class AlphaConfig {
         eventRepository,
         commandRepository,
         omegaCallback,
-        commandPollingInterval);
+        commandPollingInterval,
+        scheduler);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index a335849..18ee9ad 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -21,7 +21,7 @@ import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -61,7 +61,7 @@ public class SpringCommandRepository implements CommandRepository {
     List<TxEvent> events = eventRepository
         .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
 
-    Map<String, Command> commands = new HashMap<>();
+    Map<String, Command> commands = new LinkedHashMap<>();
 
     for (TxEvent event : events) {
       commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 71d5f1b..09aec6f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -54,8 +54,8 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
  )
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
-  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
-      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
+  @Query("SELECT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+      + "t.surrogateId, t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
       + ") FROM TxEvent t "
       + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
       + "  SELECT t1.globalTxId"
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index c6d66de..fb396b3 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS `TxEvent` (
 ) DEFAULT CHARSET=utf8;
 
 CREATE TABLE IF NOT EXISTS `Command` (
-  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
+  `surrogateId` bigint NOT NULL,
   `serviceName` varchar(36) NOT NULL,
   `instanceId` varchar(36) NOT NULL,
   `globalTxId` varchar(36) NOT NULL,

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.