You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/23 00:49:34 UTC
[incubator-servicecomb-saga] branch master updated: SCB-270 in
order compensation (#126)
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/master by this push:
new f91946d SCB-270 in order compensation (#126)
f91946d is described below
commit f91946d51ecc8deb86e941b1bbd5d5151c8ea208
Author: seanyinx <yi...@hotmail.com>
AuthorDate: Tue Jan 23 08:49:32 2018 +0800
SCB-270 in order compensation (#126)
* SCB-270 avoided compensating tx which has pending compensations to ensure order
Signed-off-by: seanyinx <se...@huawei.com>
---
.../apache/servicecomb/saga/alpha/server/AlphaConfig.java | 9 ++++++++-
.../saga/alpha/server/CommandEntityRepository.java | 14 ++++++++------
.../saga/alpha/server/SpringCommandRepository.java | 2 +-
.../saga/alpha/server/AlphaIntegrationTest.java | 4 ++--
4 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 35352f4..a431437 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import org.apache.servicecomb.saga.alpha.core.CommandRepository;
import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
@@ -43,6 +44,7 @@ import org.springframework.context.annotation.Configuration;
@Configuration
class AlphaConfig {
private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Value("${alpha.compensation.retry.delay:3000}")
private int delay;
@@ -69,7 +71,7 @@ class AlphaConfig {
@Bean
ScheduledExecutorService compensationScheduler() {
- return Executors.newScheduledThreadPool(1);
+ return scheduler;
}
@Bean
@@ -105,4 +107,9 @@ class AlphaConfig {
void init() {
new PendingTaskRunner(pendingCompensations, delay).run();
}
+
+ @PreDestroy
+ void shutdown() {
+ scheduler.shutdownNow();
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 17df477..737fd11 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -23,7 +23,6 @@ import javax.persistence.LockModeType;
import javax.transaction.Transactional;
import org.apache.servicecomb.saga.alpha.core.Command;
-import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
@@ -47,10 +46,13 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
// TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
@Lock(LockModeType.OPTIMISTIC)
- @Query("SELECT c FROM Command c "
+ @Query(value = "SELECT * FROM Command AS c "
+ "WHERE c.eventId IN ("
- + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
- + ") "
- + "ORDER BY c.eventId ASC")
- List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
+ + " SELECT MAX(c1.eventId) FROM Command AS c1 "
+ + " INNER JOIN Command AS c2 on c1.globalTxId = c2.globalTxId"
+ + " WHERE c1.status = 'NEW' "
+ + " GROUP BY c1.globalTxId "
+ + " HAVING MAX( CASE c2.status WHEN 'PENDING' THEN 1 ELSE 0 END ) = 0) "
+ + "ORDER BY c.eventId ASC LIMIT 1", nativeQuery = true)
+ List<Command> findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc();
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 8241d81..afbdaf5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -83,7 +83,7 @@ public class SpringCommandRepository implements CommandRepository {
@Override
public List<Command> findFirstCommandToCompensate() {
List<Command> commands = commandRepository
- .findFirstGroupByGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST);
+ .findFirstGroupByGlobalTxIdWithoutPendingOrderByIdDesc();
commands.forEach(command ->
commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 4635425..5dddc1d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -25,7 +25,7 @@ import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -257,7 +257,7 @@ public class AlphaIntegrationTest {
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
- assertThat(receivedCommands, containsInAnyOrder(
+ assertThat(receivedCommands, contains(
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
.setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
--
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.