You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/23 00:49:34 UTC

[incubator-servicecomb-saga] branch master updated: SCB-270 in order compensation (#126)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git


The following commit(s) were added to refs/heads/master by this push:
     new f91946d  SCB-270 in order compensation (#126)
f91946d is described below

commit f91946d51ecc8deb86e941b1bbd5d5151c8ea208
Author: seanyinx <yi...@hotmail.com>
AuthorDate: Tue Jan 23 08:49:32 2018 +0800

    SCB-270 in order compensation (#126)
    
    
    * SCB-270 avoided compensating tx which has pending compensations to ensure order
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../apache/servicecomb/saga/alpha/server/AlphaConfig.java  |  9 ++++++++-
 .../saga/alpha/server/CommandEntityRepository.java         | 14 ++++++++------
 .../saga/alpha/server/SpringCommandRepository.java         |  2 +-
 .../saga/alpha/server/AlphaIntegrationTest.java            |  4 ++--
 4 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 35352f4..a431437 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
@@ -43,6 +44,7 @@ import org.springframework.context.annotation.Configuration;
 @Configuration
 class AlphaConfig {
   private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
+  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
@@ -69,7 +71,7 @@ class AlphaConfig {
 
   @Bean
   ScheduledExecutorService compensationScheduler() {
-    return Executors.newScheduledThreadPool(1);
+    return scheduler;
   }
 
   @Bean
@@ -105,4 +107,9 @@ class AlphaConfig {
   void init() {
     new PendingTaskRunner(pendingCompensations, delay).run();
   }
+
+  @PreDestroy
+  void shutdown() {
+    scheduler.shutdownNow();
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 17df477..737fd11 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -23,7 +23,6 @@ import javax.persistence.LockModeType;
 import javax.transaction.Transactional;
 
 import org.apache.servicecomb.saga.alpha.core.Command;
-import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Lock;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
@@ -47,10 +46,13 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
 
   // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
   @Lock(LockModeType.OPTIMISTIC)
-  @Query("SELECT c FROM Command c "
+  @Query(value = "SELECT * FROM Command AS c "
       + "WHERE c.eventId IN ("
-      + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
-      + ") "
-      + "ORDER BY c.eventId ASC")
-  List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
+      + " SELECT MAX(c1.eventId) FROM Command AS c1 "
+      + " INNER JOIN Command AS c2 on c1.globalTxId = c2.globalTxId"
+      + " WHERE c1.status = 'NEW' "
+      + " GROUP BY c1.globalTxId "
+      + " HAVING MAX( CASE c2.status WHEN 'PENDING' THEN 1 ELSE 0 END ) = 0) "
+      + "ORDER BY c.eventId ASC LIMIT 1", nativeQuery = true)
+  List<Command> findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc();
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 8241d81..afbdaf5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -83,7 +83,7 @@ public class SpringCommandRepository implements CommandRepository {
   @Override
   public List<Command> findFirstCommandToCompensate() {
     List<Command> commands = commandRepository
-        .findFirstGroupByGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST);
+        .findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc();
 
     commands.forEach(command ->
         commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 4635425..5dddc1d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -25,7 +25,7 @@ import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -257,7 +257,7 @@ public class AlphaIntegrationTest {
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
 
-    assertThat(receivedCommands, containsInAnyOrder(
+    assertThat(receivedCommands, contains(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
             .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.