You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/10/10 13:32:34 UTC

[incubator-servicecomb-saga] 05/07: SCB-909 Add alpha globalTx timeout and clear mechanism.

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 89cae3edf8238b1f40e5e89710c493652c8b1d9f
Author: cherrylzhao <zh...@126.com>
AuthorDate: Sat Sep 29 22:12:16 2018 +0800

    SCB-909 Add alpha globalTx timeout and clear mechanism.
---
 .../server/tcc/jpa/GlobalTxEventRepository.java    |  17 +++
 .../tcc/service/MemoryTxEventRepository.java       |  12 ++
 .../server/tcc/service/RDBTxEventRepository.java   |  13 +++
 .../server/tcc/service/TccTxEventRepository.java   |   7 +-
 .../server/tcc/service/TccTxEventService.java      |  26 ++++-
 .../server/tcc/service/TccTxEventServiceTest.java  | 128 +++++++++++++++++++--
 6 files changed, 188 insertions(+), 15 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
index 439e0f5..8c3023b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
@@ -17,8 +17,12 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.jpa;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
+import javax.transaction.Transactional;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
@@ -30,4 +34,17 @@ public interface GlobalTxEventRepository extends CrudRepository<GlobalTxEvent, L
   @Query(value = "SELECT t FROM GlobalTxEvent AS t WHERE t.globalTxId = ?1 and t.localTxId = ?2 and t.txType = ?3")
   Optional<GlobalTxEvent> findByUniqueKey(String globalTxId, String localTxId, String txType);
 
+  @Query(value = "SELECT t FROM GlobalTxEvent AS t WHERE t.creationTime < ?1 and t.txType = ?2 order by t.creationTime asc")
+  Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable); // oldest-first page of txType events created strictly before deadLine
+
+  @Query(value = "SELECT t.globalTxId from GlobalTxEvent as t GROUP BY t.globalTxId HAVING COUNT(t.globalTxId) = 2 "
+      + "AND NOT EXISTS (select 1 from  ParticipatedEvent as b where b.globalTxId = t.globalTxId)"
+  ) // "completed" here means: exactly two GlobalTxEvent rows share the id and no ParticipatedEvent row exists for it
+  Optional<List<String>> findCompletedGlobalTx(Pageable pageable); // one page of the matching global tx ids
+
+  @Transactional
+  @Modifying(clearAutomatically = true) // flags the query below as a modifying (bulk DELETE) statement, not a select
+  @Query(value = "DELETE FROM GlobalTxEvent as t where t.globalTxId in (?1)")
+  void deleteByGlobalId(String globalTxId); // removes every GlobalTxEvent row carrying this id; "in (?1)" is bound to this single value
+
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
index 2ec392b..3bdd873 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server.tcc.service;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +35,7 @@ import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Profile;
+import org.springframework.data.domain.Pageable;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -108,6 +110,16 @@ public class MemoryTxEventRepository implements TccTxEventRepository {
   }
 
   @Override
+  public Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable) {
+    return Optional.empty(); // all arguments are ignored: this implementation never reports a timed-out global tx
+  }
+
+  @Override
+  public void clearCompletedGlobalTx(Pageable pageable) {
+    // No-op: this implementation performs no clean-up and ignores pageable.
+  }
+
+  @Override
   public Iterable<TccTxEvent> findAll() {
     List<TccTxEvent> events = new ArrayList<>();
     for (String globalTxId : tccEventMap.keySet()) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
index 745441a..9e24a36 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.service;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
@@ -29,6 +30,7 @@ import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEventDBRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Profile;
+import org.springframework.data.domain.Pageable;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -96,6 +98,17 @@ public class RDBTxEventRepository implements TccTxEventRepository {
   }
 
   @Override
+  public Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable) {
+    return globalTxEventRepository.findTimeoutGlobalTx(deadLine, txType, pageable); // pass-through, arguments forwarded unchanged
+  }
+
+  @Override
+  public void clearCompletedGlobalTx(Pageable pageable) { // looks up one page of completed ids, then deletes id by id
+    globalTxEventRepository.findCompletedGlobalTx(pageable)
+        .ifPresent(completedIds -> completedIds.forEach(globalTxEventRepository::deleteByGlobalId));
+  }
+
+  @Override
   public Iterable<TccTxEvent> findAll() {
     return tccTxEventDBRepository.findAll();
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
index f167a85..ebc19ce 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
@@ -17,13 +17,14 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.service;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
-
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.springframework.data.domain.Pageable;
 
 public interface TccTxEventRepository {
 
@@ -43,6 +44,10 @@ public interface TccTxEventRepository {
 
   Optional<TccTxEvent> findByUniqueKey(String globalTxId, String localTxId, TccTxType tccTxType);
 
+  Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable); // RDB impl: page of txType events created before deadLine; in-memory impl: always empty
+
+  void clearCompletedGlobalTx(Pageable pageable); // RDB impl: deletes one page of completed global txs; in-memory impl: no-op
+
   Iterable<TccTxEvent> findAll();
 
   void deleteAll();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
index 068e66d..1883c98 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
@@ -18,13 +18,16 @@
 package org.apache.servicecomb.saga.alpha.server.tcc.service;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Date;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.data.domain.PageRequest;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -89,7 +92,7 @@ public class TccTxEventService {
           globalTxEvent.getGlobalTxId(), globalTxEvent.getLocalTxId(), globalTxEvent.getTxType(), ex);
       return false;
     }
-    // Just return the excution result back
+    // Just return the execution result back
     return tccCallbackEngine.execute(globalTxEvent);
   }
 
