You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/10/10 13:32:35 UTC

[incubator-servicecomb-saga] 06/07: SCB-909 Add Alpha TCC event scanner.

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 0a96e64c028d363c9493b2bc3f1716cffa74e2f2
Author: cherrylzhao <zh...@126.com>
AuthorDate: Sat Sep 29 22:59:04 2018 +0800

    SCB-909 Add Alpha TCC event scanner.
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 24 +++++++--
 .../alpha/server/tcc/service/TccEventScanner.java  | 58 ++++++++++++++++++++++
 .../saga/alpha/server/tcc/TccConfiguration.java    | 25 ++++++++--
 3 files changed, 98 insertions(+), 9 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index bc4bc59..ad39d62 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -36,6 +36,7 @@ import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccEventScanner;
 import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
@@ -51,6 +52,9 @@ class AlphaConfig {
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
 
+  @Value("${alpha.tx.timeout-seconds:600}")
+  private int globalTxTimeoutSeconds;
+
   @Bean
   Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
     return new ConcurrentHashMap<>();
@@ -105,18 +109,30 @@ class AlphaConfig {
   }
 
   @Bean
-  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService, TccPendingTaskRunner tccPendingTaskRunner) {
-    tccPendingTaskRunner.start();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> tccPendingTaskRunner.shutdown()));
+  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService) {
     return new GrpcTccEventService(tccTxEventService);
   }
 
   @Bean
+  TccEventScanner tccEventScanner(TccTxEventService tccTxEventService) {
+    return new TccEventScanner(tccTxEventService, delay, globalTxTimeoutSeconds);
+  }
+
+  @Bean
   ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner) {
     ServerStartable bootstrap = new GrpcStartable(serverConfig,
         new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks), grpcTccEventService);
     new Thread(bootstrap::start).start();
+
+    tccPendingTaskRunner.start();
+    tccEventScanner.start();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      tccPendingTaskRunner.shutdown();
+      tccEventScanner.shutdown();
+    }));
+
     return bootstrap;
   }
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java
new file mode 100644
index 0000000..f5c930f
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.service;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TccEventScanner {
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  private final TccTxEventService tccTxEventService;
+
+  private final int delay;
+
+  private final long globalTxTimeoutSeconds;
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public TccEventScanner(TccTxEventService tccTxEventService, int delay, long globalTxTimeoutSeconds) {
+    this.tccTxEventService = tccTxEventService;
+    this.delay = delay;
+    this.globalTxTimeoutSeconds = globalTxTimeoutSeconds;
+  }
+
+  public void start() {
+    scheduler.scheduleWithFixedDelay(() -> {
+      tccTxEventService.handleTimeoutTx(new Date(System.currentTimeMillis() - SECONDS.toMillis(globalTxTimeoutSeconds)), 1);
+      tccTxEventService.clearCompletedGlobalTx(1);
+    }, 0, delay, MILLISECONDS);
+  }
+
+  public void shutdown() {
+    scheduler.shutdown();
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
index 50f2e00..fd93419 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
@@ -21,6 +21,7 @@ import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
 import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
 import org.apache.servicecomb.saga.alpha.server.ServerStartable;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccEventScanner;
 import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
@@ -32,23 +33,37 @@ public class TccConfiguration {
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
 
+  @Value("${alpha.tx.timeout-seconds:600}")
+  private int globalTxTimeoutSeconds;
+
   @Bean
   TccPendingTaskRunner tccPendingTaskRunner() {
     return new TccPendingTaskRunner(delay);
   }
 
   @Bean
-  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService, TccPendingTaskRunner tccPendingTaskRunner) {
-    tccPendingTaskRunner.start();
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> tccPendingTaskRunner.shutdown()));
+  GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService) {
     return new GrpcTccEventService(tccTxEventService);
   }
 
   @Bean
-  ServerStartable serverStartable(GrpcServerConfig serverConfig, GrpcTccEventService grpcTccEventService) {
+  TccEventScanner tccEventScanner(TccTxEventService tccTxEventService) {
+    return new TccEventScanner(tccTxEventService, delay, globalTxTimeoutSeconds);
+  }
+
+  @Bean
+  ServerStartable serverStartable(GrpcServerConfig serverConfig, GrpcTccEventService grpcTccEventService,
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner) {
     ServerStartable bootstrap = new GrpcStartable(serverConfig, grpcTccEventService);
     new Thread(bootstrap::start).start();
+
+    tccPendingTaskRunner.start();
+    tccEventScanner.start();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      tccPendingTaskRunner.shutdown();
+      tccEventScanner.shutdown();
+    }));
+
     return bootstrap;
   }
-
 }