You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 09:51:45 UTC
[dolphinscheduler] branch 3.0.1-prepare updated: Fix workflow instance may failover many times due to doesn't check the restart time (#11445) (#12010)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
new 41a6d09c4f Fix workflow instance may failover many times due to doesn't check the restart time (#11445) (#12010)
41a6d09c4f is described below
commit 41a6d09c4f960be0f10129f52c60b894adf57a89
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Sep 17 17:51:40 2022 +0800
Fix workflow instance may failover many times due to doesn't check the restart time (#11445) (#12010)
(cherry picked from commit 0ca308629677d6df2b0d32db4d9d7f3aac78af8d)
---
.../master/service/MasterFailoverService.java | 35 ++++++++++++++--------
.../server/master/service/FailoverServiceTest.java | 5 +++-
2 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 9aa3f2d7b5..5e422f7600 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -17,6 +17,11 @@
package org.apache.dolphinscheduler.server.master.service;
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -31,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@@ -41,9 +47,9 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.time.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@@ -51,14 +57,6 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
-import lombok.NonNull;
-
@Service
public class MasterFailoverService {
@@ -70,15 +68,19 @@ public class MasterFailoverService {
private final NettyExecutorManager nettyExecutorManager;
+ private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
- @NonNull NettyExecutorManager nettyExecutorManager) {
+ @NonNull NettyExecutorManager nettyExecutorManager,
+ @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.nettyExecutorManager = nettyExecutorManager;
+ this.processInstanceExecCacheManager = processInstanceExecCacheManager;
}
@@ -282,6 +284,15 @@ public class MasterFailoverService {
// The processInstance is newly created
return false;
}
+ if (processInstance.getRestartTime() != null && processInstance.getRestartTime().after(beFailoveredMasterStartupTime)) {
+ // the processInstance has already been failed over.
+ return false;
+ }
+
+ if (processInstanceExecCacheManager.contains(processInstance.getId())) {
+ // the processInstance is a running process instance in the current master
+ return false;
+ }
return true;
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index ab71cd75cd..cd26b1a57c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -86,6 +86,9 @@ public class FailoverServiceTest {
@Mock
private NettyExecutorManager nettyExecutorManager;
+ @Mock
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
private static int masterPort = 5678;
private static int workerPort = 1234;
@@ -104,7 +107,7 @@ public class FailoverServiceTest {
given(masterConfig.getListenPort()).willReturn(masterPort);
MasterFailoverService masterFailoverService =
- new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager);
+ new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,