You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:33 UTC

[incubator-servicecomb-saga] 07/09: SCB-218 added event id to check for command uniqueness

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 02553d55a0b0a1938d0f3a5efa9712639af3f8d9
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 11:06:16 2018 +0800

    SCB-218 added event id to check for command uniqueness
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/Command.java       | 24 ++++++++--------------
 .../saga/alpha/core/CommandRepository.java         |  2 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  | 23 ++++++++++++---------
 .../saga/alpha/server/CommandEntityRepository.java | 10 ++++-----
 .../saga/alpha/server/SpringCommandRepository.java | 11 +++++++---
 .../alpha/server/TxEventEnvelopeRepository.java    |  2 +-
 .../src/main/resources/schema-postgresql.sql       | 20 +++++++++++++++++-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  2 +-
 alpha/alpha-server/src/test/resources/schema.sql   |  3 ++-
 9 files changed, 57 insertions(+), 40 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 904cc54..2716abf 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -22,6 +22,8 @@ import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 import java.util.Date;
 
 import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.Version;
 
@@ -29,8 +31,10 @@ import javax.persistence.Version;
 public class Command {
 
   @Id
+  @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long surrogateId;
 
+  private long eventId;
   private String serviceName;
   private String instanceId;
   private String globalTxId;
@@ -48,7 +52,7 @@ public class Command {
   Command() {
   }
 
-  Command(long id,
+  private Command(long id,
       String serviceName,
       String instanceId,
       String globalTxId,
@@ -58,7 +62,7 @@ public class Command {
       byte[] payloads,
       String status) {
 
-    this.surrogateId = id;
+    this.eventId = id;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
     this.globalTxId = globalTxId;
@@ -70,7 +74,7 @@ public class Command {
     this.lastModified = new Date();
   }
 
-  Command(long id,
+  private Command(long id,
       String serviceName,
       String instanceId,
       String globalTxId,
@@ -82,18 +86,6 @@ public class Command {
     this(id, serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
   }
 
-  Command(Command command, CommandStatus status) {
-    this(command.surrogateId,
-        command.serviceName,
-        command.instanceId,
-        command.globalTxId,
-        command.localTxId,
-        command.parentTxId,
-        command.compensationMethod,
-        command.payloads,
-        status.name());
-  }
-
   public Command(TxEvent event) {
     this(event.id(),
         event.serviceName(),
@@ -144,7 +136,7 @@ public class Command {
   @Override
   public String toString() {
     return "Command{" +
-        "surrogateId=" + surrogateId +
+        "eventId=" + eventId +
         ", serviceName='" + serviceName + '\'' +
         ", instanceId='" + instanceId + '\'' +
         ", globalTxId='" + globalTxId + '\'' +
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 1da033f..2bbea77 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 public interface CommandRepository {
 
-  Iterable<Command> saveCompensationCommands(String globalTxId);
+  void saveCompensationCommands(String globalTxId);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index ea4aaf6..5a4589d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -81,8 +81,7 @@ public class EventScanner implements Runnable {
         .forEach(event -> {
           log.info("Found uncompensated event {}", event);
           nextEndedEventId = event.id();
-          commandRepository.saveCompensationCommands(event.globalTxId())
-              .forEach(command -> nextEndedEventId = command.id());
+          commandRepository.saveCompensationCommands(event.globalTxId());
         });
   }
 
@@ -130,19 +129,23 @@ public class EventScanner implements Runnable {
 
   private void pollCompensationCommand(int commandPollingInterval) {
     scheduler.scheduleWithFixedDelay(
-        () -> commandRepository.findFirstCommandToCompensate()
-            .forEach(command -> {
-              log.info("Compensating transaction with globalTxId {} and localTxId {}",
-                  command.globalTxId(),
-                  command.localTxId());
-
-              omegaCallback.compensate(txStartedEventOf(command));
-            }),
+        this::compensate,
         0,
         commandPollingInterval,
         MILLISECONDS);
   }
 
+  private void compensate() {
+    commandRepository.findFirstCommandToCompensate()
+        .forEach(command -> {
+          log.info("Compensating transaction with globalTxId {} and localTxId {}",
+              command.globalTxId(),
+              command.localTxId());
+
+          omegaCallback.compensate(txStartedEventOf(command));
+        });
+  }
+
   private TxEvent txStartedEventOf(Command command) {
     return new TxEvent(
         command.serviceName(),
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index d7c583e..4b8c3ee 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
-import java.util.Optional;
 
 import javax.transaction.Transactional;
 
@@ -30,10 +29,9 @@ import org.springframework.data.repository.CrudRepository;
 import org.springframework.data.repository.query.Param;
 
 public interface CommandEntityRepository extends CrudRepository<Command, Long> {
-  Optional<Command> findByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId);
 
   @Transactional
-  @Modifying
+  @Modifying(clearAutomatically = true)
   @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
       + "SET c.status = :status "
       + "WHERE c.globalTxId = :globalTxId "
@@ -47,9 +45,9 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
 
   // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
   @Query("SELECT c FROM Command c "
-      + "WHERE c.surrogateId IN ("
-      + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
+      + "WHERE c.eventId IN ("
+      + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
       + ") "
-      + "ORDER BY c.surrogateId ASC")
+      + "ORDER BY c.eventId ASC")
   List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 6076e54..1dabeda 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -46,9 +46,9 @@ public class SpringCommandRepository implements CommandRepository {
   }
 
   @Override
-  public Iterable<Command> saveCompensationCommands(String globalTxId) {
+  public void saveCompensationCommands(String globalTxId) {
     List<TxEvent> events = eventRepository
-        .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
+        .findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
 
     Map<String, Command> commands = new LinkedHashMap<>();
 
@@ -57,7 +57,12 @@ public class SpringCommandRepository implements CommandRepository {
     }
 
     log.info("Saving compensation commands {}", commands.values());
-    return commandRepository.save(commands.values());
+    try {
+      commandRepository.save(commands.values());
+    } catch (Exception e) {
+      log.warn("Failed to save some commands", e);
+    }
+    log.info("Saved compensation commands {}", commands.values());
   }
 
   @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 78c2a1d..e974527 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -48,7 +48,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+  List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
   @Query("SELECT t FROM TxEvent t "
       + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index e84a9c3..d1c36c2 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -11,4 +11,22 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   payloads bytea
 );
 
-CREATE INDEX IF NOT EXISTS running_sagas_index ON TxEvent (globalTxId, localTxId, type);
+CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, globalTxId, localTxId, type);
+
+
+CREATE TABLE IF NOT EXISTS Command (
+  surrogateId BIGSERIAL PRIMARY KEY,
+  eventId bigint NOT NULL UNIQUE,
+  serviceName varchar(16) NOT NULL,
+  instanceId varchar(36) NOT NULL,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  compensationMethod varchar(256) NOT NULL,
+  payloads bytea,
+  status varchar(12),
+  lastModified timestamp(6) NOT NULL DEFAULT CURRENT_DATE,
+  version bigint NOT NULL,
+);
+
+CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, globalTxId, localTxId, type, status);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 5cff57c..a5356bb 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -65,7 +65,7 @@ import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
-    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=100"})
+    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=1"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index fb396b3..71444ec 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -13,7 +13,8 @@ CREATE TABLE IF NOT EXISTS `TxEvent` (
 ) DEFAULT CHARSET=utf8;
 
 CREATE TABLE IF NOT EXISTS `Command` (
-  `surrogateId` bigint NOT NULL,
+  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
+  `eventId` bigint NOT NULL UNIQUE,
   `serviceName` varchar(36) NOT NULL,
   `instanceId` varchar(36) NOT NULL,
   `globalTxId` varchar(36) NOT NULL,

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.