You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:35 UTC

[dolphinscheduler] 08/29: [Fix-10413] Fix Master startup failure the server still hang (#10500)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 90c87f012129e812d87a3f6e847de16f525611ea
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Jun 20 22:35:06 2022 +0800

    [Fix-10413] Fix Master startup failure the server still hang (#10500)
    
    * Fix the server still hanging after a Master startup failure
    
    (cherry picked from commit 117f78ec4b0e2438082a3e25158492eca1b9b1be)
---
 .../common/storage/StorageOperate.java             | 32 +++++++++----------
 .../common/thread/BaseDaemonThread.java            | 36 ++++++++++++++++++++++
 .../master/consumer/TaskPriorityQueueConsumer.java | 11 ++++---
 .../processor/queue/StateEventResponseService.java |  8 +++--
 .../master/processor/queue/TaskEventService.java   | 17 +++++++---
 .../server/master/runner/EventExecuteService.java  |  8 +++--
 .../master/runner/FailoverExecuteThread.java       | 10 ++++--
 .../master/runner/MasterSchedulerService.java      | 10 ++++--
 .../master/runner/StateWheelExecuteThread.java     |  7 ++++-
 9 files changed, 103 insertions(+), 36 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java
index 5248586de1..7854eaa032 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java
@@ -28,21 +28,21 @@ import java.util.List;
 
 public interface StorageOperate {
 
-    public static final String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
+    String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
 
     /**
      * if the resource of tenant 's exist, the resource of folder will be created
      * @param tenantCode
      * @throws Exception
      */
-    public void createTenantDirIfNotExists(String tenantCode) throws Exception;
+    void createTenantDirIfNotExists(String tenantCode) throws Exception;
 
     /**
      * get the resource directory of tenant
      * @param tenantCode
      * @return
      */
-    public String getResDir(String tenantCode);
+    String getResDir(String tenantCode);
 
     /**
      * return the udf directory of tenant
@@ -50,7 +50,7 @@ public interface StorageOperate {
      * @return
      */
 
-    public String getUdfDir(String tenantCode);
+    String getUdfDir(String tenantCode);
 
     /**
      * create the directory that the path of tenant wanted to create
@@ -59,7 +59,7 @@ public interface StorageOperate {
      * @return
      * @throws IOException
      */
-    public boolean mkdir(String tenantCode,String path) throws IOException;
+    boolean mkdir(String tenantCode, String path) throws IOException;
 
     /**
      * get the path of the resource file
@@ -67,7 +67,7 @@ public interface StorageOperate {
      * @param fullName
      * @return
      */
-    public String getResourceFileName(String tenantCode, String fullName);
+    String getResourceFileName(String tenantCode, String fullName);
 
     /**
      * get the path of the file
@@ -76,7 +76,7 @@ public interface StorageOperate {
      * @param fileName
      * @return
      */
-    public String getFileName(ResourceType resourceType, String tenantCode, String fileName);
+    String getFileName(ResourceType resourceType, String tenantCode, String fileName);
 
     /**
      * predicate  if the resource of tenant exists
@@ -85,7 +85,7 @@ public interface StorageOperate {
      * @return
      * @throws IOException
      */
-    public  boolean exists(String tenantCode,String fileName) throws IOException;
+    boolean exists(String tenantCode, String fileName) throws IOException;
 
     /**
      * delete the resource of  filePath
@@ -96,7 +96,7 @@ public interface StorageOperate {
      * @return
      * @throws IOException
      */
-    public boolean delete(String tenantCode,String filePath, boolean recursive) throws IOException;
+    boolean delete(String tenantCode, String filePath, boolean recursive) throws IOException;
 
     /**
      * copy the file from srcPath to dstPath
@@ -107,7 +107,7 @@ public interface StorageOperate {
      * @return
      * @throws IOException
      */
-    public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
+    boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
 
     /**
      * get the root path of the tenant with resourceType
@@ -115,7 +115,7 @@ public interface StorageOperate {
      * @param tenantCode
      * @return
      */
-    public  String getDir(ResourceType resourceType, String tenantCode);
+    String getDir(ResourceType resourceType, String tenantCode);
 
     /**
      * upload the local srcFile to dstPath
@@ -127,7 +127,7 @@ public interface StorageOperate {
      * @return
      * @throws IOException
      */
-    public boolean upload(String tenantCode,String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
+    boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
 
     /**
      * download the srcPath to local
@@ -138,7 +138,7 @@ public interface StorageOperate {
      * @param overwrite
      * @throws IOException
      */
-    public void download(String tenantCode,String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException;
+    void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException;
 
     /**
      * vim the context of filePath
@@ -149,7 +149,7 @@ public interface StorageOperate {
      * @return
      * @throws IOException
      */
-    public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException;
+    List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException;
 
     /**
      * delete the files and directory of the tenant
@@ -157,13 +157,13 @@ public interface StorageOperate {
      * @param tenantCode
      * @throws Exception
      */
-    public void deleteTenant(String tenantCode) throws Exception;
+    void deleteTenant(String tenantCode) throws Exception;
 
     /**
      * return the storageType
      *
      * @return
      */
-    public ResUploadType returnStorageType();
+    ResUploadType returnStorageType();
 
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
new file mode 100644
index 0000000000..88a44004cb
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.thread;
+
+/**
+ * All threads used in DolphinScheduler should extend this class. Both constructors mark the thread as
+ * a daemon thread, so a still-running thread cannot keep the JVM alive and hang the server.
+ */
+public abstract class BaseDaemonThread extends Thread {
+
+    protected BaseDaemonThread(Runnable runnable) {
+        super(runnable);
+        this.setDaemon(true);
+    }
+
+    protected BaseDaemonThread(String threadName) {
+        super(threadName);
+        this.setDaemon(true);
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index d75595de1a..eab33a07a6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.consumer;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -60,7 +61,7 @@ import org.springframework.stereotype.Component;
  * TaskUpdateQueue consumer
  */
 @Component
-public class TaskPriorityQueueConsumer extends Thread {
+public class TaskPriorityQueueConsumer extends BaseDaemonThread {
 
     /**
      * logger of TaskUpdateQueueConsumer
@@ -108,6 +109,10 @@ public class TaskPriorityQueueConsumer extends Thread {
      */
     private ThreadPoolExecutor consumerThreadPoolExecutor;
 
+    protected TaskPriorityQueueConsumer() {
+        super("TaskPriorityQueueConsumeThread");
+    }
+
     @PostConstruct
     public void init() {
         this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
@@ -198,10 +203,8 @@ public class TaskPriorityQueueConsumer extends Thread {
             } else {
                 logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
             }
-        } catch (RuntimeException e) {
+        } catch (RuntimeException | ExecuteException e) {
             logger.error("Master dispatch task to worker error: ", e);
-        } catch (ExecuteException e) {
-            logger.error("Master dispatch task to worker error: {}", e);
         }
         return result;
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index 83772a1054..e34dedca67 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
@@ -70,7 +71,6 @@ public class StateEventResponseService {
     @PostConstruct
     public void start() {
         this.responseWorker = new StateEventResponseWorker();
-        this.responseWorker.setName("StateEventResponseWorker");
         this.responseWorker.start();
     }
 
@@ -101,7 +101,11 @@ public class StateEventResponseService {
     /**
      * task worker thread
      */
-    class StateEventResponseWorker extends Thread {
+    class StateEventResponseWorker extends BaseDaemonThread {
+
+        protected StateEventResponseWorker() {
+            super("StateEventResponseWorker");
+        }
 
         @Override
         public void run() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index 924dd131f8..bed3b3d9ed 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 
 import java.util.ArrayList;
@@ -63,11 +64,9 @@ public class TaskEventService {
     @PostConstruct
     public void start() {
         this.taskEventThread = new TaskEventThread();
-        this.taskEventThread.setName("TaskEventThread");
         this.taskEventThread.start();
 
         this.taskEventHandlerThread = new TaskEventHandlerThread();
-        this.taskEventHandlerThread.setName("TaskEventHandlerThread");
         this.taskEventHandlerThread.start();
     }
 
@@ -85,7 +84,7 @@ public class TaskEventService {
                 taskExecuteThreadPool.eventHandler();
             }
         } catch (Exception e) {
-            logger.error("stop error:", e);
+            logger.error("TaskEventService stop error:", e);
         }
     }
 
@@ -101,7 +100,11 @@ public class TaskEventService {
     /**
      * task worker thread
      */
-    class TaskEventThread extends Thread {
+    class TaskEventThread extends BaseDaemonThread {
+        protected TaskEventThread() {
+            super("TaskEventLoopThread");
+        }
+
         @Override
         public void run() {
             while (Stopper.isRunning()) {
@@ -123,7 +126,11 @@ public class TaskEventService {
     /**
      * event handler thread
      */
-    class TaskEventHandlerThread extends Thread {
+    class TaskEventHandlerThread extends BaseDaemonThread {
+
+        protected TaskEventHandlerThread() {
+            super("TaskEventHandlerThread");
+        }
 
         @Override
         public void run() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index fc9c4fd854..97c67d8493 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 
@@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 @Service
-public class EventExecuteService extends Thread {
+public class EventExecuteService extends BaseDaemonThread {
 
     private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);
 
@@ -42,9 +43,12 @@ public class EventExecuteService extends Thread {
     @Autowired
     private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
+    protected EventExecuteService() {
+        super("EventServiceStarted");
+    }
+
     @Override
     public synchronized void start() {
-        super.setName("EventServiceStarted");
         super.start();
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 4bac6cdf15..1c6ea144e4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 @Service
-public class FailoverExecuteThread extends Thread {
+public class FailoverExecuteThread extends BaseDaemonThread {
 
     private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class);
 
@@ -42,16 +43,19 @@ public class FailoverExecuteThread extends Thread {
     @Autowired
     private FailoverService failoverService;
 
+    protected FailoverExecuteThread() {
+        super("FailoverExecuteThread");
+    }
+
     @Override
     public synchronized void start() {
-        super.setName("FailoverExecuteThread");
         super.start();
     }
 
     @Override
     public void run() {
         // when startup, wait 10s for ready
-        ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 10);
+        ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
 
         logger.info("failover execute thread started");
         while (Stopper.isRunning()) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 23577a90db..94991a50c3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.SlotCheckState;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -52,7 +53,7 @@ import org.springframework.stereotype.Service;
  * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
  */
 @Service
-public class MasterSchedulerService extends Thread {
+public class MasterSchedulerService extends BaseDaemonThread {
 
     /**
      * logger of MasterSchedulerService
@@ -102,6 +103,10 @@ public class MasterSchedulerService extends Thread {
     @Autowired
     private StateWheelExecuteThread stateWheelExecuteThread;
 
+    protected MasterSchedulerService() {
+        super("MasterCommandLoopThread");
+    }
+
     /**
      * constructor of MasterSchedulerService
      */
@@ -113,9 +118,8 @@ public class MasterSchedulerService extends Thread {
 
     @Override
     public synchronized void start() {
-        super.setName("MasterSchedulerService");
-        super.start();
         this.stateWheelExecuteThread.start();
+        super.start();
     }
 
     public void close() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index e85ddf08bd..12d404406c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -50,7 +51,7 @@ import org.springframework.stereotype.Component;
  * 4. timeout process check
  */
 @Component
-public class StateWheelExecuteThread extends Thread {
+public class StateWheelExecuteThread extends BaseDaemonThread {
 
     private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
 
@@ -83,6 +84,10 @@ public class StateWheelExecuteThread extends Thread {
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
+    protected StateWheelExecuteThread() {
+        super("StateWheelExecuteThread");
+    }
+
     @Override
     public void run() {
         Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);