You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:27 UTC

[incubator-servicecomb-saga] 01/09: SCB-218 replaced in memory compensation store with persistent repo to make alpha stateless

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 705b27472da79e1fedd29b2e8bed2a024289514d
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 16 11:54:52 2018 +0800

    SCB-218 replaced in memory compensation store with persistent repo to make alpha stateless
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/{TxEvent.java => Command.java} | 93 +++++++++-------------
 .../saga/alpha/core/CommandRepository.java         | 32 ++++++++
 .../servicecomb/saga/alpha/core/CommandStatus.java | 23 ++++++
 .../saga/alpha/core/TxConsistentService.java       | 28 +++----
 .../servicecomb/saga/alpha/core/TxEvent.java       | 14 ++++
 .../saga/alpha/core/TxConsistentServiceTest.java   | 56 ++++++++++++-
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  9 ++-
 .../saga/alpha/server/CommandEntity.java           | 53 ++++++++++++
 .../saga/alpha/server/CommandEntityRepository.java | 43 ++++++++++
 .../saga/alpha/server/SpringCommandRepository.java | 78 ++++++++++++++++++
 .../alpha/server/TxEventEnvelopeRepository.java    | 13 +++
 .../saga/omega/transaction/TransactionAspect.java  |  1 +
 12 files changed, 368 insertions(+), 75 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
