You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/10/10 13:32:32 UTC

[incubator-servicecomb-saga] 03/07: SCB-909 Revise finding coordinate event from participate.

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 2285aefe0fb1de94ab2bbb1d012dc699ad6332bf
Author: cherrylzhao <zh...@126.com>
AuthorDate: Fri Sep 28 21:57:07 2018 +0800

    SCB-909 Revise finding coordinate event from participate.
---
 .../server/tcc/callback/TccCallbackEngine.java     | 37 +++++++++++-----------
 .../tcc/service/MemoryTxEventRepository.java       | 17 +++++++---
 .../server/tcc/service/RDBTxEventRepository.java   |  5 +++
 .../server/tcc/service/TccTxEventRepository.java   |  2 ++
 .../saga/alpha/server/tcc/TccApplication.java      | 28 ----------------
 ...{TccApplication.java => TestConfiguration.java} | 13 +++-----
 6 files changed, 42 insertions(+), 60 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
index 41a5191..7f684f4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
@@ -17,12 +17,13 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.callback;
 
-import com.google.common.collect.Lists;
 import java.lang.invoke.MethodHandles;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEventRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventRepository;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,25 +38,23 @@ public class TccCallbackEngine implements CallbackEngine {
   private OmegaCallbackWrapper omegaCallbackWrapper;
 
   @Autowired
-  private ParticipatedEventRepository participatedEventRepository;
+  private TccTxEventRepository tccTxEventRepository;
 
   @Override
   public boolean execute(GlobalTxEvent request) {
-    boolean result = true;
-    List<ParticipatedEvent> events = participatedEventRepository.findByGlobalTxId(request.getGlobalTxId()).orElse(
-        Lists.newArrayList());
-    for (ParticipatedEvent event : events) {
-      try {
-        // only invoke the event is succeed
-        if (event.getStatus().equals(TransactionStatus.Succeed.toString())) {
-          omegaCallbackWrapper.invoke(event, TransactionStatus.valueOf(request.getStatus()));
-        }
-      } catch (Exception ex) {
-        logError(event, ex);
-        result = false;
-      }
-    }
-    return result;
+    AtomicBoolean result = new AtomicBoolean(true);
+    tccTxEventRepository.findParticipatedByGlobalTxId(request.getGlobalTxId())
+        .orElseGet(ArrayList::new).stream()
+        .filter(e -> e.getStatus().equals(TransactionStatus.Succeed.toString()))
+        .collect(Collectors.toList()).forEach(e -> {
+          try {
+            omegaCallbackWrapper.invoke(e, TransactionStatus.valueOf(request.getStatus()));
+          } catch (Exception ex) {
+            logError(e, ex);
+            result.set(false);
+          }
+        });
+    return result.get();
   }
 
   private void logError(ParticipatedEvent event, Exception ex) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
index f057a9d..2ec392b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/MemoryTxEventRepository.java
@@ -26,12 +26,11 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-
+import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.GlobalTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.ParticipatedEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.jpa.TccTxType;
-import org.apache.servicecomb.saga.alpha.server.tcc.jpa.EventConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Profile;
@@ -77,6 +76,16 @@ public class MemoryTxEventRepository implements TccTxEventRepository {
   }
 
   @Override
+  public Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String globalTxId) {
+    return Optional.of(
+        findByGlobalTxId(globalTxId)
+            .orElse(new ArrayList<>()).stream()
+        .filter(e -> TccTxType.PARTICIPATED.name().equals(e.getTxType()))
+        .map(EventConverter::convertToParticipatedEvent).collect(Collectors.toList())
+    );
+  }
+
+  @Override
   public Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String globalTxId, TccTxType tccTxType) {
     Set<TccTxEvent> events = tccEventMap.get(globalTxId);
     if ( events != null) {
@@ -101,8 +110,8 @@ public class MemoryTxEventRepository implements TccTxEventRepository {
   @Override
   public Iterable<TccTxEvent> findAll() {
     List<TccTxEvent> events = new ArrayList<>();
-    for (String golableTxId : tccEventMap.keySet()) {
-      events.addAll(tccEventMap.get(golableTxId));
+    for (String globalTxId : tccEventMap.keySet()) {
+      events.addAll(tccEventMap.get(globalTxId));
     }
     return events;
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
index 938b63a..745441a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/RDBTxEventRepository.java
@@ -81,6 +81,11 @@ public class RDBTxEventRepository implements TccTxEventRepository {
   }
 
   @Override
+  public Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String globalTxId) {
+    return participatedEventRepository.findByGlobalTxId(globalTxId);
+  }
+
+  @Override
   public Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String globalTxId, TccTxType tccTxType) {
     return tccTxEventDBRepository.findByGlobalTxIdAndTxType(globalTxId, tccTxType.name());
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
index 65ad2c0..f167a85 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccTxEventRepository.java
@@ -37,6 +37,8 @@ public interface TccTxEventRepository {
 
   Optional<List<TccTxEvent>> findByGlobalTxId(String globalTxId);
 
+  Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String globalTxId);
+
   Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String globalTxId, TccTxType tccTxType);
 
   Optional<TccTxEvent> findByUniqueKey(String globalTxId, String localTxId, TccTxType tccTxType);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
index 479232b..d24425c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
@@ -18,14 +18,8 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
-import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
-import org.apache.servicecomb.saga.alpha.server.ServerStartable;
-import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
-import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 
 @SpringBootApplication(scanBasePackageClasses = GrpcTccEventService.class)
@@ -34,26 +28,4 @@ public class TccApplication {
   public static void main(String[] args) {
     SpringApplication.run(TccApplication.class, args);
   }
-
-  @Value("${alpha.compensation.retry.delay:3000}")
-  private int delay;
-
-  @Bean
-  TccPendingTaskRunner tccPendingTaskRunner() {
-    return new TccPendingTaskRunner(delay);
-  }
-
-  @Bean
-  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService, TccPendingTaskRunner tccPendingTaskRunner) {
-    tccPendingTaskRunner.start();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> tccPendingTaskRunner.shutdown()));
-    return new GrpcTccEventService(tccTxEventService);
-  }
-
-  @Bean
-  ServerStartable serverStartable(GrpcServerConfig serverConfig, GrpcTccEventService grpcTccEventService) {
-    ServerStartable bootstrap = new GrpcStartable(serverConfig, grpcTccEventService);
-    new Thread(bootstrap::start).start();
-    return bootstrap;
-  }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TestConfiguration.java
similarity index 83%
copy from alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
copy to alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TestConfiguration.java
index 479232b..2add869 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccApplication.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TestConfiguration.java
@@ -23,17 +23,11 @@ import org.apache.servicecomb.saga.alpha.server.ServerStartable;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Configuration;
 
-@SpringBootApplication(scanBasePackageClasses = GrpcTccEventService.class)
-@Import(GrpcServerConfig.class)
-public class TccApplication {
-  public static void main(String[] args) {
-    SpringApplication.run(TccApplication.class, args);
-  }
+@Configuration
+public class TestConfiguration {
 
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
@@ -56,4 +50,5 @@ public class TccApplication {
     new Thread(bootstrap::start).start();
     return bootstrap;
   }
+
 }