You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/20 12:00:20 UTC

[incubator-servicecomb-saga] 08/12: SCB-218 locked commands to avoid duplicate compensation callback

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit f075bd5f27c76a073e253095a99a3b0b73153cfc
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 16:35:03 2018 +0800

    SCB-218 locked commands to avoid duplicate compensation callback
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/Command.java       |  2 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  | 28 +++++------
 .../saga/alpha/core/TxEventRepository.java         |  2 +
 .../saga/alpha/core/TxConsistentServiceTest.java   |  4 ++
 alpha/alpha-server/pom.xml                         |  4 +-
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  2 -
 .../saga/alpha/server/CommandEntityRepository.java |  3 ++
 .../saga/alpha/server/SpringCommandRepository.java | 17 ++++---
 .../saga/alpha/server/SpringTxEventRepository.java |  5 ++
 .../alpha/server/TxEventEnvelopeRepository.java    | 17 ++++++-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 36 +++++++++++++-
 alpha/alpha-server/src/test/resources/schema.sql   | 55 +++++++++++-----------
 12 files changed, 119 insertions(+), 56 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 2716abf..49c1756 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -47,7 +47,7 @@ public class Command {
   private Date lastModified;
 
   @Version
-  private int version;
+  private long version;
 
   Command() {
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 5a4589d..f9fa3be 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -38,7 +38,6 @@ public class EventScanner implements Runnable {
   private final TxEventRepository eventRepository;
   private final CommandRepository commandRepository;
   private final OmegaCallback omegaCallback;
-  private final int commandPollingInterval;
   private final int eventPollingInterval;
 
   private long nextEndedEventId;
@@ -48,20 +47,17 @@ public class EventScanner implements Runnable {
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
-      int commandPollingInterval,
       int eventPollingInterval) {
 
     this.scheduler = scheduler;
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
-    this.commandPollingInterval = commandPollingInterval;
     this.eventPollingInterval = eventPollingInterval;
   }
 
   @Override
   public void run() {
-    pollCompensationCommand(commandPollingInterval);
     pollEvents();
   }
 
@@ -69,7 +65,9 @@ public class EventScanner implements Runnable {
     scheduler.scheduleWithFixedDelay(
         () -> {
           saveUncompensatedEventsToCommands();
+          compensate();
           updateCompensatedCommands();
+          deleteDuplicateSagaEndedEvents();
         },
         0,
         eventPollingInterval,
@@ -94,6 +92,14 @@ public class EventScanner implements Runnable {
         });
   }
 
+  private void deleteDuplicateSagaEndedEvents() {
+    try {
+      eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
+    } catch (Exception e) {
+      log.warn("Failed to delete duplicate SagaEndedEvent", e);
+    }
+  }
+
   // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
   // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensationStatus(TxEvent event) {
@@ -102,9 +108,11 @@ public class EventScanner implements Runnable {
         event.globalTxId(),
         event.localTxId());
 
-    if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
-        && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+    markSagaEnded(event);
+  }
 
+  private void markSagaEnded(TxEvent event) {
+    if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
       markGlobalTxEnd(event);
     }
   }
@@ -127,14 +135,6 @@ public class EventScanner implements Runnable {
         EMPTY_PAYLOAD);
   }
 
-  private void pollCompensationCommand(int commandPollingInterval) {
-    scheduler.scheduleWithFixedDelay(
-        this::compensate,
-        0,
-        commandPollingInterval,
-        MILLISECONDS);
-  }
-
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index d793de2..b61aa06 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -28,4 +28,6 @@ public interface TxEventRepository {
   List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
 
   Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
+
+  void deleteDuplicateEvents(String type);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 473501e..231d5bf 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -64,6 +64,10 @@ public class TxConsistentServiceTest {
     public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
       return Optional.empty();
     }
+
+    @Override
+    public void deleteDuplicateEvents(String type) {
+    }
   };
 
   private final String globalTxId = UUID.randomUUID().toString();
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index ae894b8..a05177a 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -82,8 +82,8 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.h2database</groupId>
-      <artifactId>h2</artifactId>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 73298be..769ee5a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -74,7 +74,6 @@ class AlphaConfig {
 
   @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
-      @Value("${alpha.command.pollingInterval:500}") int commandPollingInterval,
       @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
       ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
@@ -86,7 +85,6 @@ class AlphaConfig {
         eventRepository,
         commandRepository,
         omegaCallback,
-        commandPollingInterval,
         eventPollingInterval).run();
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 4b8c3ee..17df477 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -19,10 +19,12 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
 
+import javax.persistence.LockModeType;
 import javax.transaction.Transactional;
 
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Lock;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
@@ -44,6 +46,7 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
   List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status);
 
   // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
+  @Lock(LockModeType.OPTIMISTIC)
   @Query("SELECT c FROM Command c "
       + "WHERE c.eventId IN ("
       + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 1dabeda..8241d81 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.transaction.Transactional;
+
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -56,13 +58,15 @@ public class SpringCommandRepository implements CommandRepository {
       commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
     }
 
-    log.info("Saving compensation commands {}", commands.values());
-    try {
-      commandRepository.save(commands.values());
-    } catch (Exception e) {
-      log.warn("Failed to save some commands", e);
+    for (Command command : commands.values()) {
+      log.info("Saving compensation command {}", command);
+      try {
+        commandRepository.save(command);
+      } catch (Exception e) {
+        log.warn("Failed to save some command {}", command);
+      }
+      log.info("Saved compensation command {}", command);
     }
-    log.info("Saved compensation commands {}", commands.values());
   }
 
   @Override
@@ -75,6 +79,7 @@ public class SpringCommandRepository implements CommandRepository {
     return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name());
   }
 
