You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/20 12:00:17 UTC

[incubator-servicecomb-saga] 05/12: SCB-218 updated command status to pending when compensating

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 7aafab876c21da6f50ea10aad5095d3b18bcdf66
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 18 17:55:43 2018 +0800

    SCB-218 updated command status to pending when compensating
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../java/org/apache/servicecomb/saga/alpha/core/Command.java |  4 ++--
 .../apache/servicecomb/saga/alpha/core/CommandStatus.java    |  1 +
 .../saga/alpha/server/CommandEntityRepository.java           |  3 ++-
 .../saga/alpha/server/SpringCommandRepository.java           | 11 ++++++++++-
 .../servicecomb/saga/alpha/server/AlphaIntegrationTest.java  | 12 +++++++++++-
 5 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index b5902e1..904cc54 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -113,11 +113,11 @@ public class Command {
     return instanceId;
   }
 
-  String globalTxId() {
+  public String globalTxId() {
     return globalTxId;
   }
 
-  String localTxId() {
+  public String localTxId() {
     return localTxId;
   }
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
index cdf1f6c..0c9b78b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
@@ -19,5 +19,6 @@ package org.apache.servicecomb.saga.alpha.core;
 
 public enum CommandStatus {
   NEW,
+  PENDING,
   DONE
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index fffea56..d7c583e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -45,9 +45,10 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
 
   List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status);
 
+  // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
   @Query("SELECT c FROM Command c "
       + "WHERE c.surrogateId IN ("
-      + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status <> 'DONE' GROUP BY c1.globalTxId"
+      + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
       + ") "
       + "ORDER BY c.surrogateId ASC")
   List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 18ee9ad..34b43a4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.util.LinkedHashMap;
@@ -82,7 +83,15 @@ public class SpringCommandRepository implements CommandRepository {
 
   @Override
   public List<Command> findFirstCommandToCompensate() {
-    return commandRepository
+    List<Command> commands = commandRepository
         .findFirstGroupByGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST);
+
+    commands.forEach(command ->
+        commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
+            PENDING.name(),
+            command.globalTxId(),
+            command.localTxId()));
+
+    return commands;
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 1514869..1bfbe3c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -34,6 +34,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -55,6 +57,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.google.protobuf.ByteString;
@@ -65,7 +69,7 @@ import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
-    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=300"})
+    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -413,4 +417,10 @@ public class AlphaIntegrationTest {
       return completed;
     }
   }
+
+  @Primary
+  @Bean
+  ScheduledExecutorService scheduler() {
+    return Executors.newScheduledThreadPool(2);
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.