You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:33 UTC
[incubator-servicecomb-saga] 07/09: SCB-218 added event id to check
for command uniqueness
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 02553d55a0b0a1938d0f3a5efa9712639af3f8d9
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 11:06:16 2018 +0800
SCB-218 added event id to check for command uniqueness
Signed-off-by: seanyinx <se...@huawei.com>
---
.../servicecomb/saga/alpha/core/Command.java | 24 ++++++++--------------
.../saga/alpha/core/CommandRepository.java | 2 +-
.../servicecomb/saga/alpha/core/EventScanner.java | 23 ++++++++++++---------
.../saga/alpha/server/CommandEntityRepository.java | 10 ++++-----
.../saga/alpha/server/SpringCommandRepository.java | 11 +++++++---
.../alpha/server/TxEventEnvelopeRepository.java | 2 +-
.../src/main/resources/schema-postgresql.sql | 20 +++++++++++++++++-
.../saga/alpha/server/AlphaIntegrationTest.java | 2 +-
alpha/alpha-server/src/test/resources/schema.sql | 3 ++-
9 files changed, 57 insertions(+), 40 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 904cc54..2716abf 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -22,6 +22,8 @@ import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
import java.util.Date;
import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Version;
@@ -29,8 +31,10 @@ import javax.persistence.Version;
public class Command {
@Id
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long surrogateId;
+ private long eventId;
private String serviceName;
private String instanceId;
private String globalTxId;
@@ -48,7 +52,7 @@ public class Command {
Command() {
}
- Command(long id,
+ private Command(long id,
String serviceName,
String instanceId,
String globalTxId,
@@ -58,7 +62,7 @@ public class Command {
byte[] payloads,
String status) {
- this.surrogateId = id;
+ this.eventId = id;
this.serviceName = serviceName;
this.instanceId = instanceId;
this.globalTxId = globalTxId;
@@ -70,7 +74,7 @@ public class Command {
this.lastModified = new Date();
}
- Command(long id,
+ private Command(long id,
String serviceName,
String instanceId,
String globalTxId,
@@ -82,18 +86,6 @@ public class Command {
this(id, serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
}
- Command(Command command, CommandStatus status) {
- this(command.surrogateId,
- command.serviceName,
- command.instanceId,
- command.globalTxId,
- command.localTxId,
- command.parentTxId,
- command.compensationMethod,
- command.payloads,
- status.name());
- }
-
public Command(TxEvent event) {
this(event.id(),
event.serviceName(),
@@ -144,7 +136,7 @@ public class Command {
@Override
public String toString() {
return "Command{" +
- "surrogateId=" + surrogateId +
+ "eventId=" + eventId +
", serviceName='" + serviceName + '\'' +
", instanceId='" + instanceId + '\'' +
", globalTxId='" + globalTxId + '\'' +
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 1da033f..2bbea77 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
public interface CommandRepository {
- Iterable<Command> saveCompensationCommands(String globalTxId);
+ void saveCompensationCommands(String globalTxId);
void markCommandAsDone(String globalTxId, String localTxId);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index ea4aaf6..5a4589d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -81,8 +81,7 @@ public class EventScanner implements Runnable {
.forEach(event -> {
log.info("Found uncompensated event {}", event);
nextEndedEventId = event.id();
- commandRepository.saveCompensationCommands(event.globalTxId())
- .forEach(command -> nextEndedEventId = command.id());
+ commandRepository.saveCompensationCommands(event.globalTxId());
});
}
@@ -130,19 +129,23 @@ public class EventScanner implements Runnable {
private void pollCompensationCommand(int commandPollingInterval) {
scheduler.scheduleWithFixedDelay(
- () -> commandRepository.findFirstCommandToCompensate()
- .forEach(command -> {
- log.info("Compensating transaction with globalTxId {} and localTxId {}",
- command.globalTxId(),
- command.localTxId());
-
- omegaCallback.compensate(txStartedEventOf(command));
- }),
+ this::compensate,
0,
commandPollingInterval,
MILLISECONDS);
}
+ private void compensate() { // scheduled polling task: handles each command returned by findFirstCommandToCompensate()
+ commandRepository.findFirstCommandToCompensate()
+ .forEach(command -> {
+ log.info("Compensating transaction with globalTxId {} and localTxId {}",
+ command.globalTxId(),
+ command.localTxId());
+ // Rebuild a TxEvent from the command's fields and pass it to the callback to trigger compensation.
+ omegaCallback.compensate(txStartedEventOf(command));
+ });
+ }
+
private TxEvent txStartedEventOf(Command command) {
return new TxEvent(
command.serviceName(),
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index d7c583e..4b8c3ee 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -18,7 +18,6 @@
package org.apache.servicecomb.saga.alpha.server;
import java.util.List;
-import java.util.Optional;
import javax.transaction.Transactional;
@@ -30,10 +29,9 @@ import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
public interface CommandEntityRepository extends CrudRepository<Command, Long> {
- Optional<Command> findByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId);
@Transactional
- @Modifying
+ @Modifying(clearAutomatically = true)
@Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
+ "SET c.status = :status "
+ "WHERE c.globalTxId = :globalTxId "
@@ -47,9 +45,9 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
// TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
@Query("SELECT c FROM Command c "
- + "WHERE c.surrogateId IN ("
- + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
+ + "WHERE c.eventId IN ("
+ + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
+ ") "
- + "ORDER BY c.surrogateId ASC")
+ + "ORDER BY c.eventId ASC")
List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 6076e54..1dabeda 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -46,9 +46,9 @@ public class SpringCommandRepository implements CommandRepository {
}
@Override
- public Iterable<Command> saveCompensationCommands(String globalTxId) {
+ public void saveCompensationCommands(String globalTxId) {
List<TxEvent> events = eventRepository
- .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
+ .findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
Map<String, Command> commands = new LinkedHashMap<>();
@@ -57,7 +57,12 @@ public class SpringCommandRepository implements CommandRepository {
}
log.info("Saving compensation commands {}", commands.values());
- return commandRepository.save(commands.values());
+ try {
+ commandRepository.save(commands.values());
+ } catch (Exception e) {
+ log.warn("Failed to save some commands", e);
+ }
+ log.info("Saved compensation commands {}", commands.values());
}
@Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 78c2a1d..e974527 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -48,7 +48,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " AND t2.localTxId = t.localTxId "
+ " AND t2.type = 'TxCompensatedEvent') "
+ "ORDER BY t.surrogateId ASC")
- List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+ List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
@Query("SELECT t FROM TxEvent t "
+ "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index e84a9c3..d1c36c2 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -11,4 +11,22 @@ CREATE TABLE IF NOT EXISTS TxEvent (
payloads bytea
);
-CREATE INDEX IF NOT EXISTS running_sagas_index ON TxEvent (globalTxId, localTxId, type);
+CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, globalTxId, localTxId, type);
+
+
+CREATE TABLE IF NOT EXISTS Command ( -- compensation commands created from TxEvent rows
+ surrogateId BIGSERIAL PRIMARY KEY, -- database-generated key
+ eventId bigint NOT NULL UNIQUE, -- id of the source event; UNIQUE rejects a second command for the same event
+ serviceName varchar(16) NOT NULL,
+ instanceId varchar(36) NOT NULL,
+ globalTxId varchar(36) NOT NULL,
+ localTxId varchar(36) NOT NULL,
+ parentTxId varchar(36) DEFAULT NULL,
+ compensationMethod varchar(256) NOT NULL,
+ payloads bytea,
+ status varchar(12),
+ lastModified timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, -- CURRENT_DATE would truncate the default to midnight
+ version bigint NOT NULL -- last column: PostgreSQL rejects a trailing comma before ')'
+);
+-- Index only columns that exist on Command (the table has no "type" column, unlike TxEvent).
+CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, globalTxId, localTxId, status);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 5cff57c..a5356bb 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -65,7 +65,7 @@ import io.grpc.stub.StreamObserver;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
- properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=100"})
+ properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=1"})
public class AlphaIntegrationTest {
private static final int port = 8090;
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index fb396b3..71444ec 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -13,7 +13,8 @@ CREATE TABLE IF NOT EXISTS `TxEvent` (
) DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `Command` (
- `surrogateId` bigint NOT NULL,
+ `surrogateId` bigint NOT NULL AUTO_INCREMENT,
+ `eventId` bigint NOT NULL UNIQUE,
`serviceName` varchar(36) NOT NULL,
`instanceId` varchar(36) NOT NULL,
`globalTxId` varchar(36) NOT NULL,
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.