You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:29 UTC
[dolphinscheduler] 02/29: Optimize MasterServer, add MasterRPCService (#10371)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 81cadd15d2b7058eb94f84ad29d25f71324d4253
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Jun 9 10:00:05 2022 +0800
Optimize MasterServer, add MasterRPCService (#10371)
* Optimize MasterServer, avoid NPE
(cherry picked from commit 3ecbee3885090b987e120667e1ccf6f6c24d1df8)
---
.../apache/dolphinscheduler/common/Constants.java | 2 +-
.../server/master/MasterServer.java | 66 ++---------
.../cache/ProcessInstanceExecCacheManager.java | 8 +-
.../impl/ProcessInstanceExecCacheManagerImpl.java | 10 +-
.../processor/queue/StateEventResponseService.java | 4 +-
...ExecuteThread.java => TaskExecuteRunnable.java} | 37 +++---
.../processor/queue/TaskExecuteThreadPool.java | 16 +--
.../registry/MasterConnectionStateListener.java | 62 ++++++++++
.../master/registry/MasterRegistryClient.java | 84 ++++---------
.../MasterRPCServer.java} | 130 ++++-----------------
.../server/master/runner/EventExecuteService.java | 2 +-
.../master/runner/FailoverExecuteThread.java | 2 +
.../master/runner/MasterSchedulerService.java | 2 +-
.../master/runner/StateWheelExecuteThread.java | 24 ++--
...uteThread.java => WorkflowExecuteRunnable.java} | 100 ++++++++--------
.../master/runner/WorkflowExecuteThreadPool.java | 10 +-
.../server/master/service/FailoverService.java | 33 ++++--
...hreadTest.java => WorkflowExecuteTaskTest.java} | 19 ++-
.../ProcessInstanceExecCacheManagerImplTest.java | 12 +-
.../master/registry/MasterRegistryClientTest.java | 7 --
.../server/registry/HeartBeatTask.java | 2 +-
.../service/registry/RegistryClient.java | 3 +-
22 files changed, 273 insertions(+), 362 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 860e493ee1..0382b53c2c 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -795,7 +795,7 @@ public final class Constants {
*/
public static final String PSTREE = "pstree";
- public static final Boolean KUBERNETES_MODE = !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
+ public static final boolean KUBERNETES_MODE = !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
/**
* dry run flag
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6ab0d4e51a..cb01e14504 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -20,18 +20,8 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.remote.NettyRemotingServer;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
-import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
@@ -58,14 +48,9 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
public class MasterServer implements IStoppable {
private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
- @Autowired
- private MasterConfig masterConfig;
-
@Autowired
private SpringApplicationContext springApplicationContext;
- private NettyRemotingServer nettyRemotingServer;
-
@Autowired
private MasterRegistryClient masterRegistryClient;
@@ -78,24 +63,6 @@ public class MasterServer implements IStoppable {
@Autowired
private Scheduler scheduler;
- @Autowired
- private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
-
- @Autowired
- private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
-
- @Autowired
- private TaskEventProcessor taskEventProcessor;
-
- @Autowired
- private StateEventProcessor stateEventProcessor;
-
- @Autowired
- private CacheProcessor cacheProcessor;
-
- @Autowired
- private TaskKillResponseProcessor taskKillResponseProcessor;
-
@Autowired
private EventExecuteService eventExecuteService;
@@ -103,7 +70,7 @@ public class MasterServer implements IStoppable {
private FailoverExecuteThread failoverExecuteThread;
@Autowired
- private LoggerRequestProcessor loggerRequestProcessor;
+ private MasterRPCServer masterRPCServer;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
@@ -115,25 +82,8 @@ public class MasterServer implements IStoppable {
*/
@PostConstruct
public void run() throws SchedulerException {
- // init remoting server
- NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(masterConfig.getListenPort());
- this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
-
- // logger server
- this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
-
- this.nettyRemotingServer.start();
+ // init rpc server
+ this.masterRPCServer.start();
// install task plugin
this.taskPluginManager.installPlugin();
@@ -168,6 +118,7 @@ public class MasterServer implements IStoppable {
try {
// execute only once
if (Stopper.isStopped()) {
+ logger.warn("MasterServer has been stopped ..., current cause: {}", cause);
return;
}
@@ -184,10 +135,13 @@ public class MasterServer implements IStoppable {
}
// close
this.masterSchedulerService.close();
- this.nettyRemotingServer.close();
+ this.masterRPCServer.close();
this.masterRegistryClient.closeRegistry();
- // close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
+ // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
+ // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
+
+ logger.info("MasterServer stopped...");
} catch (Exception e) {
logger.error("master server stop exception ", e);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
index 5fdf64493c..2f5f6dc472 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.cache;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
@@ -32,7 +32,7 @@ public interface ProcessInstanceExecCacheManager {
* @param processInstanceId processInstanceId
* @return WorkflowExecuteThread
*/
- WorkflowExecuteThread getByProcessInstanceId(int processInstanceId);
+ WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);
/**
* judge the process instance does it exist
@@ -55,12 +55,12 @@ public interface ProcessInstanceExecCacheManager {
* @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached
*/
- void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread);
+ void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread);
/**
* get all WorkflowExecuteThread from cache
*
* @return all WorkflowExecuteThread in cache
*/
- Collection<WorkflowExecuteThread> getAll();
+ Collection<WorkflowExecuteRunnable> getAll();
}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
index 1d0ab4841a..137bd4ecc5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
@@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,10 +33,10 @@ import com.google.common.collect.ImmutableList;
@Component
public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager {
- private final ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps = new ConcurrentHashMap<>();
@Override
- public WorkflowExecuteThread getByProcessInstanceId(int processInstanceId) {
+ public WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId) {
return processInstanceExecMaps.get(processInstanceId);
}
@@ -51,7 +51,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
}
@Override
- public void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread) {
+ public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) {
if (workflowExecuteThread == null) {
return;
}
@@ -59,7 +59,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
}
@Override
- public Collection<WorkflowExecuteThread> getAll() {
+ public Collection<WorkflowExecuteRunnable> getAll() {
return ImmutableList.copyOf(processInstanceExecMaps.values());
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index cba592c63d..83772a1054 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import java.util.ArrayList;
@@ -135,7 +135,7 @@ public class StateEventResponseService {
return;
}
- WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+ WorkflowExecuteRunnable workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
switch (stateEvent.getType()) {
case TASK_STATE_CHANGE:
workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
similarity index 84%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 47b190e246..593567c6eb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -25,11 +25,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
@@ -40,9 +41,9 @@ import io.netty.channel.Channel;
/**
* task execute thread
*/
-public class TaskExecuteThread {
+public class TaskExecuteRunnable implements Runnable {
- private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
+ private static final Logger logger = LoggerFactory.getLogger(TaskExecuteRunnable.class);
private final int processInstanceId;
@@ -56,8 +57,8 @@ public class TaskExecuteThread {
private DataQualityResultOperator dataQualityResultOperator;
- public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
- ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
+ public TaskExecuteRunnable(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
+ ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
this.processInstanceId = processInstanceId;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
@@ -65,6 +66,7 @@ public class TaskExecuteThread {
this.dataQualityResultOperator = dataQualityResultOperator;
}
+ @Override
public void run() {
while (!this.events.isEmpty()) {
TaskEvent event = this.events.peek();
@@ -113,12 +115,12 @@ public class TaskExecuteThread {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
- TaskInstance taskInstance;
- WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
- if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
- taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+ Optional<TaskInstance> taskInstance;
+ WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
+ taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
} else {
- taskInstance = processService.findTaskInstanceById(taskInstanceId);
+ taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
}
switch (event) {
@@ -148,11 +150,12 @@ public class TaskExecuteThread {
/**
* handle dispatch event
*/
- private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
- if (taskInstance == null) {
+ private void handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
+ if (!taskInstanceOptional.isPresent()) {
logger.error("taskInstance is null");
return;
}
+ TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
return;
}
@@ -164,10 +167,11 @@ public class TaskExecuteThread {
/**
* handle running event
*/
- private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+ private void handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
try {
- if (taskInstance != null) {
+ if (taskInstanceOptional.isPresent()) {
+ TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
} else {
@@ -194,10 +198,11 @@ public class TaskExecuteThread {
/**
* handle result event
*/
- private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+ private void handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
try {
- if (taskInstance != null) {
+ if (taskInstanceOptional.isPresent()) {
+ TaskInstance taskInstance = taskInstanceOptional.get();
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
taskInstance.setStartTime(taskEvent.getStartTime());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index c9c2868d36..323ea86411 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -40,7 +40,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class);
- private final ConcurrentHashMap<String, TaskExecuteThread> multiThreadFilterMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, TaskExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap<>();
@Autowired
private MasterConfig masterConfig;
@@ -67,7 +67,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* task event thread map
*/
- private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, TaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
@@ -83,26 +83,26 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
return;
}
if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
- TaskExecuteThread taskExecuteThread = new TaskExecuteThread(
+ TaskExecuteRunnable taskExecuteThread = new TaskExecuteRunnable(
taskEvent.getProcessInstanceId(),
processService, workflowExecuteThreadPool,
processInstanceExecCacheManager,
dataQualityResultOperator);
taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
}
- TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
- if (taskExecuteThread != null) {
- taskExecuteThread.addEvent(taskEvent);
+ TaskExecuteRunnable taskExecuteRunnable= taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
+ if (taskExecuteRunnable != null) {
+ taskExecuteRunnable.addEvent(taskEvent);
}
}
public void eventHandler() {
- for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) {
+ for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) {
executeEvent(taskExecuteThread);
}
}
- public void executeEvent(TaskExecuteThread taskExecuteThread) {
+ public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
if (taskExecuteThread.eventSize() == 0) {
return;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
new file mode 100644
index 0000000000..bc1b217f9e
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MasterConnectionStateListener implements ConnectionListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(MasterConnectionStateListener.class);
+
+ private final String masterNodePath;
+ private final RegistryClient registryClient;
+
+ public MasterConnectionStateListener(String masterNodePath, RegistryClient registryClient) {
+ this.masterNodePath = checkNotNull(masterNodePath);
+ this.registryClient = checkNotNull(registryClient);
+ }
+
+ @Override
+ public void onUpdate(ConnectionState state) {
+ switch (state) {
+ case CONNECTED:
+ logger.debug("registry connection state is {}", state);
+ break;
+ case SUSPENDED:
+ logger.warn("registry connection state is {}, ready to retry connection", state);
+ break;
+ case RECONNECTED:
+ logger.debug("registry connection state is {}, clean the node info", state);
+ registryClient.remove(masterNodePath);
+ registryClient.persistEphemeral(masterNodePath, "");
+ break;
+ case DISCONNECTED:
+ logger.warn("registry connection state is {}, ready to stop myself", state);
+ registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
+ break;
+ default:
+ }
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index c79f9b7071..5cf1c945af 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -24,35 +24,18 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
-import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
-import java.time.Duration;
import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -65,9 +48,8 @@ import org.springframework.stereotype.Component;
import com.google.common.collect.Sets;
/**
- * zookeeper master client
- * <p>
- * single instance
+ * <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
+ * <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
*/
@Component
public class MasterRegistryClient {
@@ -101,8 +83,10 @@ public class MasterRegistryClient {
* master startup time, ms
*/
private long startupTime;
+ private String masterAddress;
public void init() {
+ this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
@@ -111,11 +95,12 @@ public class MasterRegistryClient {
try {
// master registry
registry();
-
+ registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(),
+ registryClient));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
logger.error("master start up exception", e);
- throw new RuntimeException("master start up error", e);
+ throw new RegistryException("Master registry client start up error", e);
}
}
@@ -198,24 +183,25 @@ public class MasterRegistryClient {
}
/**
- * registry
+ * Registry the current master server itself to registry.
*/
- public void registry() {
- String address = NetUtils.getAddr(masterConfig.getListenPort());
- String localNodePath = getMasterPath();
+ void registry() {
+ logger.info("master node : {} registering to registry center...", masterAddress);
+ String localNodePath = getCurrentNodePath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- masterConfig.getMaxCpuLoadAvg(),
- masterConfig.getReservedMemory(),
- Sets.newHashSet(getMasterPath()),
- Constants.MASTER_TYPE,
- registryClient);
+ masterConfig.getMaxCpuLoadAvg(),
+ masterConfig.getReservedMemory(),
+ Sets.newHashSet(localNodePath),
+ Constants.MASTER_TYPE,
+ registryClient);
// remove before persist
registryClient.remove(localNodePath);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
+ logger.warn("The current master server node:{} cannot find in registry....", NetUtils.getHost());
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
@@ -225,37 +211,17 @@ public class MasterRegistryClient {
// delete dead server
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
- registryClient.addConnectionStateListener(this::handleConnectionState);
- this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
- logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
-
- }
+ this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS);
+ logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s",
+ masterAddress,
+ masterHeartbeatInterval);
- public void handleConnectionState(ConnectionState state) {
- String localNodePath = getMasterPath();
- switch (state) {
- case CONNECTED:
- logger.debug("registry connection state is {}", state);
- break;
- case SUSPENDED:
- logger.warn("registry connection state is {}, ready to retry connection", state);
- break;
- case RECONNECTED:
- logger.debug("registry connection state is {}, clean the node info", state);
- registryClient.persistEphemeral(localNodePath, "");
- break;
- case DISCONNECTED:
- logger.warn("registry connection state is {}, ready to stop myself", state);
- registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
- break;
- default:
- }
}
public void deregister() {
try {
String address = getLocalAddress();
- String localNodePath = getMasterPath();
+ String localNodePath = getCurrentNodePath();
registryClient.remove(localNodePath);
logger.info("master node : {} unRegistry to register center.", address);
heartBeatExecutor.shutdown();
@@ -269,7 +235,7 @@ public class MasterRegistryClient {
/**
* get master path
*/
- private String getMasterPath() {
+ private String getCurrentNodePath() {
String address = getLocalAddress();
return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
similarity index 51%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 6ab0d4e51a..e0b65dd077 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -15,11 +15,8 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master;
+package org.apache.dolphinscheduler.server.master.rpc;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@@ -31,52 +28,27 @@ import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
-import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
-import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
-import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import javax.annotation.PostConstruct;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.cache.annotation.EnableCaching;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.transaction.annotation.EnableTransactionManagement;
-
-@SpringBootApplication
-@ComponentScan("org.apache.dolphinscheduler")
-@EnableTransactionManagement
-@EnableCaching
-public class MasterServer implements IStoppable {
- private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
+import org.springframework.stereotype.Service;
- @Autowired
- private MasterConfig masterConfig;
+/**
+ * Master RPC Server, used to send/receive request to other system.
+ */
+@Service
+public class MasterRPCServer implements AutoCloseable {
- @Autowired
- private SpringApplicationContext springApplicationContext;
+ private static final Logger logger = LoggerFactory.getLogger(MasterRPCServer.class);
private NettyRemotingServer nettyRemotingServer;
@Autowired
- private MasterRegistryClient masterRegistryClient;
-
- @Autowired
- private TaskPluginManager taskPluginManager;
-
- @Autowired
- private MasterSchedulerService masterSchedulerService;
-
- @Autowired
- private Scheduler scheduler;
+ private MasterConfig masterConfig;
@Autowired
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@@ -97,24 +69,13 @@ public class MasterServer implements IStoppable {
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
- private EventExecuteService eventExecuteService;
-
- @Autowired
- private FailoverExecuteThread failoverExecuteThread;
+ private TaskRecallProcessor taskRecallProcessor;
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
- public static void main(String[] args) {
- Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
- SpringApplication.run(MasterServer.class);
- }
-
- /**
- * run master server
- */
@PostConstruct
- public void run() throws SchedulerException {
+ private void init() {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
@@ -126,6 +87,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
@@ -134,67 +96,19 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
-
- // install task plugin
- this.taskPluginManager.installPlugin();
-
- // self tolerant
- this.masterRegistryClient.init();
- this.masterRegistryClient.start();
- this.masterRegistryClient.setRegistryStoppable(this);
-
- this.masterSchedulerService.init();
- this.masterSchedulerService.start();
-
- this.eventExecuteService.start();
- this.failoverExecuteThread.start();
-
- this.scheduler.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- if (Stopper.isRunning()) {
- close("shutdownHook");
- }
- }));
}
- /**
- * gracefully close
- *
- * @param cause close cause
- */
- public void close(String cause) {
-
- try {
- // execute only once
- if (Stopper.isStopped()) {
- return;
- }
-
- logger.info("master server is stopping ..., cause : {}", cause);
-
- // set stop signal is true
- Stopper.stop();
-
- try {
- // thread sleep 3 seconds for thread quietly stop
- Thread.sleep(3000L);
- } catch (Exception e) {
- logger.warn("thread sleep exception ", e);
- }
- // close
- this.masterSchedulerService.close();
- this.nettyRemotingServer.close();
- this.masterRegistryClient.closeRegistry();
- // close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
- springApplicationContext.close();
- } catch (Exception e) {
- logger.error("master server stop exception ", e);
- }
+ public void start() {
+ logger.info("Starting Master RPC Server...");
+ this.nettyRemotingServer.start();
+ logger.info("Started Master RPC Server...");
}
@Override
- public void stop(String cause) {
- close(cause);
+ public void close() {
+ logger.info("Closing Master RPC Server...");
+ this.nettyRemotingServer.close();
+ logger.info("Closed Master RPC Server...");
}
+
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 6f6f718c2f..fc9c4fd854 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -62,7 +62,7 @@ public class EventExecuteService extends Thread {
}
private void eventHandler() {
- for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
+ for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index aa894f8b3b..5fd812f162 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -56,6 +56,8 @@ public class FailoverExecuteThread extends Thread {
logger.info("failover execute thread started");
while (Stopper.isRunning()) {
try {
+ // todo: DO we need to schedule a task to do this kind of check
+ // This kind of check may only need to be executed when a master server start
failoverService.checkMasterFailover();
} catch (Exception e) {
logger.error("failover execute error", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 5c97c0b41f..e688863d06 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -165,7 +165,7 @@ public class MasterSchedulerService extends Thread {
continue;
}
- WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
+ WorkflowExecuteRunnable workflowExecuteThread = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index ba614e573b..7635209ebb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.hadoop.util.ThreadUtil;
+import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
@@ -112,7 +113,7 @@ public class StateWheelExecuteThread extends Thread {
if (processInstanceId == null) {
continue;
}
- WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
processInstanceTimeoutCheckList.remove(processInstanceId);
@@ -219,20 +220,21 @@ public class StateWheelExecuteThread extends Thread {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
- WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
- TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
- if (taskInstance == null) {
+ Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+ if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
+ TaskInstance taskInstance = taskInstanceOptional.get();
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
@@ -252,7 +254,7 @@ public class StateWheelExecuteThread extends Thread {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
- WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
@@ -261,7 +263,7 @@ public class StateWheelExecuteThread extends Thread {
continue;
}
- TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
+ Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@@ -270,13 +272,14 @@ public class StateWheelExecuteThread extends Thread {
break;
}
- if (taskInstance == null) {
+ if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
+ TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.retryTaskIntervalOverTime()) {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
@@ -297,20 +300,21 @@ public class StateWheelExecuteThread extends Thread {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
- WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
- TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
- if (taskInstance == null) {
+ Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+ if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
+ TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
continue;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
similarity index 96%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index fede5abad2..67bfe3c928 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -100,34 +100,34 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
- * master exec thread,split dag
+ * Workflow execute task, used to execute a workflow instance.
*/
-public class WorkflowExecuteThread {
+public class WorkflowExecuteRunnable implements Runnable {
/**
* logger of WorkflowExecuteThread
*/
- private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThread.class);
+ private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
/**
* master config
*/
- private MasterConfig masterConfig;
+ private final MasterConfig masterConfig;
/**
* process service
*/
- private ProcessService processService;
+ private final ProcessService processService;
/**
* alert manager
*/
- private ProcessAlertManager processAlertManager;
+ private final ProcessAlertManager processAlertManager;
/**
* netty executor manager
*/
- private NettyExecutorManager nettyExecutorManager;
+ private final NettyExecutorManager nettyExecutorManager;
/**
* process instance
@@ -162,7 +162,7 @@ public class WorkflowExecuteThread {
/**
* task instance hash map, taskId as key
*/
- private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
+ private final Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
/**
* running taskProcessor, taskCode as key, taskProcessor as value
@@ -174,34 +174,34 @@ public class WorkflowExecuteThread {
* valid task map, taskCode as key, taskId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
- private Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
+ private final Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
/**
* error task map, taskCode as key, taskInstanceId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
- private Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
+ private final Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
/**
* complete task map, taskCode as key, taskInstanceId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
- private Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
+ private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
/**
* depend failed task map, taskCode as key, taskId as value
*/
- private Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
+ private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
/**
* forbidden task map, code as key
*/
- private Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
+ private final Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
/**
* skip task map, code as key
*/
- private Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
+ private final Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
/**
* complement date list
@@ -211,27 +211,25 @@ public class WorkflowExecuteThread {
/**
* state event queue
*/
- private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
/**
* ready to submit task queue
*/
- private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
+ private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* wait to retry taskInstance map, taskCode as key, taskInstance as value
* before retry, the taskInstance id is 0
*/
- private Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
+ private final Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
/**
* state wheel execute thread
*/
- private StateWheelExecuteThread stateWheelExecuteThread;
+ private final StateWheelExecuteThread stateWheelExecuteThread;
/**
- * constructor of WorkflowExecuteThread
- *
* @param processInstance processInstance
* @param processService processService
* @param nettyExecutorManager nettyExecutorManager
@@ -239,12 +237,12 @@ public class WorkflowExecuteThread {
* @param masterConfig masterConfig
* @param stateWheelExecuteThread stateWheelExecuteThread
*/
- public WorkflowExecuteThread(ProcessInstance processInstance,
- ProcessService processService,
- NettyExecutorManager nettyExecutorManager,
- ProcessAlertManager processAlertManager,
- MasterConfig masterConfig,
- StateWheelExecuteThread stateWheelExecuteThread) {
+ public WorkflowExecuteRunnable(ProcessInstance processInstance
+ , ProcessService processService
+ , NettyExecutorManager nettyExecutorManager
+ , ProcessAlertManager processAlertManager
+ , MasterConfig masterConfig
+ , StateWheelExecuteThread stateWheelExecuteThread) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
@@ -401,7 +399,10 @@ public class WorkflowExecuteThread {
return true;
}
- TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
+ Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
+ TaskInstance task = taskInstanceOptional.orElseThrow(
+ () -> new RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId()));
+
if (task.getState() == null) {
logger.error("task state is null, state handler error: {}", stateEvent);
return true;
@@ -632,37 +633,37 @@ public class WorkflowExecuteThread {
/**
* get task instance from memory
*/
- public TaskInstance getTaskInstance(int taskInstanceId) {
+ public Optional<TaskInstance> getTaskInstance(int taskInstanceId) {
if (taskInstanceMap.containsKey(taskInstanceId)) {
- return taskInstanceMap.get(taskInstanceId);
+ return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
}
- return null;
+ return Optional.empty();
}
- public TaskInstance getTaskInstance(long taskCode) {
- if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
- return null;
+ public Optional<TaskInstance> getTaskInstance(long taskCode) {
+ if (taskInstanceMap.isEmpty()) {
+ return Optional.empty();
}
for (TaskInstance taskInstance : taskInstanceMap.values()) {
if (taskInstance.getTaskCode() == taskCode) {
- return taskInstance;
+ return Optional.of(taskInstance);
}
}
- return null;
+ return Optional.empty();
}
- public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) {
+ public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) {
if (activeTaskProcessorMaps.containsKey(taskCode)) {
- return activeTaskProcessorMaps.get(taskCode).taskInstance();
+ return Optional.ofNullable(activeTaskProcessorMaps.get(taskCode).taskInstance());
}
- return null;
+ return Optional.empty();
}
- public TaskInstance getRetryTaskInstanceByTaskCode(long taskCode) {
+ public Optional<TaskInstance> getRetryTaskInstanceByTaskCode(long taskCode) {
if (waitToRetryTaskInstanceMap.containsKey(taskCode)) {
- return waitToRetryTaskInstanceMap.get(taskCode);
+ return Optional.ofNullable(waitToRetryTaskInstanceMap.get(taskCode));
}
- return null;
+ return Optional.empty();
}
private boolean processStateChangeHandler(StateEvent stateEvent) {
@@ -697,7 +698,9 @@ public class WorkflowExecuteThread {
private boolean processBlockHandler(StateEvent stateEvent) {
try {
- TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
+ Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
+ TaskInstance task = taskInstanceOptional.orElseThrow(
+ () -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId()));
if (!checkTaskInstanceByStateEvent(stateEvent)) {
logger.error("task {} is not a blocking task", task.getTaskCode());
return false;
@@ -790,9 +793,10 @@ public class WorkflowExecuteThread {
}
/**
- * process start handle
+ * ProcessInstance start entrypoint.
*/
- public void startProcess() {
+ @Override
+ public void run() {
if (this.taskInstanceMap.size() > 0) {
return;
}
@@ -1305,9 +1309,9 @@ public class WorkflowExecuteThread {
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode);
- TaskInstance existTaskInstance = getTaskInstance(taskNodeObject.getCode());
- if (existTaskInstance != null) {
- taskInstances.add(existTaskInstance);
+ Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskNodeObject.getCode());
+ if (existTaskInstanceOptional.isPresent()) {
+ taskInstances.add(existTaskInstanceOptional.get());
continue;
}
TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
@@ -2024,8 +2028,6 @@ public class WorkflowExecuteThread {
}
}
-}
-
private void measureTaskState(StateEvent taskStateEvent) {
if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
// the event is broken
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 556f36a1b7..c3d771e365 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -68,7 +68,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* multi-thread filter, avoid handling workflow at the same time
*/
- private ConcurrentHashMap<String, WorkflowExecuteThread> multiThreadFilterMap = new ConcurrentHashMap();
+ private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap();
@PostConstruct
private void init() {
@@ -82,7 +82,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
* submit state event
*/
public void submitStateEvent(StateEvent stateEvent) {
- WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("workflowExecuteThread is null, stateEvent:{}", stateEvent);
return;
@@ -93,14 +93,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* start workflow
*/
- public void startWorkflow(WorkflowExecuteThread workflowExecuteThread) {
- submit(workflowExecuteThread::startProcess);
+ public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
+ submit(workflowExecuteThread);
}
/**
* execute workflow
*/
- public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
+ public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 2557b659e0..dad2dbfe1e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.master.service;
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
@@ -36,13 +39,15 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,12 +75,14 @@ public class FailoverService {
/**
* check master failover
*/
+ @Counted(value = "failover_scheduler_check_task_count")
+ @Timed(value = "failover_scheduler_check_task_time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
public void checkMasterFailover() {
List<String> hosts = getNeedFailoverMasterServers();
if (CollectionUtils.isEmpty(hosts)) {
return;
}
- LOGGER.info("need failover hosts:{}", hosts);
+ LOGGER.info("{} begin to failover hosts:{}", getLocalAddress(), hosts);
for (String host : hosts) {
failoverMasterWithLock(host);
@@ -114,9 +121,9 @@ public class FailoverService {
}
/**
- * failover master
- * <p>
- * failover process instance and associated task instance
+ * Failover master, will failover process instance and associated task instance.
+ * <p>When the process instance belongs to the given masterHost and the restartTime is before the current server start up time,
+ * then the process instance will be failovered.
*
* @param masterHost master host
*/
@@ -125,11 +132,11 @@ public class FailoverService {
return;
}
Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
- long startTime = System.currentTimeMillis();
+ StopWatch failoverTimeCost = StopWatch.createStarted();
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
- LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
+ LOGGER.info("start master[{}] failover, need to failover process list size:{}", masterHost, needFailoverProcessInstanceList.size());
- // servers need to contains master hosts and worker hosts, otherwise the logic task will failover fail.
+ // servers need to contain master hosts and worker hosts, otherwise the logic task will failover fail.
List<Server> servers = registryClient.getServerList(NodeType.WORKER);
servers.addAll(registryClient.getServerList(NodeType.MASTER));
@@ -145,7 +152,7 @@ public class FailoverService {
}
if (serverStartupTime != null && processInstance.getRestartTime() != null
- && processInstance.getRestartTime().after(serverStartupTime)) {
+ && processInstance.getRestartTime().after(serverStartupTime)) {
continue;
}
@@ -155,7 +162,8 @@ public class FailoverService {
processService.processNeedFailoverProcessInstances(processInstance);
}
- LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
+ failoverTimeCost.stop();
+ LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
/**
@@ -248,7 +256,10 @@ public class FailoverService {
}
/**
- * get need failover master servers
+ * Get need failover master servers.
+ * <p>
+ * Query the process instances from database, if the processInstance's host doesn't exist in registry
+ * or the host is the currentServer, then it will need to failover.
*
* @return need failover master servers
*/
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
similarity index 95%
rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
rename to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
index 6e1c84382d..339c5e80ae 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
@@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -37,7 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -70,10 +69,10 @@ import org.springframework.context.ApplicationContext;
* test for WorkflowExecuteThread
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({WorkflowExecuteThread.class})
-public class WorkflowExecuteThreadTest {
+@PrepareForTest({WorkflowExecuteRunnable.class})
+public class WorkflowExecuteTaskTest {
- private WorkflowExecuteThread workflowExecuteThread;
+ private WorkflowExecuteRunnable workflowExecuteThread;
private ProcessInstance processInstance;
@@ -118,9 +117,9 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
- workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread));
+ workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread));
// prepareProcess init dag
- Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
+ Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(workflowExecuteThread, new DAG());
PowerMockito.doNothing().when(workflowExecuteThread, "endProcess");
@@ -132,7 +131,7 @@ public class WorkflowExecuteThreadTest {
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_START_NODES, "1,2,3");
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam));
- Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
+ Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
Method method = masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class);
method.setAccessible(true);
List<String> nodeNames = (List<String>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
@@ -158,7 +157,7 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processService.findTaskInstanceByIdList(
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
- Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
+ Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
method.setAccessible(true);
List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
@@ -202,7 +201,7 @@ public class WorkflowExecuteThreadTest {
completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
- Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
+ Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
completeTaskMapField.setAccessible(true);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
index 6fa9116e63..d7b53f3c34 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.cache.impl;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
@@ -37,7 +37,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
@Mock
- private WorkflowExecuteThread workflowExecuteThread;
+ private WorkflowExecuteRunnable workflowExecuteThread;
@Before
public void before() {
@@ -47,7 +47,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
@Test
public void testGetByProcessInstanceId() {
- WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
Assert.assertEquals("workflowExecuteThread1", workflowExecuteThread.getKey());
}
@@ -59,20 +59,20 @@ public class ProcessInstanceExecCacheManagerImplTest {
@Test
public void testCacheNull() {
processInstanceExecCacheManager.cache(2, null);
- WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
Assert.assertNull(workflowExecuteThread);
}
@Test
public void testRemoveByProcessInstanceId() {
processInstanceExecCacheManager.removeByProcessInstanceId(1);
- WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
+ WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
Assert.assertNull(workflowExecuteThread);
}
@Test
public void testGetAll() {
- Collection<WorkflowExecuteThread> workflowExecuteThreads = processInstanceExecCacheManager.getAll();
+ Collection<WorkflowExecuteRunnable> workflowExecuteThreads = processInstanceExecCacheManager.getAll();
Assert.assertEquals(1, workflowExecuteThreads.size());
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index d3d5b03de9..2a4b4c33ff 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -111,13 +111,6 @@ public class MasterRegistryClientTest {
masterRegistryClient.registry();
}
- @Test
- public void handleConnectionStateTest() {
- masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
- masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
- masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED);
- }
-
@Test
public void removeNodePathTest() {
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 17b4df14a9..129aaff0de 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -89,7 +89,7 @@ public class HeartBeatTask implements Runnable {
registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
} catch (Throwable ex) {
- logger.error("error write heartbeat info", ex);
+ logger.error("HeartBeat task execute failed", ex);
}
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index 2f98b09b34..c606007e88 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.registry;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
@@ -29,8 +30,6 @@ import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
-import static com.google.common.base.Preconditions.checkArgument;
-
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;