You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:30 UTC
[incubator-servicecomb-saga] 04/09: SCB-218 excluded duplicate events during command persistence
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 54e300b97f86a879a39a820bc1c81d47f9fb507e
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 18 16:59:51 2018 +0800
SCB-218 excluded duplicate events during command persistence
Signed-off-by: seanyinx <se...@huawei.com>
---
.../servicecomb/saga/alpha/core/Command.java | 32 ++++++++++++++++++----
.../saga/alpha/core/TxConsistentService.java | 12 +++++++-
.../servicecomb/saga/alpha/core/TxEvent.java | 32 ++++++++++++++++++++--
.../servicecomb/saga/alpha/server/AlphaConfig.java | 11 +++++++-
.../saga/alpha/server/SpringCommandRepository.java | 4 +--
.../alpha/server/TxEventEnvelopeRepository.java | 4 +--
alpha/alpha-server/src/test/resources/schema.sql | 2 +-
7 files changed, 83 insertions(+), 14 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index b29988b..b5902e1 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -48,7 +48,8 @@ public class Command {
Command() {
}
- Command(String serviceName,
+ Command(long id,
+ String serviceName,
String instanceId,
String globalTxId,
String localTxId,
@@ -57,6 +58,7 @@ public class Command {
byte[] payloads,
String status) {
+ this.surrogateId = id;
this.serviceName = serviceName;
this.instanceId = instanceId;
this.globalTxId = globalTxId;
@@ -68,7 +70,8 @@ public class Command {
this.lastModified = new Date();
}
- Command(String serviceName,
+ Command(long id,
+ String serviceName,
String instanceId,
String globalTxId,
String localTxId,
@@ -76,11 +79,12 @@ public class Command {
String compensationMethod,
byte[] payloads) {
- this(serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
+ this(id, serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
}
Command(Command command, CommandStatus status) {
- this(command.serviceName,
+ this(command.surrogateId,
+ command.serviceName,
command.instanceId,
command.globalTxId,
command.localTxId,
@@ -91,7 +95,8 @@ public class Command {
}
public Command(TxEvent event) {
- this(event.serviceName(),
+ this(event.id(),
+ event.serviceName(),
event.instanceId(),
event.globalTxId(),
event.localTxId(),
@@ -131,4 +136,21 @@ public class Command {
String status() {
return status;
}
+
+ long id() {
+ return surrogateId;
+ }
+
+ @Override
+ public String toString() {
+ return "Command{" +
+ "surrogateId=" + surrogateId +
+ ", serviceName='" + serviceName + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", globalTxId='" + globalTxId + '\'' +
+ ", localTxId='" + localTxId + '\'' +
+ ", parentTxId='" + parentTxId + '\'' +
+ ", compensationMethod='" + compensationMethod + '\'' +
+ '}';
+ }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 8f88735..61beea5 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -52,15 +52,25 @@ public class TxConsistentService {
}};
private final ExecutorService executor = Executors.newSingleThreadExecutor();
- private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService scheduler;
public TxConsistentService(TxEventRepository eventRepository,
CommandRepository commandRepository,
OmegaCallback omegaCallback,
int commandPollingInterval) {
+
+ this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
+ }
+
+ public TxConsistentService(TxEventRepository eventRepository,
+ CommandRepository commandRepository,
+ OmegaCallback omegaCallback,
+ int commandPollingInterval,
+ ScheduledExecutorService scheduler) {
this.eventRepository = eventRepository;
this.commandRepository = commandRepository;
this.omegaCallback = omegaCallback;
+ this.scheduler = scheduler;
scheduleCompensationCommandPolling(commandPollingInterval);
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 0ff2299..b654689 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -65,7 +65,34 @@ public class TxEvent {
String type,
String compensationMethod,
byte[] payloads) {
- this.surrogateId = -1L;
+ this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+ }
+
+ public TxEvent(
+ long id,
+ String serviceName,
+ String instanceId,
+ String globalTxId,
+ String localTxId,
+ String parentTxId,
+ String type,
+ String compensationMethod,
+ byte[] payloads) {
+ this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+ }
+
+ TxEvent(Long surrogateId,
+ String serviceName,
+ String instanceId,
+ Date creationTime,
+ String globalTxId,
+ String localTxId,
+ String parentTxId,
+ String type,
+ String compensationMethod,
+ byte[] payloads) {
+
+ this.surrogateId = surrogateId;
this.serviceName = serviceName;
this.instanceId = instanceId;
this.creationTime = creationTime;
@@ -120,7 +147,8 @@ public class TxEvent {
@Override
public String toString() {
return "TxEvent{" +
- "serviceName='" + serviceName + '\'' +
+ "surrogateId=" + surrogateId +
+ ", serviceName='" + serviceName + '\'' +
", instanceId='" + instanceId + '\'' +
", creationTime=" + creationTime +
", globalTxId='" + globalTxId + '\'' +
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 9c34738..3740581 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -20,7 +20,9 @@ package org.apache.servicecomb.saga.alpha.server;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.PostConstruct;
@@ -65,8 +67,14 @@ class AlphaConfig {
}
@Bean
+ ScheduledExecutorService compensationScheduler() {
+ return Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @Bean
TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
@Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
+ ScheduledExecutorService scheduler,
TxEventRepository eventRepository,
CommandRepository commandRepository,
OmegaCallback omegaCallback,
@@ -76,7 +84,8 @@ class AlphaConfig {
eventRepository,
commandRepository,
omegaCallback,
- commandPollingInterval);
+ commandPollingInterval,
+ scheduler);
ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index a335849..18ee9ad 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -21,7 +21,7 @@ import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -61,7 +61,7 @@ public class SpringCommandRepository implements CommandRepository {
List<TxEvent> events = eventRepository
.findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
- Map<String, Command> commands = new HashMap<>();
+ Map<String, Command> commands = new LinkedHashMap<>();
for (TxEvent event : events) {
commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 71d5f1b..09aec6f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -54,8 +54,8 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
)
List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
- @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
- + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
+ @Query("SELECT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+ + "t.surrogateId, t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
+ ") FROM TxEvent t "
+ "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
+ " SELECT t1.globalTxId"
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index c6d66de..fb396b3 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS `TxEvent` (
) DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `Command` (
- `surrogateId` bigint NOT NULL AUTO_INCREMENT,
+ `surrogateId` bigint NOT NULL,
`serviceName` varchar(36) NOT NULL,
`instanceId` varchar(36) NOT NULL,
`globalTxId` varchar(36) NOT NULL,
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.