You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2019/12/20 07:43:40 UTC

[incubator-dolphinscheduler] branch dev updated: sleep when resource is not satisfied. fix #1522 (#1523)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 076ccbd  sleep when resource is not satisfied. fix #1522 (#1523)
076ccbd is described below

commit 076ccbdb9b3657f286198caf23f500341269dff5
Author: Tboy <gu...@immomo.com>
AuthorDate: Fri Dec 20 15:43:34 2019 +0800

    sleep when resource is not satisfied. fix #1522 (#1523)
    
    * fix #1515
    
    * sleep when resource is not satisfied. fix #1522
    
    * add sleep 1s for no command
---
 .../master/runner/MasterSchedulerThread.java       | 60 +++++++++++-----------
 1 file changed, 31 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
index 69c2304..8d7d5a0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
@@ -98,39 +98,41 @@ public class MasterSchedulerThread implements Runnable {
             InterProcessMutex mutex = null;
             try {
 
-                if(OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory())){
-                    if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
-
-                        // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
-                        String znodeLock = zkMasterClient.getMasterLockPath();
-
-                        mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
-                        mutex.acquire();
-
-                        ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
-                        int activeCount = poolExecutor.getActiveCount();
-                        // make sure to scan and delete command  table in one transaction
-                        Command command = processDao.findOneCommand();
-                        if (command != null) {
-                            logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
-
-                            try{
-                                processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
-                                if (processInstance != null) {
-                                    logger.info("start master exec thread , split DAG ...");
-                                    masterExecService.execute(new MasterExecThread(processInstance,processDao));
-                                }
-                            }catch (Exception e){
-                                logger.error("scan command error ", e);
-                                processDao.moveToErrorCommand(command, e.toString());
+                boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
+                if(!runCheckFlag) {
+                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                    continue;
+                }
+                if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
+
+                    // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
+                    String znodeLock = zkMasterClient.getMasterLockPath();
+
+                    mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
+                    mutex.acquire();
+
+                    ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
+                    int activeCount = poolExecutor.getActiveCount();
+                    // make sure to scan and delete command  table in one transaction
+                    Command command = processDao.findOneCommand();
+                    if (command != null) {
+                        logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
+
+                        try{
+                            processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
+                            if (processInstance != null) {
+                                logger.info("start master exec thread , split DAG ...");
+                                masterExecService.execute(new MasterExecThread(processInstance,processDao));
                             }
+                        }catch (Exception e){
+                            logger.error("scan command error ", e);
+                            processDao.moveToErrorCommand(command, e.toString());
                         }
+                    } else{
+                        //indicate that no command ,sleep for 1s
+                        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                     }
                 }
-
-                // accessing the command table every SLEEP_TIME_MILLIS milliseconds
-                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-
             }catch (Exception e){
                 logger.error("master scheduler thread exception : " + e.getMessage(),e);
             }finally{