@@ -108,4 +111,23 @@ public class TccTxEventService {
     return true;
   }
 
-}
+  public void handleTimeoutTx(Date deadLine, int size) {
+    // For every STARTED event the repository reports as older than deadLine (page 0, `size` rows), build an
+    // END_TIMEOUT/Failed event with the same ids and hand it to onTccEndedEvent; whatever that returns is ignored.
+    PageRequest firstPage = new PageRequest(0, size);
+    tccTxEventRepository.findTimeoutGlobalTx(deadLine, TccTxType.STARTED.name(), firstPage)
+        .ifPresent(expired -> expired.forEach(started -> onTccEndedEvent(new GlobalTxEvent(
+            started.getServiceName(),
+            started.getInstanceId(),
+            started.getGlobalTxId(),
+            started.getLocalTxId(),
+            started.getParentTxId(),
+            TccTxType.END_TIMEOUT.name(),
+            TransactionStatus.Failed.name()))));
+  }
+
+  public void clearCompletedGlobalTx(int size) {
+    tccTxEventRepository.clearCompletedGlobalTx(new PageRequest(0, size)); // asks the repository to clear page 0 with `size` entries
+  }
+
+}
\ No newline at end of file
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
index e45f222..6993d79 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
@@ -18,16 +18,24 @@
 package org.apache.servicecomb.saga.alpha.server.tcc.service;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import io.grpc.stub.StreamObserver;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
 import org.apache.servicecomb.saga.alpha.server.tcc.TccApplication;
 import org.apache.servicecomb.saga.alpha.server.tcc.TccConfiguration;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallbacksRegistry;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEventRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEventRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
 import org.apache.servicecomb.saga.common.TransactionStatus;
