You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:27 UTC
[incubator-servicecomb-saga] 01/09: SCB-218 replaced in memory
compensation store with persistent repo to make alpha stateless
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 705b27472da79e1fedd29b2e8bed2a024289514d
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 16 11:54:52 2018 +0800
SCB-218 replaced in memory compensation store with persistent repo to make alpha stateless
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/core/{TxEvent.java => Command.java} | 93 +++++++++-------------
.../saga/alpha/core/CommandRepository.java | 32 ++++++++
.../servicecomb/saga/alpha/core/CommandStatus.java | 23 ++++++
.../saga/alpha/core/TxConsistentService.java | 28 +++----
.../servicecomb/saga/alpha/core/TxEvent.java | 14 ++++
.../saga/alpha/core/TxConsistentServiceTest.java | 56 ++++++++++++-
.../servicecomb/saga/alpha/server/AlphaConfig.java | 9 ++-
.../saga/alpha/server/CommandEntity.java | 53 ++++++++++++
.../saga/alpha/server/CommandEntityRepository.java | 43 ++++++++++
.../saga/alpha/server/SpringCommandRepository.java | 78 ++++++++++++++++++
.../alpha/server/TxEventEnvelopeRepository.java | 13 +++
.../saga/omega/transaction/TransactionAspect.java | 1 +
12 files changed, 368 insertions(+), 75 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
similarity index 58%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 37a29f1..08f8527 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -17,98 +17,81 @@
package org.apache.servicecomb.saga.alpha.core;
-import java.util.Date;
-
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-
-@Entity
-public class TxEvent {
- @Id
- @GeneratedValue(strategy = GenerationType.IDENTITY)
- private Long surrogateId;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+public class Command {
private String serviceName;
private String instanceId;
- private Date creationTime;
private String globalTxId;
private String localTxId;
private String parentTxId;
- private String type;
private String compensationMethod;
private byte[] payloads;
+ private String status;
- private TxEvent() {
+ Command() {
}
- public TxEvent(
- String serviceName,
+ Command(String serviceName,
String instanceId,
String globalTxId,
String localTxId,
String parentTxId,
- String type,
String compensationMethod,
- byte[] payloads) {
- this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
- }
+ byte[] payloads,
+ String status) {
- public TxEvent(
- String serviceName,
- String instanceId,
- Date creationTime,
- String globalTxId,
- String localTxId,
- String parentTxId,
- String type,
- String compensationMethod,
- byte[] payloads) {
this.serviceName = serviceName;
this.instanceId = instanceId;
- this.creationTime = creationTime;
this.globalTxId = globalTxId;
this.localTxId = localTxId;
this.parentTxId = parentTxId;
- this.type = type;
this.compensationMethod = compensationMethod;
this.payloads = payloads;
+ this.status = status;
}
- public String serviceName() {
- return serviceName;
+ Command(String serviceName,
+ String instanceId,
+ String globalTxId,
+ String localTxId,
+ String parentTxId,
+ String compensationMethod,
+ byte[] payloads) {
+
+ this(serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
}
- public String instanceId() {
- return instanceId;
+ Command(Command command, CommandStatus status) {
+ this(command.serviceName,
+ command.instanceId,
+ command.globalTxId,
+ command.localTxId,
+ command.parentTxId,
+ command.compensationMethod,
+ command.payloads,
+ status.name());
}
- public Date creationTime() {
- return creationTime;
+ public Command(TxEvent event) {
+ this(event.serviceName(),
+ event.instanceId(),
+ event.globalTxId(),
+ event.localTxId(),
+ event.parentTxId(),
+ event.compensationMethod(),
+ event.payloads());
}
- public String globalTxId() {
+ String globalTxId() {
return globalTxId;
}
- public String localTxId() {
+ String localTxId() {
return localTxId;
}
- public String parentTxId() {
- return parentTxId;
- }
-
- public String type() {
- return type;
- }
-
- public String compensationMethod() {
- return compensationMethod;
- }
-
- public byte[] payloads() {
- return payloads;
+ String status() {
+ return status;
}
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
new file mode 100644
index 0000000..915d476
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+import java.util.List;
+
+public interface CommandRepository {
+ boolean exists(String globalTxId, String localTxId);
+
+ void saveCompensationCommand(String globalTxId, String localTxId);
+
+ void saveCompensationCommands(String globalTxId);
+
+ void markCommandAsDone(String globalTxId, String localTxId);
+
+ List<Command> findUncompletedCommands(String globalTxId);
+}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
new file mode 100644
index 0000000..cdf1f6c
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+public enum CommandStatus {
+ NEW,
+ DONE
+}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 5dc5788..560096f 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,7 +17,6 @@
package org.apache.servicecomb.saga.alpha.core;
-import static java.util.Collections.emptySet;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
@@ -26,10 +25,8 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@@ -40,6 +37,7 @@ public class TxConsistentService {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private final TxEventRepository eventRepository;
+ private final CommandRepository commandRepository;
private final OmegaCallback omegaCallback;
private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
@@ -47,11 +45,13 @@ public class TxConsistentService {
put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
}};
- private final Map<String, Set<String>> eventsToCompensate = new HashMap<>();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
- public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
+ public TxConsistentService(TxEventRepository eventRepository,
+ CommandRepository commandRepository,
+ OmegaCallback omegaCallback) {
this.eventRepository = eventRepository;
+ this.commandRepository = commandRepository;
this.omegaCallback = omegaCallback;
}
@@ -68,7 +68,7 @@ public class TxConsistentService {
private void compensateIfAlreadyAborted(TxEvent event) {
if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
- eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()).add(event.localTxId());
+ commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
TxEvent correspondingStartedEvent = eventRepository
.findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
@@ -77,7 +77,7 @@ public class TxConsistentService {
}
private boolean isCompensationScheduled(TxEvent event) {
- return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId());
+ return commandRepository.exists(event.globalTxId(), event.localTxId());
}
private void compensate(TxEvent event) {
@@ -85,8 +85,7 @@ public class TxConsistentService {
events.removeIf(this::isCompensationScheduled);
- Set<String> localTxIds = eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>());
- events.forEach(e -> localTxIds.add(e.localTxId()));
+ commandRepository.saveCompensationCommands(event.globalTxId());
events.forEach(omegaCallback::compensate);
}
@@ -94,13 +93,10 @@ public class TxConsistentService {
// TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
// unless we ask user to specify a name for each participant in the global TX in @Compensable
private void updateCompensateStatus(TxEvent event) {
- Set<String> events = eventsToCompensate.get(event.globalTxId());
- if (events != null) {
- events.remove(event.localTxId());
- if (events.isEmpty()) {
- markGlobalTxEnd(event);
- eventsToCompensate.remove(event.globalTxId());
- }
+ commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
+ if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
+ && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+ markGlobalTxEnd(event);
}
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 37a29f1..760dd70 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -111,4 +111,18 @@ public class TxEvent {
public byte[] payloads() {
return payloads;
}
+
+ @Override
+ public String toString() {
+ return "TxEvent{" +
+ "serviceName='" + serviceName + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", creationTime=" + creationTime +
+ ", globalTxId='" + globalTxId + '\'' +
+ ", localTxId='" + localTxId + '\'' +
+ ", parentTxId='" + parentTxId + '\'' +
+ ", type='" + type + '\'' +
+ ", compensationMethod='" + compensationMethod + '\'' +
+ '}';
+ }
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 99667e7..8ae60a3 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.core;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
@@ -34,16 +35,20 @@ import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
import org.apache.servicecomb.saga.common.EventType;
import org.junit.Test;
public class TxConsistentServiceTest {
- private final List<TxEvent> events = new ArrayList<>();
+ private final Deque<TxEvent> events = new ConcurrentLinkedDeque<>();
private final TxEventRepository eventRepository = new TxEventRepository() {
@Override
public void save(TxEvent event) {
@@ -92,6 +97,51 @@ public class TxConsistentServiceTest {
}
};
+ private final List<Command> commands = new ArrayList<>();
+ private final CommandRepository commandRepository = new CommandRepository() {
+ @Override
+ public boolean exists(String globalTxId, String localTxId) {
+ return commands.stream()
+ .anyMatch(command -> globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId()));
+ }
+
+ @Override
+ public void saveCompensationCommand(String globalTxId, String localTxId) {
+ TxEvent event = eventRepository.findFirstTransaction(globalTxId, localTxId, TxStartedEvent.name());
+ commands.add(new Command(event));
+ }
+
+ @Override
+ public void saveCompensationCommands(String globalTxId) {
+ List<TxEvent> events = eventRepository.findTransactionsToCompensate(globalTxId);
+
+ Map<String, Command> commandMap = new HashMap<>();
+
+ for (TxEvent event : events) {
+ commandMap.computeIfAbsent(event.localTxId(), k -> new Command(event));
+ }
+
+ commands.addAll(commandMap.values());
+ }
+
+ @Override
+ public void markCommandAsDone(String globalTxId, String localTxId) {
+ for (int i = 0; i < commands.size(); i++) {
+ Command command = commands.get(i);
+ if (globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())) {
+ commands.set(i, new Command(command, DONE));
+ }
+ }
+ }
+
+ @Override
+ public List<Command> findUncompletedCommands(String globalTxId) {
+ return commands.stream()
+ .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
+ .collect(Collectors.toList());
+ }
+ };
+
private final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
@@ -104,7 +154,7 @@ public class TxConsistentServiceTest {
private final OmegaCallback omegaCallback = event ->
compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
- private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+ private final TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
@Test
public void persistEventOnArrival() throws Exception {
@@ -150,7 +200,7 @@ public class TxConsistentServiceTest {
consistentService.handle(compensateEvent1);
await().atMost(1, SECONDS).until(() -> events.size() == 8);
- assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
+ assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
}
@Test
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index bb4ba89..00dfe27 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
@@ -59,12 +60,18 @@ class AlphaConfig {
}
@Bean
+ CommandRepository springCommandRepository(TxEventEnvelopeRepository eventRepo, CommandEntityRepository commandRepository) {
+ return new SpringCommandRepository(eventRepo, commandRepository);
+ }
+
+ @Bean
TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
TxEventRepository eventRepository,
+ CommandRepository commandRepository,
OmegaCallback omegaCallback,
Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
- TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+ TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
new file mode 100644
index 0000000..3eac681
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.Date;
+
+import javax.persistence.Embedded;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+import javax.persistence.Version;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+
+@Entity
+class CommandEntity {
+ @Id
+ @GeneratedValue
+ private long surrogateId;
+
+ @Embedded
+ private Command command;
+
+ private Date lastModified;
+
+ @Version
+ private int version;
+
+ CommandEntity() {
+ }
+
+ CommandEntity(long id, TxEvent event) {
+ surrogateId = id;
+ lastModified = new Date();
+ command = new Command(event);
+ }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
new file mode 100644
index 0000000..4b7309e
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+
+public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> {
+ Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId);
+
+ @Modifying
+ @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c "
+ + "SET c.command.status = :status "
+ + "WHERE c.command.globalTxId = :globalTxId "
+ + "AND c.command.localTxId = :localTxId")
+ void updateStatusByGlobalTxIdAndLocalTxId(
+ @Param("status") String status,
+ @Param("globalTxId") String globalTxId,
+ @Param("localTxId") String localTxId);
+
+ List<Command> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
new file mode 100644
index 0000000..9281b7e
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+
+public class SpringCommandRepository implements CommandRepository {
+ private final TxEventEnvelopeRepository eventRepository;
+ private final CommandEntityRepository commandRepository;
+
+ SpringCommandRepository(TxEventEnvelopeRepository eventRepository, CommandEntityRepository commandRepository) {
+ this.eventRepository = eventRepository;
+ this.commandRepository = commandRepository;
+ }
+
+ @Override
+ public boolean exists(String globalTxId, String localTxId) {
+ return commandRepository.findByCommandGlobalTxIdAndCommandLocalTxId(globalTxId, localTxId).isPresent();
+ }
+
+ @Override
+ public void saveCompensationCommand(String globalTxId, String localTxId) {
+ TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(
+ globalTxId,
+ localTxId,
+ TxStartedEvent.name());
+
+ commandRepository.save(new CommandEntity(startedEvent.id(), startedEvent.event()));
+ }
+
+ @Override
+ public void saveCompensationCommands(String globalTxId) {
+ List<TxEventEnvelope> events = eventRepository
+ .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
+
+ Map<String, CommandEntity> commands = new HashMap<>();
+
+ for (TxEventEnvelope event : events) {
+ commands.computeIfAbsent(event.localTxId(), k -> new CommandEntity(event.id(), event.event()));
+ }
+
+ commandRepository.save(commands.values());
+ }
+
+ @Override
+ public void markCommandAsDone(String globalTxId, String localTxId) {
+ commandRepository.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId);
+ }
+
+ @Override
+ public List<Command> findUncompletedCommands(String globalTxId) {
+ return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name());
+ }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index bdf82f1..fcb7c00 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -52,4 +52,17 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " AND t2.type = 'TxCompensatedEvent')"
)
List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+
+ @Query("FROM TxEventEnvelope t "
+ + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( "
+ + " FROM TxEventEnvelope t1 "
+ + " WHERE t1.event.globalTxId = ?1 "
+ + " AND t1.event.localTxId = t.event.localTxId "
+ + " AND t1.event.type = 'TxEndedEvent'"
+ + ") AND NOT EXISTS ( "
+ + " FROM TxEventEnvelope t2 "
+ + " WHERE t2.event.globalTxId = ?1 "
+ + " AND t2.event.localTxId = t.event.localTxId "
+ + " AND t2.event.type = 'TxCompensatedEvent')")
+ List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index cead07a..5a61dc7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -68,6 +68,7 @@ public class TransactionAspect {
}
LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
+ // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha
scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
try {
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.