similarity index 58%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 37a29f1..08f8527 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -17,98 +17,81 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import java.util.Date;
-
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-
-@Entity
-public class TxEvent {
-  @Id
-  @GeneratedValue(strategy = GenerationType.IDENTITY)
-  private Long surrogateId;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 
+public class Command {
   private String serviceName;
   private String instanceId;
-  private Date creationTime;
   private String globalTxId;
   private String localTxId;
   private String parentTxId;
-  private String type;
   private String compensationMethod;
   private byte[] payloads;
+  private String status;
 
-  private TxEvent() {
+  Command() {
   }
 
-  public TxEvent(
-      String serviceName,
+  Command(String serviceName,
       String instanceId,
       String globalTxId,
       String localTxId,
       String parentTxId,
-      String type,
       String compensationMethod,
-      byte[] payloads) {
-    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
-  }
+      byte[] payloads,
+      String status) {
 
-  public TxEvent(
-      String serviceName,
-      String instanceId,
-      Date creationTime,
-      String globalTxId,
-      String localTxId,
-      String parentTxId,
-      String type,
-      String compensationMethod,
-      byte[] payloads) {
     this.serviceName = serviceName;
     this.instanceId = instanceId;
-    this.creationTime = creationTime;
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
     this.parentTxId = parentTxId;
-    this.type = type;
     this.compensationMethod = compensationMethod;
     this.payloads = payloads;
+    this.status = status;
   }
 
-  public String serviceName() {
-    return serviceName;
+  Command(String serviceName,
+      String instanceId,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String compensationMethod,
+      byte[] payloads) {
+
+    this(serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
   }
 
-  public String instanceId() {
-    return instanceId;
+  Command(Command command, CommandStatus status) {
+    this(command.serviceName,
+        command.instanceId,
+        command.globalTxId,
+        command.localTxId,
+        command.parentTxId,
+        command.compensationMethod,
+        command.payloads,
+        status.name());
   }
 
-  public Date creationTime() {
-    return creationTime;
+  public Command(TxEvent event) {
+    this(event.serviceName(),
+        event.instanceId(),
+        event.globalTxId(),
+        event.localTxId(),
+        event.parentTxId(),
+        event.compensationMethod(),
+        event.payloads());
   }
 
-  public String globalTxId() {
+  String globalTxId() {
     return globalTxId;
   }
 
-  public String localTxId() {
+  String localTxId() {
     return localTxId;
   }
 
-  public String parentTxId() {
-    return parentTxId;
-  }
-
-  public String type() {
-    return type;
-  }
-
-  public String compensationMethod() {
-    return compensationMethod;
-  }
-
-  public byte[] payloads() {
-    return payloads;
+  String status() {
+    return status;
   }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
new file mode 100644
index 0000000..915d476
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+import java.util.List;
+
+public interface CommandRepository {
+  boolean exists(String globalTxId, String localTxId);
+
+  void saveCompensationCommand(String globalTxId, String localTxId);
+
+  void saveCompensationCommands(String globalTxId);
+
+  void markCommandAsDone(String globalTxId, String localTxId);
+
+  List<Command> findUncompletedCommands(String globalTxId);
+}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
new file mode 100644
index 0000000..cdf1f6c
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+public enum CommandStatus {
+  NEW,
+  DONE
+}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 5dc5788..560096f 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,7 +17,6 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static java.util.Collections.emptySet;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
@@ -26,10 +25,8 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
@@ -40,6 +37,7 @@ public class TxConsistentService {
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final TxEventRepository eventRepository;
+  private final CommandRepository commandRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
     put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
@@ -47,11 +45,13 @@ public class TxConsistentService {
     put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
   }};
 
-  private final Map<String, Set<String>> eventsToCompensate = new HashMap<>();
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
-  public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
+  public TxConsistentService(TxEventRepository eventRepository,
+      CommandRepository commandRepository,
+      OmegaCallback omegaCallback) {
     this.eventRepository = eventRepository;
+    this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
   }
 
@@ -68,7 +68,7 @@ public class TxConsistentService {
 
   private void compensateIfAlreadyAborted(TxEvent event) {
     if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
-      eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()).add(event.localTxId());
+      commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
       TxEvent correspondingStartedEvent = eventRepository
           .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
 
@@ -77,7 +77,7 @@ public class TxConsistentService {
   }
 
   private boolean isCompensationScheduled(TxEvent event) {
-    return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId());
+    return commandRepository.exists(event.globalTxId(), event.localTxId());
   }
 
   private void compensate(TxEvent event) {
@@ -85,8 +85,7 @@ public class TxConsistentService {
 
     events.removeIf(this::isCompensationScheduled);
 
-    Set<String> localTxIds = eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>());
-    events.forEach(e -> localTxIds.add(e.localTxId()));
+    commandRepository.saveCompensationCommands(event.globalTxId());
 
     events.forEach(omegaCallback::compensate);
   }
@@ -94,13 +93,10 @@ public class TxConsistentService {
   // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
   // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensateStatus(TxEvent event) {
-    Set<String> events = eventsToCompensate.get(event.globalTxId());
-    if (events != null) {
-      events.remove(event.localTxId());
-      if (events.isEmpty()) {
-        markGlobalTxEnd(event);
-        eventsToCompensate.remove(event.globalTxId());
-      }
+    commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
+    if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
+        && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+      markGlobalTxEnd(event);
     }
   }
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 37a29f1..760dd70 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -111,4 +111,18 @@ public class TxEvent {
   public byte[] payloads() {
     return payloads;
   }
+
+  @Override
+  public String toString() {
+    return "TxEvent{" +
+        "serviceName='" + serviceName + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", creationTime=" + creationTime +
+        ", globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", type='" + type + '\'' +
+        ", compensationMethod='" + compensationMethod + '\'' +
+        '}';
+  }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 99667e7..8ae60a3 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
@@ -34,16 +35,20 @@ import static org.junit.Assert.assertThat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.junit.Test;
 
 public class TxConsistentServiceTest {
-  private final List<TxEvent> events = new ArrayList<>();
+  private final Deque<TxEvent> events = new ConcurrentLinkedDeque<>();
   private final TxEventRepository eventRepository = new TxEventRepository() {
     @Override
     public void save(TxEvent event) {
@@ -92,6 +97,51 @@ public class TxConsistentServiceTest {
     }
   };
 
+  private final List<Command> commands = new ArrayList<>();
+  private final CommandRepository commandRepository = new CommandRepository() {
+    @Override
+    public boolean exists(String globalTxId, String localTxId) {
+      return commands.stream()
+          .anyMatch(command -> globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId()));
+    }
+
+    @Override
+    public void saveCompensationCommand(String globalTxId, String localTxId) {
+      TxEvent event = eventRepository.findFirstTransaction(globalTxId, localTxId, TxStartedEvent.name());
+      commands.add(new Command(event));
+    }
+
+    @Override
+    public void saveCompensationCommands(String globalTxId) {
+      List<TxEvent> events = eventRepository.findTransactionsToCompensate(globalTxId);
+
+      Map<String, Command> commandMap = new HashMap<>();
+
+      for (TxEvent event : events) {
+        commandMap.computeIfAbsent(event.localTxId(), k -> new Command(event));
+      }
+
+      commands.addAll(commandMap.values());
+    }
+
+    @Override
+    public void markCommandAsDone(String globalTxId, String localTxId) {
+      for (int i = 0; i < commands.size(); i++) {
+        Command command = commands.get(i);
+        if (globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())) {
+          commands.set(i, new Command(command, DONE));
+        }
+      }
+    }
+
+    @Override
+    public List<Command> findUncompletedCommands(String globalTxId) {
+      return commands.stream()
+          .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
+          .collect(Collectors.toList());
+    }
+  };
+
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
@@ -104,7 +154,7 @@ public class TxConsistentServiceTest {
   private final OmegaCallback omegaCallback = event ->
       compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
 
-  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
 
   @Test
   public void persistEventOnArrival() throws Exception {
@@ -150,7 +200,7 @@ public class TxConsistentServiceTest {
     consistentService.handle(compensateEvent1);
 
     await().atMost(1, SECONDS).until(() -> events.size() == 8);
-    assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
+    assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
   }
 
   @Test
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index bb4ba89..00dfe27 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.PostConstruct;
 
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
@@ -59,12 +60,18 @@ class AlphaConfig {
   }
 
   @Bean
+  CommandRepository springCommandRepository(TxEventEnvelopeRepository eventRepo, CommandEntityRepository commandRepository) {
+    return new SpringCommandRepository(eventRepo, commandRepository);
+  }
+
+  @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
       TxEventRepository eventRepository,
+      CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
-    TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+    TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
new file mode 100644
index 0000000..3eac681
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.Date;
+
+import javax.persistence.Embedded;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+import javax.persistence.Version;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+
+@Entity
+class CommandEntity {
+  @Id
+  @GeneratedValue
+  private long surrogateId;
+
+  @Embedded
+  private Command command;
+
+  private Date lastModified;
+
+  @Version
+  private int version;
+
+  CommandEntity() {
+  }
+
+  CommandEntity(long id, TxEvent event) {
+    surrogateId = id;
+    lastModified = new Date();
+    command = new Command(event);
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
new file mode 100644
index 0000000..4b7309e
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+
+public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> {
+  Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId);
+
+  @Modifying
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c "
+      + "SET c.command.status = :status "
+      + "WHERE c.command.globalTxId = :globalTxId "
+      + "AND c.command.localTxId = :localTxId")
+  void updateStatusByGlobalTxIdAndLocalTxId(
+      @Param("status") String status,
+      @Param("globalTxId") String globalTxId,
+      @Param("localTxId") String localTxId);
+
+  List<Command> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
new file mode 100644
index 0000000..9281b7e
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+
+public class SpringCommandRepository implements CommandRepository {
+  private final TxEventEnvelopeRepository eventRepository;
+  private final CommandEntityRepository commandRepository;
+
+  SpringCommandRepository(TxEventEnvelopeRepository eventRepository, CommandEntityRepository commandRepository) {
+    this.eventRepository = eventRepository;
+    this.commandRepository = commandRepository;
+  }
+
+  @Override
+  public boolean exists(String globalTxId, String localTxId) {
+    return commandRepository.findByCommandGlobalTxIdAndCommandLocalTxId(globalTxId, localTxId).isPresent();
+  }
+
+  @Override
+  public void saveCompensationCommand(String globalTxId, String localTxId) {
+    TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(
+        globalTxId,
+        localTxId,
+        TxStartedEvent.name());
+
+    commandRepository.save(new CommandEntity(startedEvent.id(), startedEvent.event()));
+  }
+
+  @Override
+  public void saveCompensationCommands(String globalTxId) {
+    List<TxEventEnvelope> events = eventRepository
+        .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
+
+    Map<String, CommandEntity> commands = new HashMap<>();
+
+    for (TxEventEnvelope event : events) {
+      commands.computeIfAbsent(event.localTxId(), k -> new CommandEntity(event.id(), event.event()));
+    }
+
+    commandRepository.save(commands.values());
+  }
+
+  @Override
+  public void markCommandAsDone(String globalTxId, String localTxId) {
+    commandRepository.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId);
+  }
+
+  @Override
+  public List<Command> findUncompletedCommands(String globalTxId) {
+    return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name());
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index bdf82f1..fcb7c00 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -52,4 +52,17 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  AND t2.type = 'TxCompensatedEvent')"
  )
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+
+  @Query("FROM TxEventEnvelope t "
+      + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( "
+      + "  FROM TxEventEnvelope t1 "
+      + "  WHERE t1.event.globalTxId = ?1 "
+      + "  AND t1.event.localTxId = t.event.localTxId "
+      + "  AND t1.event.type = 'TxEndedEvent'"
+      + ") AND NOT EXISTS ( "
+      + "  FROM TxEventEnvelope t2 "
+      + "  WHERE t2.event.globalTxId = ?1 "
+      + "  AND t2.event.localTxId = t.event.localTxId "
+      + "  AND t2.event.type = 'TxCompensatedEvent')")
+  List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index cead07a..5a61dc7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -68,6 +68,7 @@ public class TransactionAspect {
     }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
+    // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha
     scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
 
     try {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.