@@ -42,12 +50,23 @@ import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class})
+@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class}, properties = {
+    "spring.jpa.show-sql=true"
+})
 public class TccTxEventServiceTest {
 
   @Autowired
   private TccTxEventService tccTxEventService;
 
+  @Autowired
+  private TccTxEventRepository tccTxEventRepository;
+
+  @Autowired
+  private GlobalTxEventRepository globalTxEventRepository;
+
+  @Autowired
+  private ParticipatedEventRepository participatedEventRepository;
+
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
@@ -68,17 +87,10 @@ public class TccTxEventServiceTest {
 
   @Before
   public void setup() {
-    tccStartEvent = new GlobalTxEvent(serviceName, instanceId, globalTxId,
-        localTxId, parentTxId, TccTxType.STARTED.name(), TransactionStatus.Succeed.name());
-
-    participatedEvent = new ParticipatedEvent(serviceName, instanceId, globalTxId, localTxId,
-        parentTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed.name());
-
-    tccEndEvent = new GlobalTxEvent(serviceName, instanceId, globalTxId,
-        localTxId, parentTxId, TccTxType.ENDED.name(), TransactionStatus.Succeed.name());
-
-    coordinateEvent = new TccTxEvent(serviceName, instanceId, globalTxId,
-        localTxId, parentTxId, TccTxType.COORDINATED.name(), TransactionStatus.Succeed.name());
+    tccStartEvent = newGlobalTxEvent(TccTxType.STARTED, globalTxId, TransactionStatus.Succeed);
+    participatedEvent = newParticipateEvent(globalTxId, TransactionStatus.Succeed);
+    tccEndEvent = newGlobalTxEvent(TccTxType.ENDED, globalTxId, TransactionStatus.Succeed);
+    coordinateEvent = newTccTxEvent(TccTxType.COORDINATED, globalTxId, TransactionStatus.Succeed);
   }
 
   @After
@@ -101,4 +113,96 @@ public class TccTxEventServiceTest {
     tccTxEventService.onTccEndedEvent(tccEndEvent);
     verify(observer).onNext(any());
   }
+
+  @Test
+  public void handleTimeoutGlobalTraction() throws InterruptedException { // NOTE(review): "Traction" looks like a typo of "Transaction"
+    StreamObserver<GrpcTccCoordinateCommand> observer = mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+    tccTxEventService.onTccStartedEvent(tccStartEvent);
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+
+    Thread.sleep(3000L); // wait 3 s so the started event is older than the 2-second-ago deadline computed below
+    Date deadLine = new Date(System.currentTimeMillis() - SECONDS.toMillis(2));
+    tccTxEventService.handleTimeoutTx(deadLine, 1);
+
+    // The global tx has timed out, so the participated event will be coordinated through cancel.
+    Optional<GlobalTxEvent> timeoutEvent = globalTxEventRepository.findByUniqueKey(globalTxId, localTxId, TccTxType.END_TIMEOUT.name());
+    assertThat(timeoutEvent.isPresent(), is(true));
+    assertThat(timeoutEvent.get().getStatus(), is(TransactionStatus.Failed.name()));
+    assertThat(timeoutEvent.get().getTxType(), is(TccTxType.END_TIMEOUT.name()));
+    assertThat(timeoutEvent.get().getGlobalTxId(), is(globalTxId));
+    assertThat(timeoutEvent.get().getLocalTxId(), is(localTxId));
+    assertThat(timeoutEvent.get().getParentTxId(), is(parentTxId));
+    assertThat(timeoutEvent.get().getServiceName(), is(serviceName));
+    verify(observer).onNext(any()); // the registered callback stream received one command
+
+    Optional<List<TccTxEvent>> events = tccTxEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(events.get().size(), is(3)); // presumably started + participated + END_TIMEOUT -- TODO confirm
+  }
+
+  @Test
+  public void clearUpCompletedTxFromGlobalTxTable() {
+    StreamObserver<GrpcTccCoordinateCommand> observer = mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+    tccTxEventService.onTccStartedEvent(tccStartEvent);
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    tccTxEventService.onTccEndedEvent(tccEndEvent);
+    tccTxEventService.onCoordinatedEvent(coordinateEvent);
+
+    tccTxEventService.clearCompletedGlobalTx(1);
+
+    assertThat(participatedEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
+    assertThat(globalTxEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
+
+    Optional<List<TccTxEvent>> events = tccTxEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(events.get().size(), is(4));
+  }
+
+  @Test
+  public void clearUpCompletedTxFromGlobalTxTableMoreThanOne() {
+    StreamObserver<GrpcTccCoordinateCommand> observer = mock(StreamObserver.class);
+    OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+    // one global tx
+    tccTxEventService.onTccStartedEvent(tccStartEvent);
+    tccTxEventService.onParticipatedEvent(participatedEvent);
+    tccTxEventService.onTccEndedEvent(tccEndEvent);
+    tccTxEventService.onCoordinatedEvent(coordinateEvent);
+
+    // another global tx
+    String globalTxId_2 = uniquify("globalTxId");
+    tccTxEventService.onTccStartedEvent(newGlobalTxEvent(TccTxType.STARTED, globalTxId_2, TransactionStatus.Succeed));
+    tccTxEventService.onParticipatedEvent(newParticipateEvent(globalTxId_2, TransactionStatus.Succeed));
+    tccTxEventService.onTccEndedEvent(newGlobalTxEvent(TccTxType.ENDED, globalTxId_2, TransactionStatus.Succeed));
+    tccTxEventService.onCoordinatedEvent(newTccTxEvent(TccTxType.COORDINATED, globalTxId_2, TransactionStatus.Succeed));
+
+    tccTxEventService.clearCompletedGlobalTx(2);
+
+    assertThat(participatedEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
+    assertThat(globalTxEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
+
+    Optional<List<TccTxEvent>> events = tccTxEventRepository.findByGlobalTxId(globalTxId);
+    assertThat(events.get().size(), is(4));
+
+    events = tccTxEventRepository.findByGlobalTxId(globalTxId_2);
+    assertThat(events.get().size(), is(4));
+
+  }
+
+  private ParticipatedEvent newParticipateEvent(String globalTxId, TransactionStatus transactionStatus) {
+    String status = transactionStatus.name(); // only global id and status vary; all other values come from this test's fields
+    return new ParticipatedEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, confirmMethod, cancelMethod, status);
+  }
+
+  private GlobalTxEvent newGlobalTxEvent(TccTxType tccTxType, String globalTxId, TransactionStatus transactionStatus) {
+    String status = transactionStatus.name(); // type, global id and status vary; all other values come from this test's fields
+    return new GlobalTxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, tccTxType.name(), status);
+  }
+
+  private TccTxEvent newTccTxEvent(TccTxType tccTxType, String globalTxId, TransactionStatus transactionStatus) {
+    String status = transactionStatus.name(); // type, global id and status vary; all other values come from this test's fields
+    return new TccTxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, tccTxType.name(), status);
+  }
 }