You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/10/10 13:32:32 UTC
[incubator-servicecomb-saga] 03/07: SCB-909 Revise finding coordinate event from participate.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 2285aefe0fb1de94ab2bbb1d012dc699ad6332bf
Author: cherrylzhao <zh...@126.com>
AuthorDate: Fri Sep 28 21:57:07 2018 +0800
SCB-909 Revise finding coordinate event from participate.
---
.../server/tcc/callback/TccCallbackEngine.java | 37 +++++++++++-----------
.../tcc/service/MemoryTxEventRepository.java | 17 +++++++---
.../server/tcc/service/RDBTxEventRepository.java | 5 +++
.../server/tcc/service/TccTxEventRepository.java | 2 ++
.../saga/alpha/server/tcc/TccApplication.java | 28 ----------------
...{TccApplication.java => TestConfiguration.java} | 13 +++-----
6 files changed, 42 insertions(+), 60 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
index 41a5191..7f684f4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
@@ -17,12 +17,13 @@
package org.apache.servicecomb.saga.alpha.server.tcc.callback;
-import com.google.common.collect.Lists;
import java.lang.invoke.MethodHandles;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEventRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventRepository;
import org.apache.servicecomb.saga.common.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,25 +38,23 @@ public class TccCallbackEngine implements CallbackEngine {
private OmegaCallbackWrapper omegaCallbackWrapper;
@Autowired
- private ParticipatedEventRepository participatedEventRepository;
+ private TccTxEventRepository tccTxEventRepository;
@Override
public boolean execute(GlobalTxEvent request) {
- boolean result = true;
- List<ParticipatedEvent> events = participatedEventRepository.findByGlobalTxId(request.getGlobalTxId()).orElse(
- Lists.newArrayList());
- for (ParticipatedEvent event : events) {
- try {
- // only invoke the event is succeed
- if (event.getStatus().equals(TransactionStatus.Succeed.toString())) {
- omegaCallbackWrapper.invoke(event, TransactionStatus.valueOf(request.getStatus()));
- }
- } catch (Exception ex) {
- logError(event, ex);
- result = false;
- }
- }
- return result;
+ AtomicBoolean result = new AtomicBoolean(true);
+ tccTxEventRepository.findParticipatedByGlobalTxId(request.getGlobalTxId())
+ .orElseGet(ArrayList::new).stream()
+ .filter(e -> e.getStatus().equals(TransactionStatus.Succeed.toString()))
+ .collect(Collectors.toList()).forEach(e -> {
+ try {
+ omegaCallbackWrapper.invoke(e, TransactionStatus.valueOf(request.getStatus()));
+ } catch (Exception ex) {
+ logError(e, ex);
+ result.set(false);
+ }
+ });
+ return result.get();
}
private void logError(ParticipatedEvent event, Exception ex) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
index f057a9d..2ec392b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
@@ -26,12 +26,11 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
@@ -77,6 +76,16 @@ public class MemoryTxEventRepository implements TccTxEventRepository {
}
@Override
+ public Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String globalTxId) {
+ return Optional.of(
+ findByGlobalTxId(globalTxId)
+ .orElse(new ArrayList<>()).stream()
+ .filter(e -> TccTxType.PARTICIPATED.name().equals(e.getTxType()))
+ .map(EventConverter::convertToParticipatedEvent).collect(Collectors.toList())
+ );
+ }
+
+ @Override
public Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String globalTxId, TccTxType tccTxType) {
Set<TccTxEvent> events = tccEventMap.get(globalTxId);
if ( events != null) {
@@ -101,8 +110,8 @@ public class MemoryTxEventRepository implements TccTxEventRepository {
@Override
public Iterable<TccTxEvent> findAll() {
List<TccTxEvent> events = new ArrayList<>();
- for (String golableTxId : tccEventMap.keySet()) {
- events.addAll(tccEventMap.get(golableTxId));
+ for (String globalTxId : tccEventMap.keySet()) {
+ events.addAll(tccEventMap.get(globalTxId));
}
return events;
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
index 938b63a..745441a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
@@ -81,6 +81,11 @@ public class RDBTxEventRepository implements TccTxEventRepository {
}
@Override
+ public Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String globalTxId) {
+ return participatedEventRepository.findByGlobalTxId(globalTxId);
+ }
+
+ @Override
public Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String globalTxId, TccTxType tccTxType) {
return tccTxEventDBRepository.findByGlobalTxIdAndTxType(globalTxId, tccTxType.name());
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
index 65ad2c0..f167a85 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
@@ -37,6 +37,8 @@ public interface TccTxEventRepository {
Optional<List<TccTxEvent>> findByGlobalTxId(String globalTxId);
+ Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String globalTxId);
+
Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String globalTxId, TccTxType tccTxType);
Optional<TccTxEvent> findByUniqueKey(String globalTxId, String localTxId, TccTxType tccTxType);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
index 479232b..d24425c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
@@ -18,14 +18,8 @@
package org.apache.servicecomb.saga.alpha.server.tcc;
import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
-import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
-import org.apache.servicecomb.saga.alpha.server.ServerStartable;
-import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
-import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
@SpringBootApplication(scanBasePackageClasses = GrpcTccEventService.class)
@@ -34,26 +28,4 @@ public class TccApplication {
public static void main(String[] args) {
SpringApplication.run(TccApplication.class, args);
}
-
- @Value("${alpha.compensation.retry.delay:3000}")
- private int delay;
-
- @Bean
- TccPendingTaskRunner tccPendingTaskRunner() {
- return new TccPendingTaskRunner(delay);
- }
-
- @Bean
- GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService, TccPendingTaskRunner tccPendingTaskRunner) {
- tccPendingTaskRunner.start();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> tccPendingTaskRunner.shutdown()));
- return new GrpcTccEventService(tccTxEventService);
- }
-
- @Bean
- ServerStartable serverStartable(GrpcServerConfig serverConfig, GrpcTccEventService grpcTccEventService) {
- ServerStartable bootstrap = new GrpcStartable(serverConfig, grpcTccEventService);
- new Thread(bootstrap::start).start();
- return bootstrap;
- }
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TestConfiguration.java
similarity index 83%
copy from alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
copy to alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TestConfiguration.java
index 479232b..2add869 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TestConfiguration.java
@@ -23,17 +23,11 @@ import org.apache.servicecomb.saga.alpha.server.ServerStartable;
import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Configuration;
-@SpringBootApplication(scanBasePackageClasses = GrpcTccEventService.class)
-@Import(GrpcServerConfig.class)
-public class TccApplication {
- public static void main(String[] args) {
- SpringApplication.run(TccApplication.class, args);
- }
+@Configuration
+public class TestConfiguration {
@Value("${alpha.compensation.retry.delay:3000}")
private int delay;
@@ -56,4 +50,5 @@ public class TccApplication {
new Thread(bootstrap::start).start();
return bootstrap;
}
+
}