You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:31 UTC
[incubator-servicecomb-saga] 05/09: SCB-218 updated command status to pending when compensating
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 3241cb6a43f78b624e043edc73ac952b293a96a2
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 18 17:55:43 2018 +0800
SCB-218 updated command status to pending when compensating
Signed-off-by: seanyinx <se...@huawei.com>
---
.../java/org/apache/servicecomb/saga/alpha/core/Command.java | 4 ++--
.../apache/servicecomb/saga/alpha/core/CommandStatus.java | 1 +
.../saga/alpha/server/CommandEntityRepository.java | 3 ++-
.../saga/alpha/server/SpringCommandRepository.java | 11 ++++++++++-
.../servicecomb/saga/alpha/server/AlphaIntegrationTest.java | 12 +++++++++++-
5 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index b5902e1..904cc54 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -113,11 +113,11 @@ public class Command {
return instanceId;
}
- String globalTxId() {
+ public String globalTxId() {
return globalTxId;
}
- String localTxId() {
+ public String localTxId() {
return localTxId;
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
index cdf1f6c..0c9b78b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
@@ -19,5 +19,6 @@ package org.apache.servicecomb.saga.alpha.core;
public enum CommandStatus {
NEW,
+ PENDING,
DONE
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index fffea56..d7c583e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -45,9 +45,10 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status);
+ // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
@Query("SELECT c FROM Command c "
+ "WHERE c.surrogateId IN ("
- + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status <> 'DONE' GROUP BY c1.globalTxId"
+ + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
+ ") "
+ "ORDER BY c.surrogateId ASC")
List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 18ee9ad..34b43a4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server;
import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.util.LinkedHashMap;
@@ -82,7 +83,15 @@ public class SpringCommandRepository implements CommandRepository {
@Override
public List<Command> findFirstCommandToCompensate() {
- return commandRepository
+ List<Command> commands = commandRepository
.findFirstGroupByGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST);
+
+ commands.forEach(command ->
+ commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
+ PENDING.name(),
+ command.globalTxId(),
+ command.localTxId()));
+
+ return commands;
}
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 1514869..1bfbe3c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -34,6 +34,8 @@ import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -55,6 +57,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
import org.springframework.test.context.junit4.SpringRunner;
import com.google.protobuf.ByteString;
@@ -65,7 +69,7 @@ import io.grpc.stub.StreamObserver;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
- properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=300"})
+ properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1"})
public class AlphaIntegrationTest {
private static final int port = 8090;
@@ -413,4 +417,10 @@ public class AlphaIntegrationTest {
return completed;
}
}
+
+ @Primary
+ @Bean
+ ScheduledExecutorService scheduler() {
+ return Executors.newScheduledThreadPool(2);
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.