You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/02/10 11:07:57 UTC

[dolphinscheduler] branch dev updated: Optimize some code (#8324)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 66dcf23  Optimize some code (#8324)
66dcf23 is described below

commit 66dcf2376a18995142dd4b3f0cb54df385720e95
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Feb 10 19:07:53 2022 +0800

    Optimize some code (#8324)
---
 .../master/registry/MasterRegistryClient.java      |  1 +
 .../registry/MasterRegistryDataListener.java       |  4 +-
 .../server/master/registry/ServerNodeManager.java  | 31 +++++++-------
 .../master/runner/MasterSchedulerService.java      | 38 +++++++---------
 .../remote/NettyRemotingServer.java                | 50 ++++------------------
 .../service/process/ProcessService.java            |  2 +-
 .../service/queue/MasterPriorityQueue.java         |  4 +-
 7 files changed, 46 insertions(+), 84 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index d23ab36..77e27ec 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -122,6 +122,7 @@ public class MasterRegistryClient {
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
         } catch (Exception e) {
             logger.error("master start up exception", e);
+            throw new RuntimeException("master start up error", e);
         } finally {
             registryClient.releaseLock(nodeLock);
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index 361f09f..30cea36 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -56,7 +56,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
         }
     }
 
-    public void handleMasterEvent(Event event) {
+    private void handleMasterEvent(Event event) {
         final String path = event.path();
         switch (event.type()) {
             case ADD:
@@ -70,7 +70,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
         }
     }
 
-    public void handleWorkerEvent(Event event) {
+    private void handleWorkerEvent(Event event) {
         final String path = event.path();
         switch (event.type()) {
             case ADD:
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index b7e904b..02f7f98 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -117,7 +116,7 @@ public class ServerNodeManager implements InitializingBean {
     @Autowired
     private WorkerGroupMapper workerGroupMapper;
 
-    private MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
+    private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
 
     /**
      * alert dao
@@ -125,15 +124,16 @@ public class ServerNodeManager implements InitializingBean {
     @Autowired
     private AlertDao alertDao;
 
-    public static volatile List<Integer> SLOT_LIST = new ArrayList<>();
+    private static volatile int MASTER_SLOT = 0;
 
-    public static volatile Integer MASTER_SIZE = 0;
+    private static volatile int MASTER_SIZE = 0;
 
-    public static Integer getSlot() {
-        if (SLOT_LIST.size() > 0) {
-            return SLOT_LIST.get(0);
-        }
-        return 0;
+    public static int getSlot() {
+        return MASTER_SLOT;
+    }
+
+    public static int getMasterSize() {
+        return MASTER_SIZE;
     }
 
 
@@ -295,7 +295,7 @@ public class ServerNodeManager implements InitializingBean {
     }
 
     private void updateMasterNodes() {
-        SLOT_LIST.clear();
+        MASTER_SLOT = 0;
         this.masterNodes.clear();
         String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
         try {
@@ -333,17 +333,18 @@ public class ServerNodeManager implements InitializingBean {
     private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
         masterLock.lock();
         try {
+            String host = NetUtils.getHost();
             this.masterNodes.addAll(nodes);
             this.masterPriorityQueue.clear();
             this.masterPriorityQueue.putList(masterNodes);
-            int index = masterPriorityQueue.getIndex(NetUtils.getHost());
+            int index = masterPriorityQueue.getIndex(host);
             if (index >= 0) {
                 MASTER_SIZE = nodes.size();
-                SLOT_LIST.add(masterPriorityQueue.getIndex(NetUtils.getHost()));
+                MASTER_SLOT = index;
+            } else {
+                logger.warn("current host:{} is not in active master list", host);
             }
-            logger.info("update master nodes, master size: {}, slot: {}",
-                    MASTER_SIZE, SLOT_LIST.toString()
-            );
+            logger.info("update master nodes, master size: {}, slot: {}", MASTER_SIZE, MASTER_SLOT);
         } finally {
             masterLock.unlock();
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index ff30f28..f8a1ef6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -30,14 +30,12 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
-import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -185,29 +183,24 @@ public class MasterSchedulerService extends Thread {
             return null;
         }
 
-        ProcessInstance[] processInstances = new ProcessInstance[commands.size()];
+        List<ProcessInstance> processInstances = new ArrayList<>(commands.size());
         CountDownLatch latch = new CountDownLatch(commands.size());
-        for (int i = 0; i < commands.size(); i++) {
-            int index = i;
-            this.masterPrepareExecService.execute(() -> {
-                Command command = commands.get(index);
-                // slot check again
-                if (!slotCheck(command)) {
-                    latch.countDown();
-                    return;
-                }
-
+        for (final Command command : commands) {
+            masterPrepareExecService.execute(() -> {
                 try {
+                    // slot check again
+                    if (!slotCheck(command)) {
+                        return;
+                    }
                     ProcessInstance processInstance = processService.handleCommand(logger,
                             getLocalAddress(),
                             command);
                     if (processInstance != null) {
-                        processInstances[index] = processInstance;
-                        logger.info("handle command command {} end, create process instance {}",
-                                command.getId(), processInstance.getId());
+                        processInstances.add(processInstance);
+                        logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                     }
                 } catch (Exception e) {
-                    logger.error("scan command error ", e);
+                    logger.error("handle command error ", e);
                     processService.moveToErrorCommand(command, e.toString());
                 } finally {
                     latch.countDown();
@@ -222,7 +215,7 @@ public class MasterSchedulerService extends Thread {
             logger.error("countDownLatch await error ", e);
         }
 
-        return Arrays.asList(processInstances);
+        return processInstances;
     }
 
     private List<Command> findCommands() {
@@ -230,9 +223,10 @@ public class MasterSchedulerService extends Thread {
         int pageSize = masterConfig.getFetchCommandNum();
         List<Command> result = new ArrayList<>();
         while (Stopper.isRunning()) {
-            if (ServerNodeManager.MASTER_SIZE == 0) {
+            if (ServerNodeManager.getMasterSize() == 0) {
                 return result;
             }
+            // todo: Can we use the slot to scan database?
             List<Command> commandList = processService.findCommandPage(pageSize, pageNumber);
             if (commandList.size() == 0) {
                 return result;
@@ -253,10 +247,8 @@ public class MasterSchedulerService extends Thread {
 
     private boolean slotCheck(Command command) {
         int slot = ServerNodeManager.getSlot();
-        if (ServerNodeManager.MASTER_SIZE != 0 && command.getId() % ServerNodeManager.MASTER_SIZE == slot) {
-            return true;
-        }
-        return false;
+        int masterSize = ServerNodeManager.getMasterSize();
+        return masterSize != 0 && command.getId() % masterSize == slot;
     }
 
     private String getLocalAddress() {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index f4a8869..665779d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -32,11 +32,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -60,11 +61,6 @@ public class NettyRemotingServer {
     private final ServerBootstrap serverBootstrap = new ServerBootstrap();
 
     /**
-     * encoder
-     */
-    private final NettyEncoder encoder = new NettyEncoder();
-
-    /**
      * default executor
      */
     private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
@@ -106,42 +102,14 @@ public class NettyRemotingServer {
      */
     public NettyRemotingServer(final NettyServerConfig serverConfig) {
         this.serverConfig = serverConfig;
+        ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerBossThread_%s").build();
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerWorkerThread_%s").build();
         if (NettyUtils.useEpoll()) {
-            this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
-
-            this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
+            this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
+            this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
         } else {
-            this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
-
-            this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
+            this.bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
+            this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
         }
     }
 
@@ -191,7 +159,7 @@ public class NettyRemotingServer {
      */
     private void initNettyChannel(SocketChannel ch) {
         ch.pipeline()
-                .addLast("encoder", encoder)
+                .addLast("encoder", new NettyEncoder())
                 .addLast("decoder", new NettyDecoder())
                 .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
                 .addLast("handler", serverHandler);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b795385..d6251d5 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -275,7 +275,7 @@ public class ProcessService {
         }
         processInstance.setCommandType(command.getCommandType());
         processInstance.addHistoryCmd(command.getCommandType());
-        //if the processDefination is serial
+        //if the processDefinition is serial
         ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
         if (processDefinition.getExecutionType().typeIsSerial()) {
             saveSerialProcess(processInstance, processDefinition);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
index 7743203..66385db 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
@@ -97,12 +97,12 @@ public class MasterPriorityQueue implements TaskPriorityQueue<Server> {
     }
 
     /**
-     * server comparator
+     * server comparator, used to sort server by createTime in reverse order.
      */
     private class ServerComparator implements Comparator<Server> {
         @Override
         public int compare(Server o1, Server o2) {
-            return o1.getCreateTime().before(o2.getCreateTime()) ? 1 : 0;
+            return o2.getCreateTime().compareTo(o1.getCreateTime());
         }
     }