You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/10/10 13:32:35 UTC
[incubator-servicecomb-saga] 06/07: SCB-909 Add Alpha TCC event
scanner.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 0a96e64c028d363c9493b2bc3f1716cffa74e2f2
Author: cherrylzhao <zh...@126.com>
AuthorDate: Sat Sep 29 22:59:04 2018 +0800
SCB-909 Add Alpha TCC event scanner.
---
.../servicecomb/saga/alpha/server/AlphaConfig.java | 24 +++++++--
.../alpha/server/tcc/service/TccEventScanner.java | 58 ++++++++++++++++++++++
.../saga/alpha/server/tcc/TccConfiguration.java | 25 ++++++++--
3 files changed, 98 insertions(+), 9 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index bc4bc59..ad39d62 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -36,6 +36,7 @@ import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccEventScanner;
import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.domain.EntityScan;
@@ -51,6 +52,9 @@ class AlphaConfig {
@Value("${alpha.compensation.retry.delay:3000}")
private int delay;
+ @Value("${alpha.tx.timeout-seconds:600}")
+ private int globalTxTimeoutSeconds;
+
@Bean
Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
return new ConcurrentHashMap<>();
@@ -105,18 +109,30 @@ class AlphaConfig {
}
@Bean
- GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService, TccPendingTaskRunner tccPendingTaskRunner) {
- tccPendingTaskRunner.start();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> tccPendingTaskRunner.shutdown()));
+ GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService) {
return new GrpcTccEventService(tccTxEventService);
}
@Bean
+ TccEventScanner tccEventScanner(TccTxEventService tccTxEventService) {
+ return new TccEventScanner(tccTxEventService, delay, globalTxTimeoutSeconds);
+ }
+
+ @Bean
ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentService txConsistentService,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService) {
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
+ TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner) {
ServerStartable bootstrap = new GrpcStartable(serverConfig,
new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks), grpcTccEventService);
new Thread(bootstrap::start).start();
+
+ tccPendingTaskRunner.start();
+ tccEventScanner.start();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ tccPendingTaskRunner.shutdown();
+ tccEventScanner.shutdown();
+ }));
+
return bootstrap;
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java
new file mode 100644
index 0000000..f5c930f
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/service/TccEventScanner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.service;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TccEventScanner {
+
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ private final TccTxEventService tccTxEventService;
+
+ private final int delay;
+
+ private final long globalTxTimeoutSeconds;
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public TccEventScanner(TccTxEventService tccTxEventService, int delay, long globalTxTimeoutSeconds) {
+ this.tccTxEventService = tccTxEventService;
+ this.delay = delay;
+ this.globalTxTimeoutSeconds = globalTxTimeoutSeconds;
+ }
+
+ public void start() {
+ scheduler.scheduleWithFixedDelay(() -> {
+ tccTxEventService.handleTimeoutTx(new Date(System.currentTimeMillis() - SECONDS.toMillis(globalTxTimeoutSeconds)), 1);
+ tccTxEventService.clearCompletedGlobalTx(1);
+ }, 0, delay, MILLISECONDS);
+ }
+
+ public void shutdown() {
+ scheduler.shutdown();
+ }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
index 50f2e00..fd93419 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/tcc/TccConfiguration.java
@@ -21,6 +21,7 @@ import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
import org.apache.servicecomb.saga.alpha.server.ServerStartable;
import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccPendingTaskRunner;
+import org.apache.servicecomb.saga.alpha.server.tcc.service.TccEventScanner;
import org.apache.servicecomb.saga.alpha.server.tcc.service.TccTxEventService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@@ -32,23 +33,37 @@ public class TccConfiguration {
@Value("${alpha.compensation.retry.delay:3000}")
private int delay;
+ @Value("${alpha.tx.timeout-seconds:600}")
+ private int globalTxTimeoutSeconds;
+
@Bean
TccPendingTaskRunner tccPendingTaskRunner() {
return new TccPendingTaskRunner(delay);
}
@Bean
- GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService, TccPendingTaskRunner tccPendingTaskRunner) {
- tccPendingTaskRunner.start();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> tccPendingTaskRunner.shutdown()));
+ GrpcTccEventService grpcTccEventService(TccTxEventService tccTxEventService) {
return new GrpcTccEventService(tccTxEventService);
}
@Bean
- ServerStartable serverStartable(GrpcServerConfig serverConfig, GrpcTccEventService grpcTccEventService) {
+ TccEventScanner tccEventScanner(TccTxEventService tccTxEventService) {
+ return new TccEventScanner(tccTxEventService, delay, globalTxTimeoutSeconds);
+ }
+
+ @Bean
+ ServerStartable serverStartable(GrpcServerConfig serverConfig, GrpcTccEventService grpcTccEventService,
+ TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner) {
ServerStartable bootstrap = new GrpcStartable(serverConfig, grpcTccEventService);
new Thread(bootstrap::start).start();
+
+ tccPendingTaskRunner.start();
+ tccEventScanner.start();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ tccPendingTaskRunner.shutdown();
+ tccEventScanner.shutdown();
+ }));
+
return bootstrap;
}
-
}