You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/20 12:00:20 UTC
[incubator-servicecomb-saga] 08/12: SCB-218 locked commands to
avoid duplicate compensation callback
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit f075bd5f27c76a073e253095a99a3b0b73153cfc
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 16:35:03 2018 +0800
SCB-218 locked commands to avoid duplicate compensation callback
Signed-off-by: seanyinx <se...@huawei.com>
---
.../servicecomb/saga/alpha/core/Command.java | 2 +-
.../servicecomb/saga/alpha/core/EventScanner.java | 28 +++++------
.../saga/alpha/core/TxEventRepository.java | 2 +
.../saga/alpha/core/TxConsistentServiceTest.java | 4 ++
alpha/alpha-server/pom.xml | 4 +-
.../servicecomb/saga/alpha/server/AlphaConfig.java | 2 -
.../saga/alpha/server/CommandEntityRepository.java | 3 ++
.../saga/alpha/server/SpringCommandRepository.java | 17 ++++---
.../saga/alpha/server/SpringTxEventRepository.java | 5 ++
.../alpha/server/TxEventEnvelopeRepository.java | 17 ++++++-
.../saga/alpha/server/AlphaIntegrationTest.java | 36 +++++++++++++-
alpha/alpha-server/src/test/resources/schema.sql | 55 +++++++++++-----------
12 files changed, 119 insertions(+), 56 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 2716abf..49c1756 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -47,7 +47,7 @@ public class Command {
private Date lastModified;
@Version
- private int version;
+ private long version;
Command() {
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 5a4589d..f9fa3be 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -38,7 +38,6 @@ public class EventScanner implements Runnable {
private final TxEventRepository eventRepository;
private final CommandRepository commandRepository;
private final OmegaCallback omegaCallback;
- private final int commandPollingInterval;
private final int eventPollingInterval;
private long nextEndedEventId;
@@ -48,20 +47,17 @@ public class EventScanner implements Runnable {
TxEventRepository eventRepository,
CommandRepository commandRepository,
OmegaCallback omegaCallback,
- int commandPollingInterval,
int eventPollingInterval) {
this.scheduler = scheduler;
this.eventRepository = eventRepository;
this.commandRepository = commandRepository;
this.omegaCallback = omegaCallback;
- this.commandPollingInterval = commandPollingInterval;
this.eventPollingInterval = eventPollingInterval;
}
@Override
public void run() {
- pollCompensationCommand(commandPollingInterval);
pollEvents();
}
@@ -69,7 +65,9 @@ public class EventScanner implements Runnable {
scheduler.scheduleWithFixedDelay(
() -> {
saveUncompensatedEventsToCommands();
+ compensate();
updateCompensatedCommands();
+ deleteDuplicateSagaEndedEvents();
},
0,
eventPollingInterval,
@@ -94,6 +92,14 @@ public class EventScanner implements Runnable {
});
}
+ private void deleteDuplicateSagaEndedEvents() {
+ try {
+ eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
+ } catch (Exception e) {
+ log.warn("Failed to delete duplicate SagaEndedEvent", e);
+ }
+ }
+
// TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
// unless we ask user to specify a name for each participant in the global TX in @Compensable
private void updateCompensationStatus(TxEvent event) {
@@ -102,9 +108,11 @@ public class EventScanner implements Runnable {
event.globalTxId(),
event.localTxId());
- if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
- && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+ markSagaEnded(event);
+ }
+ private void markSagaEnded(TxEvent event) {
+ if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
markGlobalTxEnd(event);
}
}
@@ -127,14 +135,6 @@ public class EventScanner implements Runnable {
EMPTY_PAYLOAD);
}
- private void pollCompensationCommand(int commandPollingInterval) {
- scheduler.scheduleWithFixedDelay(
- this::compensate,
- 0,
- commandPollingInterval,
- MILLISECONDS);
- }
-
private void compensate() {
commandRepository.findFirstCommandToCompensate()
.forEach(command -> {
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index d793de2..b61aa06 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -28,4 +28,6 @@ public interface TxEventRepository {
List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
+
+ void deleteDuplicateEvents(String type);
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 473501e..231d5bf 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -64,6 +64,10 @@ public class TxConsistentServiceTest {
public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
return Optional.empty();
}
+
+ @Override
+ public void deleteDuplicateEvents(String type) {
+ }
};
private final String globalTxId = UUID.randomUUID().toString();
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index ae894b8..a05177a 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -82,8 +82,8 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.h2database</groupId>
- <artifactId>h2</artifactId>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 73298be..769ee5a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -74,7 +74,6 @@ class AlphaConfig {
@Bean
TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
- @Value("${alpha.command.pollingInterval:500}") int commandPollingInterval,
@Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
ScheduledExecutorService scheduler,
TxEventRepository eventRepository,
@@ -86,7 +85,6 @@ class AlphaConfig {
eventRepository,
commandRepository,
omegaCallback,
- commandPollingInterval,
eventPollingInterval).run();
TxConsistentService consistentService = new TxConsistentService(eventRepository);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 4b8c3ee..17df477 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -19,10 +19,12 @@ package org.apache.servicecomb.saga.alpha.server;
import java.util.List;
+import javax.persistence.LockModeType;
import javax.transaction.Transactional;
import org.apache.servicecomb.saga.alpha.core.Command;
import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
@@ -44,6 +46,7 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status);
// TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
+ @Lock(LockModeType.OPTIMISTIC)
@Query("SELECT c FROM Command c "
+ "WHERE c.eventId IN ("
+ " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 1dabeda..8241d81 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import javax.transaction.Transactional;
+
import org.apache.servicecomb.saga.alpha.core.Command;
import org.apache.servicecomb.saga.alpha.core.CommandRepository;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -56,13 +58,15 @@ public class SpringCommandRepository implements CommandRepository {
commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
}
- log.info("Saving compensation commands {}", commands.values());
- try {
- commandRepository.save(commands.values());
- } catch (Exception e) {
- log.warn("Failed to save some commands", e);
+ for (Command command : commands.values()) {
+ log.info("Saving compensation command {}", command);
+ try {
+ commandRepository.save(command);
+ } catch (Exception e) {
+ log.warn("Failed to save some command {}", command);
+ }
+ log.info("Saved compensation command {}", command);
}
- log.info("Saved compensation commands {}", commands.values());
}
@Override
@@ -75,6 +79,7 @@ public class SpringCommandRepository implements CommandRepository {
return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name());
}
+ @Transactional
@Override
public List<Command> findFirstCommandToCompensate() {
List<Command> commands = commandRepository
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 4108aa5..ad32148 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -50,4 +50,9 @@ class SpringTxEventRepository implements TxEventRepository {
public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id);
}
+
+ @Override
+ public void deleteDuplicateEvents(String type) {
+ eventRepo.deleteByType(type);
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index e974527..2e52fef 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -20,8 +20,11 @@ package org.apache.servicecomb.saga.alpha.server;
import java.util.List;
import java.util.Optional;
+import javax.transaction.Transactional;
+
import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
@@ -51,7 +54,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
@Query("SELECT t FROM TxEvent t "
- + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
+ + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
+ " SELECT t1.globalTxId"
+ " FROM TxEvent t1 "
+ " WHERE t1.globalTxId = t.globalTxId "
@@ -59,11 +62,21 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ ") AND NOT EXISTS ( "
+ " SELECT t2.globalTxId"
+ " FROM TxEvent t2 "
- + " WHERE t2.globalTxId = ?1 "
+ + " WHERE t2.globalTxId = t.globalTxId "
+ " AND t2.localTxId = t.localTxId "
+ " AND t2.type = 'TxCompensatedEvent') "
+ "ORDER BY t.surrogateId ASC")
List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
+
+ @Transactional
+ @Modifying(clearAutomatically = true)
+ @Query("DELETE FROM TxEvent t "
+ + "WHERE t.type = ?1 AND t.surrogateId NOT IN ("
+ + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 "
+ + " WHERE t1.type = ?1"
+ + " GROUP BY t1.globalTxId"
+ + ")")
+ void deleteByType(String type);
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a5356bb..a928443 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -36,9 +36,13 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
+import javax.annotation.PostConstruct;
+
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -65,7 +69,7 @@ import io.grpc.stub.StreamObserver;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
- properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=1"})
+ properties = {"alpha.server.port=8090", "alpha.event.pollingInterval=1"})
public class AlphaIntegrationTest {
private static final int port = 8090;
@@ -93,6 +97,18 @@ public class AlphaIntegrationTest {
private TxEventEnvelopeRepository eventRepo;
@Autowired
+ private TxEventRepository eventRepository;
+
+ @Autowired
+ private CommandRepository commandRepository;
+
+ @Autowired
+ private CommandEntityRepository commandEntityRepository;
+
+ @Autowired
+ private OmegaCallback omegaCallback;
+
+ @Autowired
private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
@Autowired
@@ -115,6 +131,19 @@ public class AlphaIntegrationTest {
@After
public void after() throws Exception {
blockingStub.onDisconnected(serviceConfig);
+ deleteAllTillSuccessful();
+ }
+
+ public void deleteAllTillSuccessful() {
+ boolean deleted = false;
+ do {
+ try {
+ eventRepo.deleteAll();
+ commandEntityRepository.deleteAll();
+ deleted = true;
+ } catch (Exception ignored) {
+ }
+ } while (!deleted);
}
@Test
@@ -413,4 +442,9 @@ public class AlphaIntegrationTest {
return completed;
}
}
+
+ @PostConstruct
+ void init() {
+// new EventScanner(Executors.newScheduledThreadPool(2), eventRepository, commandRepository, omegaCallback, 1, 1).run();
+ }
}
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index 71444ec..344fdda 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -1,29 +1,28 @@
-CREATE TABLE IF NOT EXISTS `TxEvent` (
- `surrogateId` bigint NOT NULL AUTO_INCREMENT,
- `serviceName` varchar(36) NOT NULL,
- `instanceId` varchar(36) NOT NULL,
- `creationTime` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
- `globalTxId` varchar(36) NOT NULL,
- `localTxId` varchar(36) NOT NULL,
- `parentTxId` varchar(36) DEFAULT NULL,
- `type` varchar(50) NOT NULL,
- `compensationMethod` varchar(256) NOT NULL,
- `payloads` varbinary(10240),
- PRIMARY KEY (`surrogateId`)
-) DEFAULT CHARSET=utf8;
+CREATE TABLE IF NOT EXISTS TxEvent (
+ surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
+ serviceName varchar(36) NOT NULL,
+ instanceId varchar(36) NOT NULL,
+ creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ globalTxId varchar(36) NOT NULL,
+ localTxId varchar(36) NOT NULL,
+ parentTxId varchar(36) DEFAULT NULL,
+ type varchar(50) NOT NULL,
+ compensationMethod varchar(256) NOT NULL,
+ payloads varbinary(10240),
+-- version bigint NOT NULL
+);
-CREATE TABLE IF NOT EXISTS `Command` (
- `surrogateId` bigint NOT NULL AUTO_INCREMENT,
- `eventId` bigint NOT NULL UNIQUE,
- `serviceName` varchar(36) NOT NULL,
- `instanceId` varchar(36) NOT NULL,
- `globalTxId` varchar(36) NOT NULL,
- `localTxId` varchar(36) NOT NULL,
- `parentTxId` varchar(36) DEFAULT NULL,
- `compensationMethod` varchar(256) NOT NULL,
- `payloads` varbinary(10240),
- `status` varchar(12),
- `lastModified` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
- `version` bigint NOT NULL,
- PRIMARY KEY (`surrogateId`)
-) DEFAULT CHARSET=utf8;
+CREATE TABLE IF NOT EXISTS Command (
+ surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
+ eventId bigint NOT NULL UNIQUE,
+ serviceName varchar(36) NOT NULL,
+ instanceId varchar(36) NOT NULL,
+ globalTxId varchar(36) NOT NULL,
+ localTxId varchar(36) NOT NULL,
+ parentTxId varchar(36) DEFAULT NULL,
+ compensationMethod varchar(256) NOT NULL,
+ payloads varbinary(10240),
+ status varchar(12),
+ lastModified TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ version bigint NOT NULL
+);
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.