You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:35 UTC
[dolphinscheduler] 08/29: [Fix-10413] Fix Master startup failure the server still hang (#10500)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 90c87f012129e812d87a3f6e847de16f525611ea
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Jun 20 22:35:06 2022 +0800
[Fix-10413] Fix Master startup failure the server still hang (#10500)
* Fix Master startup failure the server still hang
(cherry picked from commit 117f78ec4b0e2438082a3e25158492eca1b9b1be)
---
.../common/storage/StorageOperate.java | 32 +++++++++----------
.../common/thread/BaseDaemonThread.java | 36 ++++++++++++++++++++++
.../master/consumer/TaskPriorityQueueConsumer.java | 11 ++++---
.../processor/queue/StateEventResponseService.java | 8 +++--
.../master/processor/queue/TaskEventService.java | 17 +++++++---
.../server/master/runner/EventExecuteService.java | 8 +++--
.../master/runner/FailoverExecuteThread.java | 10 ++++--
.../master/runner/MasterSchedulerService.java | 10 ++++--
.../master/runner/StateWheelExecuteThread.java | 7 ++++-
9 files changed, 103 insertions(+), 36 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java
index 5248586de1..7854eaa032 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java
@@ -28,21 +28,21 @@ import java.util.List;
public interface StorageOperate {
- public static final String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
+ String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
/**
* if the resource of tenant 's exist, the resource of folder will be created
* @param tenantCode
* @throws Exception
*/
- public void createTenantDirIfNotExists(String tenantCode) throws Exception;
+ void createTenantDirIfNotExists(String tenantCode) throws Exception;
/**
* get the resource directory of tenant
* @param tenantCode
* @return
*/
- public String getResDir(String tenantCode);
+ String getResDir(String tenantCode);
/**
* return the udf directory of tenant
@@ -50,7 +50,7 @@ public interface StorageOperate {
* @return
*/
- public String getUdfDir(String tenantCode);
+ String getUdfDir(String tenantCode);
/**
* create the directory that the path of tenant wanted to create
@@ -59,7 +59,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
- public boolean mkdir(String tenantCode,String path) throws IOException;
+ boolean mkdir(String tenantCode, String path) throws IOException;
/**
* get the path of the resource file
@@ -67,7 +67,7 @@ public interface StorageOperate {
* @param fullName
* @return
*/
- public String getResourceFileName(String tenantCode, String fullName);
+ String getResourceFileName(String tenantCode, String fullName);
/**
* get the path of the file
@@ -76,7 +76,7 @@ public interface StorageOperate {
* @param fileName
* @return
*/
- public String getFileName(ResourceType resourceType, String tenantCode, String fileName);
+ String getFileName(ResourceType resourceType, String tenantCode, String fileName);
/**
* predicate if the resource of tenant exists
@@ -85,7 +85,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
- public boolean exists(String tenantCode,String fileName) throws IOException;
+ boolean exists(String tenantCode, String fileName) throws IOException;
/**
* delete the resource of filePath
@@ -96,7 +96,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
- public boolean delete(String tenantCode,String filePath, boolean recursive) throws IOException;
+ boolean delete(String tenantCode, String filePath, boolean recursive) throws IOException;
/**
* copy the file from srcPath to dstPath
@@ -107,7 +107,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
- public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
+ boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
/**
* get the root path of the tenant with resourceType
@@ -115,7 +115,7 @@ public interface StorageOperate {
* @param tenantCode
* @return
*/
- public String getDir(ResourceType resourceType, String tenantCode);
+ String getDir(ResourceType resourceType, String tenantCode);
/**
* upload the local srcFile to dstPath
@@ -127,7 +127,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
- public boolean upload(String tenantCode,String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
+ boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
/**
* download the srcPath to local
@@ -138,7 +138,7 @@ public interface StorageOperate {
* @param overwrite
* @throws IOException
*/
- public void download(String tenantCode,String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException;
+ void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException;
/**
* vim the context of filePath
@@ -149,7 +149,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
- public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException;
+ List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException;
/**
* delete the files and directory of the tenant
@@ -157,13 +157,13 @@ public interface StorageOperate {
* @param tenantCode
* @throws Exception
*/
- public void deleteTenant(String tenantCode) throws Exception;
+ void deleteTenant(String tenantCode) throws Exception;
/**
* return the storageType
*
* @return
*/
- public ResUploadType returnStorageType();
+ ResUploadType returnStorageType();
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
new file mode 100644
index 0000000000..88a44004cb
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.thread;
+
+/**
+ * All threads used in DolphinScheduler should extend this class, which marks them as daemon threads, to avoid the server hang issue.
+ */
+public abstract class BaseDaemonThread extends Thread {
+
+ protected BaseDaemonThread(Runnable runnable) {
+ super(runnable);
+ this.setDaemon(true);
+ }
+
+ protected BaseDaemonThread(String threadName) {
+ super();
+ this.setName(threadName);
+ this.setDaemon(true);
+ }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index d75595de1a..eab33a07a6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -60,7 +61,7 @@ import org.springframework.stereotype.Component;
* TaskUpdateQueue consumer
*/
@Component
-public class TaskPriorityQueueConsumer extends Thread {
+public class TaskPriorityQueueConsumer extends BaseDaemonThread {
/**
* logger of TaskUpdateQueueConsumer
@@ -108,6 +109,10 @@ public class TaskPriorityQueueConsumer extends Thread {
*/
private ThreadPoolExecutor consumerThreadPoolExecutor;
+ protected TaskPriorityQueueConsumer() {
+ super("TaskPriorityQueueConsumeThread");
+ }
+
@PostConstruct
public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
@@ -198,10 +203,8 @@ public class TaskPriorityQueueConsumer extends Thread {
} else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
}
- } catch (RuntimeException e) {
+ } catch (RuntimeException | ExecuteException e) {
logger.error("Master dispatch task to worker error: ", e);
- } catch (ExecuteException e) {
- logger.error("Master dispatch task to worker error: {}", e);
}
return result;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index 83772a1054..e34dedca67 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
@@ -70,7 +71,6 @@ public class StateEventResponseService {
@PostConstruct
public void start() {
this.responseWorker = new StateEventResponseWorker();
- this.responseWorker.setName("StateEventResponseWorker");
this.responseWorker.start();
}
@@ -101,7 +101,11 @@ public class StateEventResponseService {
/**
* task worker thread
*/
- class StateEventResponseWorker extends Thread {
+ class StateEventResponseWorker extends BaseDaemonThread {
+
+ protected StateEventResponseWorker() {
+ super("StateEventResponseWorker");
+ }
@Override
public void run() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index 924dd131f8..bed3b3d9ed 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import java.util.ArrayList;
@@ -63,11 +64,9 @@ public class TaskEventService {
@PostConstruct
public void start() {
this.taskEventThread = new TaskEventThread();
- this.taskEventThread.setName("TaskEventThread");
this.taskEventThread.start();
this.taskEventHandlerThread = new TaskEventHandlerThread();
- this.taskEventHandlerThread.setName("TaskEventHandlerThread");
this.taskEventHandlerThread.start();
}
@@ -85,7 +84,7 @@ public class TaskEventService {
taskExecuteThreadPool.eventHandler();
}
} catch (Exception e) {
- logger.error("stop error:", e);
+ logger.error("TaskEventService stop error:", e);
}
}
@@ -101,7 +100,11 @@ public class TaskEventService {
/**
* task worker thread
*/
- class TaskEventThread extends Thread {
+ class TaskEventThread extends BaseDaemonThread {
+ protected TaskEventThread() {
+ super("TaskEventLoopThread");
+ }
+
@Override
public void run() {
while (Stopper.isRunning()) {
@@ -123,7 +126,11 @@ public class TaskEventService {
/**
* event handler thread
*/
- class TaskEventHandlerThread extends Thread {
+ class TaskEventHandlerThread extends BaseDaemonThread {
+
+ protected TaskEventHandlerThread() {
+ super("TaskEventHandlerThread");
+ }
@Override
public void run() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index fc9c4fd854..97c67d8493 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
-public class EventExecuteService extends Thread {
+public class EventExecuteService extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);
@@ -42,9 +43,12 @@ public class EventExecuteService extends Thread {
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+ protected EventExecuteService() {
+ super("EventServiceStarted");
+ }
+
@Override
public synchronized void start() {
- super.setName("EventServiceStarted");
super.start();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 4bac6cdf15..1c6ea144e4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
-public class FailoverExecuteThread extends Thread {
+public class FailoverExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class);
@@ -42,16 +43,19 @@ public class FailoverExecuteThread extends Thread {
@Autowired
private FailoverService failoverService;
+ protected FailoverExecuteThread() {
+ super("FailoverExecuteThread");
+ }
+
@Override
public synchronized void start() {
- super.setName("FailoverExecuteThread");
super.start();
}
@Override
public void run() {
// when startup, wait 10s for ready
- ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 10);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
logger.info("failover execute thread started");
while (Stopper.isRunning()) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 23577a90db..94991a50c3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -52,7 +53,7 @@ import org.springframework.stereotype.Service;
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@Service
-public class MasterSchedulerService extends Thread {
+public class MasterSchedulerService extends BaseDaemonThread {
/**
* logger of MasterSchedulerService
@@ -102,6 +103,10 @@ public class MasterSchedulerService extends Thread {
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
+ protected MasterSchedulerService() {
+ super("MasterCommandLoopThread");
+ }
+
/**
* constructor of MasterSchedulerService
*/
@@ -113,9 +118,8 @@ public class MasterSchedulerService extends Thread {
@Override
public synchronized void start() {
- super.setName("MasterSchedulerService");
- super.start();
this.stateWheelExecuteThread.start();
+ super.start();
}
public void close() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index e85ddf08bd..12d404406c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -50,7 +51,7 @@ import org.springframework.stereotype.Component;
* 4. timeout process check
*/
@Component
-public class StateWheelExecuteThread extends Thread {
+public class StateWheelExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
@@ -83,6 +84,10 @@ public class StateWheelExecuteThread extends Thread {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+ protected StateWheelExecuteThread() {
+ super("StateWheelExecuteThread");
+ }
+
@Override
public void run() {
Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);