+  @Transactional
   @Override
   public List<Command> findFirstCommandToCompensate() {
     List<Command> commands = commandRepository
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 4108aa5..ad32148 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -50,4 +50,9 @@ class SpringTxEventRepository implements TxEventRepository {
   public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
     return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id);
   }
+
+  @Override
+  public void deleteDuplicateEvents(String type) {
+    eventRepo.deleteByType(type);
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index e974527..2e52fef 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -20,8 +20,11 @@ package org.apache.servicecomb.saga.alpha.server;
 import java.util.List;
 import java.util.Optional;
 
+import javax.transaction.Transactional;
+
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
@@ -51,7 +54,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
   @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
+      + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
       + "  SELECT t1.globalTxId"
       + "  FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = t.globalTxId "
@@ -59,11 +62,21 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + ") AND NOT EXISTS ( "
       + "  SELECT t2.globalTxId"
       + "  FROM TxEvent t2 "
-      + "  WHERE t2.globalTxId = ?1 "
+      + "  WHERE t2.globalTxId = t.globalTxId "
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
   List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
 
   Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
+
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query("DELETE FROM TxEvent t "
+      + "WHERE t.type = ?1 AND t.surrogateId NOT IN ("
+      + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 "
+      + " WHERE t1.type = ?1"
+      + " GROUP BY t1.globalTxId"
+      + ")")
+  void deleteByType(String type);
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a5356bb..a928443 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -36,9 +36,13 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Consumer;
 
+import javax.annotation.PostConstruct;
+
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -65,7 +69,7 @@ import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
-    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=1"})
+    properties = {"alpha.server.port=8090", "alpha.event.pollingInterval=1"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -93,6 +97,18 @@ public class AlphaIntegrationTest {
   private TxEventEnvelopeRepository eventRepo;
 
   @Autowired
+  private TxEventRepository eventRepository;
+
+  @Autowired
+  private CommandRepository commandRepository;
+
+  @Autowired
+  private CommandEntityRepository commandEntityRepository;
+
+  @Autowired
+  private OmegaCallback omegaCallback;
+
+  @Autowired
   private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
   @Autowired
@@ -115,6 +131,19 @@ public class AlphaIntegrationTest {
   @After
   public void after() throws Exception {
     blockingStub.onDisconnected(serviceConfig);
+    deleteAllTillSuccessful();
+  }
+
+  public void deleteAllTillSuccessful() {
+    boolean deleted = false;
+    do {
+      try {
+        eventRepo.deleteAll();
+        commandEntityRepository.deleteAll();
+        deleted = true;
+      } catch (Exception ignored) {
+      }
+    } while (!deleted);
   }
 
   @Test
@@ -413,4 +442,9 @@ public class AlphaIntegrationTest {
       return completed;
     }
   }
+
+  @PostConstruct
+  void init() {
+//    new EventScanner(Executors.newScheduledThreadPool(2), eventRepository, commandRepository, omegaCallback, 1, 1).run();
+  }
 }
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index 71444ec..344fdda 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -1,29 +1,28 @@
-CREATE TABLE IF NOT EXISTS `TxEvent` (
-  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
-  `serviceName` varchar(36) NOT NULL,
-  `instanceId` varchar(36) NOT NULL,
-  `creationTime` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
-  `globalTxId` varchar(36) NOT NULL,
-  `localTxId` varchar(36) NOT NULL,
-  `parentTxId` varchar(36) DEFAULT NULL,
-  `type` varchar(50) NOT NULL,
-  `compensationMethod` varchar(256) NOT NULL,
-  `payloads` varbinary(10240),
-  PRIMARY KEY (`surrogateId`)
-) DEFAULT CHARSET=utf8;
+CREATE TABLE IF NOT EXISTS TxEvent (
+  surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
+  serviceName varchar(36) NOT NULL,
+  instanceId varchar(36) NOT NULL,
+  creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  type varchar(50) NOT NULL,
+  compensationMethod varchar(256) NOT NULL,
+  payloads varbinary(10240),
+--  version bigint NOT NULL
+);
 
-CREATE TABLE IF NOT EXISTS `Command` (
-  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
-  `eventId` bigint NOT NULL UNIQUE,
-  `serviceName` varchar(36) NOT NULL,
-  `instanceId` varchar(36) NOT NULL,
-  `globalTxId` varchar(36) NOT NULL,
-  `localTxId` varchar(36) NOT NULL,
-  `parentTxId` varchar(36) DEFAULT NULL,
-  `compensationMethod` varchar(256) NOT NULL,
-  `payloads` varbinary(10240),
-  `status` varchar(12),
-  `lastModified` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
-  `version` bigint NOT NULL,
-  PRIMARY KEY (`surrogateId`)
-) DEFAULT CHARSET=utf8;
+CREATE TABLE IF NOT EXISTS Command (
+  surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
+  eventId bigint NOT NULL UNIQUE,
+  serviceName varchar(36) NOT NULL,
+  instanceId varchar(36) NOT NULL,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  compensationMethod varchar(256) NOT NULL,
+  payloads varbinary(10240),
+  status varchar(12),
+  lastModified TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  version bigint NOT NULL
+);

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.