You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/10/10 13:32:34 UTC
[incubator-servicecomb-saga] 05/07: SCB-909 Add alpha globalTx
timeout and clear mechanism.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 89cae3edf8238b1f40e5e89710c493652c8b1d9f
Author: cherrylzhao <zh...@126.com>
AuthorDate: Sat Sep 29 22:12:16 2018 +0800
SCB-909 Add alpha globalTx timeout and clear mechanism.
---
.../server/tcc/jpa/GlobalTxEventRepository.java | 17 +++
.../tcc/service/MemoryTxEventRepository.java | 12 ++
.../server/tcc/service/RDBTxEventRepository.java | 13 +++
.../server/tcc/service/TccTxEventRepository.java | 7 +-
.../server/tcc/service/TccTxEventService.java | 26 ++++-
.../server/tcc/service/TccTxEventServiceTest.java | 128 +++++++++++++++++++--
6 files changed, 188 insertions(+), 15 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
index 439e0f5..8c3023b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/jpa/GlobalTxEventRepository.java
@@ -17,8 +17,12 @@
package org.apache.servicecomb.saga.alpha.server.tcc.jpa;
+import java.util.Date;
import java.util.List;
import java.util.Optional;
+import javax.transaction.Transactional;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
@@ -30,4 +34,17 @@ public interface GlobalTxEventRepository extends CrudRepository<GlobalTxEvent, L
@Query(value = "SELECT t FROM GlobalTxEvent AS t WHERE t.globalTxId = ?1 and t.localTxId = ?2 and t.txType = ?3")
Optional<GlobalTxEvent> findByUniqueKey(String globalTxId, String localTxId, String txType);
+ @Query(value = "SELECT t FROM GlobalTxEvent AS t WHERE t.creationTime < ?1 and t.txType = ?2 order by t.creationTime asc")
+ Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable);
+
+ @Query(value = "SELECT t.globalTxId from GlobalTxEvent as t GROUP BY t.globalTxId HAVING COUNT(t.globalTxId) = 2 "
+ + "AND NOT EXISTS (select 1 from ParticipatedEvent as b where b.globalTxId = t.globalTxId)"
+ )
+ Optional<List<String>> findCompletedGlobalTx(Pageable pageable);
+
+ @Transactional
+ @Modifying(clearAutomatically = true)
+ @Query(value = "DELETE FROM GlobalTxEvent as t where t.globalTxId in (?1)")
+ void deleteByGlobalId(String globalTxId);
+
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
index 2ec392b..3bdd873 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server.tcc.service;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Date;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -34,6 +35,7 @@ import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
+import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Component;
@Component
@@ -108,6 +110,16 @@ public class MemoryTxEventRepository implements TccTxEventRepository {
}
@Override
+ public Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable) {
+ return Optional.empty();
+ }
+
+ @Override
+ public void clearCompletedGlobalTx(Pageable pageable) {
+
+ }
+
+ @Override
public Iterable<TccTxEvent> findAll() {
List<TccTxEvent> events = new ArrayList<>();
for (String globalTxId : tccEventMap.keySet()) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
index 745441a..9e24a36 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.saga.alpha.server.tcc.service;
+import java.util.Date;
import java.util.List;
import java.util.Optional;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
@@ -29,6 +30,7 @@ import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEventDBRepository;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
+import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -96,6 +98,17 @@ public class RDBTxEventRepository implements TccTxEventRepository {
}
@Override
+ public Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable) {
+ return globalTxEventRepository.findTimeoutGlobalTx(deadLine, txType, pageable);
+ }
+
+ @Override
+ public void clearCompletedGlobalTx(Pageable pageable) {
+ globalTxEventRepository.findCompletedGlobalTx(pageable).ifPresent(e -> e.forEach(t ->
+ globalTxEventRepository.deleteByGlobalId(t)));
+ }
+
+ @Override
public Iterable<TccTxEvent> findAll() {
return tccTxEventDBRepository.findAll();
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
index f167a85..ebc19ce 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
@@ -17,13 +17,14 @@
package org.apache.servicecomb.saga.alpha.server.tcc.service;
+import java.util.Date;
import java.util.List;
import java.util.Optional;
-
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.springframework.data.domain.Pageable;
public interface TccTxEventRepository {
@@ -43,6 +44,10 @@ public interface TccTxEventRepository {
Optional<TccTxEvent> findByUniqueKey(String globalTxId, String localTxId, TccTxType tccTxType);
+ Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable);
+
+ void clearCompletedGlobalTx(Pageable pageable);
+
Iterable<TccTxEvent> findAll();
void deleteAll();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
index 068e66d..1883c98 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventService.java
@@ -18,13 +18,16 @@
package org.apache.servicecomb.saga.alpha.server.tcc.service;
import java.lang.invoke.MethodHandles;
+import java.util.Date;
import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
+import org.apache.servicecomb.saga.common.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Component;
@Component
@@ -89,7 +92,7 @@ public class TccTxEventService {
globalTxEvent.getGlobalTxId(), globalTxEvent.getLocalTxId(), globalTxEvent.getTxType(), ex);
return false;
}
- // Just return the excution result back
+ // Just return the execution result back
return tccCallbackEngine.execute(globalTxEvent);
}
@@ -108,4 +111,23 @@ public class TccTxEventService {
return true;
}
-}
+ public void handleTimeoutTx(Date deadLine, int size) {
+ tccTxEventRepository.findTimeoutGlobalTx(deadLine, TccTxType.STARTED.name(), new PageRequest(0, size))
+ .ifPresent(e -> e.forEach(t -> {
+ GlobalTxEvent globalTxEvent = new GlobalTxEvent(
+ t.getServiceName(),
+ t.getInstanceId(),
+ t.getGlobalTxId(),
+ t.getLocalTxId(),
+ t.getParentTxId(),
+ TccTxType.END_TIMEOUT.name(),
+ TransactionStatus.Failed.name());
+ onTccEndedEvent(globalTxEvent);
+ }));
+ }
+
+ public void clearCompletedGlobalTx(int size) {
+ tccTxEventRepository.clearCompletedGlobalTx(new PageRequest(0, size));
+ }
+
+}
\ No newline at end of file
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
index e45f222..6993d79 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventServiceTest.java
@@ -18,16 +18,24 @@
package org.apache.servicecomb.saga.alpha.server.tcc.service;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.grpc.stub.StreamObserver;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
import org.apache.servicecomb.saga.alpha.server.tcc.TccApplication;
import org.apache.servicecomb.saga.alpha.server.tcc.TccConfiguration;
import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallbacksRegistry;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEventRepository;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEventRepository;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
import org.apache.servicecomb.saga.common.TransactionStatus;
@@ -42,12 +50,23 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
-@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class})
+@SpringBootTest(classes = {TccApplication.class, TccConfiguration.class}, properties = {
+ "spring.jpa.show-sql=true"
+})
public class TccTxEventServiceTest {
@Autowired
private TccTxEventService tccTxEventService;
+ @Autowired
+ private TccTxEventRepository tccTxEventRepository;
+
+ @Autowired
+ private GlobalTxEventRepository globalTxEventRepository;
+
+ @Autowired
+ private ParticipatedEventRepository participatedEventRepository;
+
private final String globalTxId = uniquify("globalTxId");
private final String localTxId = uniquify("localTxId");
private final String parentTxId = uniquify("parentTxId");
@@ -68,17 +87,10 @@ public class TccTxEventServiceTest {
@Before
public void setup() {
- tccStartEvent = new GlobalTxEvent(serviceName, instanceId, globalTxId,
- localTxId, parentTxId, TccTxType.STARTED.name(), TransactionStatus.Succeed.name());
-
- participatedEvent = new ParticipatedEvent(serviceName, instanceId, globalTxId, localTxId,
- parentTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed.name());
-
- tccEndEvent = new GlobalTxEvent(serviceName, instanceId, globalTxId,
- localTxId, parentTxId, TccTxType.ENDED.name(), TransactionStatus.Succeed.name());
-
- coordinateEvent = new TccTxEvent(serviceName, instanceId, globalTxId,
- localTxId, parentTxId, TccTxType.COORDINATED.name(), TransactionStatus.Succeed.name());
+ tccStartEvent = newGlobalTxEvent(TccTxType.STARTED, globalTxId, TransactionStatus.Succeed);
+ participatedEvent = newParticipateEvent(globalTxId, TransactionStatus.Succeed);
+ tccEndEvent = newGlobalTxEvent(TccTxType.ENDED, globalTxId, TransactionStatus.Succeed);
+ coordinateEvent = newTccTxEvent(TccTxType.COORDINATED, globalTxId, TransactionStatus.Succeed);
}
@After
@@ -101,4 +113,96 @@ public class TccTxEventServiceTest {
tccTxEventService.onTccEndedEvent(tccEndEvent);
verify(observer).onNext(any());
}
+
+ @Test
+ public void handleTimeoutGlobalTraction() throws InterruptedException {
+ StreamObserver<GrpcTccCoordinateCommand> observer = mock(StreamObserver.class);
+ OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+ tccTxEventService.onTccStartedEvent(tccStartEvent);
+ tccTxEventService.onParticipatedEvent(participatedEvent);
+
+ Thread.sleep(3000l);
+ Date deadLine = new Date(System.currentTimeMillis() - SECONDS.toMillis(2));
+ tccTxEventService.handleTimeoutTx(deadLine, 1);
+
+ // global tx has timeout, so participated event will be coordinated through cancel.
+ Optional<GlobalTxEvent> timeoutEvent = globalTxEventRepository.findByUniqueKey(globalTxId, localTxId, TccTxType.END_TIMEOUT.name());
+ assertThat(timeoutEvent.isPresent(), is(true));
+ assertThat(timeoutEvent.get().getStatus(), is(TransactionStatus.Failed.name()));
+ assertThat(timeoutEvent.get().getTxType(), is(TccTxType.END_TIMEOUT.name()));
+ assertThat(timeoutEvent.get().getGlobalTxId(), is(globalTxId));
+ assertThat(timeoutEvent.get().getLocalTxId(), is(localTxId));
+ assertThat(timeoutEvent.get().getParentTxId(), is(parentTxId));
+ assertThat(timeoutEvent.get().getServiceName(), is(serviceName));
+ verify(observer).onNext(any());
+
+ Optional<List<TccTxEvent>> events = tccTxEventRepository.findByGlobalTxId(globalTxId);
+ assertThat(events.get().size(), is(3));
+ }
+
+ @Test
+ public void clearUpCompletedTxFromGlobalTxTable() {
+ StreamObserver<GrpcTccCoordinateCommand> observer = mock(StreamObserver.class);
+ OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+ tccTxEventService.onTccStartedEvent(tccStartEvent);
+ tccTxEventService.onParticipatedEvent(participatedEvent);
+ tccTxEventService.onTccEndedEvent(tccEndEvent);
+ tccTxEventService.onCoordinatedEvent(coordinateEvent);
+
+ tccTxEventService.clearCompletedGlobalTx(1);
+
+ assertThat(participatedEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
+ assertThat(globalTxEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
+
+ Optional<List<TccTxEvent>> events = tccTxEventRepository.findByGlobalTxId(globalTxId);
+ assertThat(events.get().size(), is(4));
+ }
+
+ @Test
+ public void clearUpCompletedTxFromGlobalTxTableMoreThanOne() {
+ StreamObserver<GrpcTccCoordinateCommand> observer = mock(StreamObserver.class);
+ OmegaCallbacksRegistry.register(serviceConfig, observer);
+
+ // one global tx
+ tccTxEventService.onTccStartedEvent(tccStartEvent);
+ tccTxEventService.onParticipatedEvent(participatedEvent);
+ tccTxEventService.onTccEndedEvent(tccEndEvent);
+ tccTxEventService.onCoordinatedEvent(coordinateEvent);
+
+ // another global tx
+ String globalTxId_2 = uniquify("globalTxId");
+ tccTxEventService.onTccStartedEvent(newGlobalTxEvent(TccTxType.STARTED, globalTxId_2, TransactionStatus.Succeed));
+ tccTxEventService.onParticipatedEvent(newParticipateEvent(globalTxId_2, TransactionStatus.Succeed));
+ tccTxEventService.onTccEndedEvent(newGlobalTxEvent(TccTxType.ENDED, globalTxId_2, TransactionStatus.Succeed));
+ tccTxEventService.onCoordinatedEvent(newTccTxEvent(TccTxType.COORDINATED, globalTxId_2, TransactionStatus.Succeed));
+
+ tccTxEventService.clearCompletedGlobalTx(2);
+
+ assertThat(participatedEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
+ assertThat(globalTxEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
+
+ Optional<List<TccTxEvent>> events = tccTxEventRepository.findByGlobalTxId(globalTxId);
+ assertThat(events.get().size(), is(4));
+
+ events = tccTxEventRepository.findByGlobalTxId(globalTxId_2);
+ assertThat(events.get().size(), is(4));
+
+ }
+
+ private ParticipatedEvent newParticipateEvent(String globalTxId, TransactionStatus transactionStatus) {
+ return new ParticipatedEvent(serviceName, instanceId, globalTxId, localTxId,
+ parentTxId, confirmMethod, cancelMethod, transactionStatus.name());
+ }
+
+ private GlobalTxEvent newGlobalTxEvent(TccTxType tccTxType, String globalTxId, TransactionStatus transactionStatus) {
+ return new GlobalTxEvent(serviceName, instanceId, globalTxId,
+ localTxId, parentTxId, tccTxType.name(), transactionStatus.name());
+ }
+
+ private TccTxEvent newTccTxEvent(TccTxType tccTxType, String globalTxId, TransactionStatus transactionStatus) {
+ return new TccTxEvent(serviceName, instanceId, globalTxId,
+ localTxId, parentTxId, tccTxType.name(), transactionStatus.name());
+ }
}