You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2019/12/20 07:43:40 UTC
[incubator-dolphinscheduler] branch dev updated: sleep when
resource in not satisfy. fix #1522 (#1523)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 076ccbd sleep when resource in not satisfy. fix #1522 (#1523)
076ccbd is described below
commit 076ccbdb9b3657f286198caf23f500341269dff5
Author: Tboy <gu...@immomo.com>
AuthorDate: Fri Dec 20 15:43:34 2019 +0800
sleep when resource in not satisfy. fix #1522 (#1523)
* fix #1515
* sleep when resource in not satisfy. fix #1522
* add sleep 1s for no command
---
.../master/runner/MasterSchedulerThread.java | 60 +++++++++++-----------
1 file changed, 31 insertions(+), 29 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
index 69c2304..8d7d5a0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
@@ -98,39 +98,41 @@ public class MasterSchedulerThread implements Runnable {
InterProcessMutex mutex = null;
try {
- if(OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory())){
- if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
-
- // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
- String znodeLock = zkMasterClient.getMasterLockPath();
-
- mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
- mutex.acquire();
-
- ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
- int activeCount = poolExecutor.getActiveCount();
- // make sure to scan and delete command table in one transaction
- Command command = processDao.findOneCommand();
- if (command != null) {
- logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
-
- try{
- processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
- if (processInstance != null) {
- logger.info("start master exec thread , split DAG ...");
- masterExecService.execute(new MasterExecThread(processInstance,processDao));
- }
- }catch (Exception e){
- logger.error("scan command error ", e);
- processDao.moveToErrorCommand(command, e.toString());
+ boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
+ if(!runCheckFlag) {
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ continue;
+ }
+ if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
+
+ // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
+ String znodeLock = zkMasterClient.getMasterLockPath();
+
+ mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
+ mutex.acquire();
+
+ ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
+ int activeCount = poolExecutor.getActiveCount();
+ // make sure to scan and delete command table in one transaction
+ Command command = processDao.findOneCommand();
+ if (command != null) {
+ logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
+
+ try{
+ processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
+ if (processInstance != null) {
+ logger.info("start master exec thread , split DAG ...");
+ masterExecService.execute(new MasterExecThread(processInstance,processDao));
}
+ }catch (Exception e){
+ logger.error("scan command error ", e);
+ processDao.moveToErrorCommand(command, e.toString());
}
+ } else{
+ //indicate that no command ,sleep for 1s
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
-
- // accessing the command table every SLEEP_TIME_MILLIS milliseconds
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-
}catch (Exception e){
logger.error("master scheduler thread exception : " + e.getMessage(),e);
}finally{