You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/05/19 11:49:56 UTC

[incubator-dolphinscheduler] branch dev updated: simplify NettyExecutorManager.execute logic to make it readable (#2744)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1a78c11  simplify NettyExecutorManager.execute logic to make it readable (#2744)
1a78c11 is described below

commit 1a78c1150c89d3bbea9029bcc9b2489b6460b278
Author: gabry.wu <wu...@qq.com>
AuthorDate: Tue May 19 19:49:39 2020 +0800

    simplify NettyExecutorManager.execute logic to make it readable (#2744)
    
    * simplify NettyExecutorManager.execute logic to make it readable
    
    * fix NPE
    
    * remove unused import
    
    Co-authored-by: dailidong <da...@gmail.com>
---
 .../dispatch/executor/NettyExecutorManager.java    | 54 ++++++++--------------
 1 file changed, 20 insertions(+), 34 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 7ded3b0..6fc3f45 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.executor;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -36,10 +35,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
 /**
  *  netty executor manager
@@ -87,17 +83,11 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
      */
     @Override
     public Boolean execute(ExecutionContext context) throws ExecuteException {
-
-        /**
-         *  all nodes
-         */
-        Set<String> allNodes = getAllNodes(context);
-
-        /**
-         * fail nodes
-         */
-        Set<String> failNodeSet = new HashSet<>();
-
+        LinkedList<String> allNodes = new LinkedList<>();
+        Set<String> nodes = getAllNodes(context);
+        if (nodes != null) {
+            allNodes.addAll(nodes);
+        }
         /**
          *  build command accord executeContext
          */
@@ -106,31 +96,27 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
         /**
          * execute task host
          */
-        Host host = context.getHost();
+        String startHostAddress = context.getHost().getAddress();
+        // remove start host address and add it to head
+        allNodes.remove(startHostAddress);
+        allNodes.addFirst(startHostAddress);
+ 
         boolean success = false;
-        while (!success) {
+        for (String address : allNodes) {
             try {
-                doExecute(host,command);
+                Host host = Host.of(address);
+                doExecute(host, command);
                 success = true;
                 context.setHost(host);
+                break;
             } catch (ExecuteException ex) {
-                logger.error(String.format("execute command : %s error", command), ex);
-                try {
-                    failNodeSet.add(host.getAddress());
-                    Set<String> tmpAllIps = new HashSet<>(allNodes);
-                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
-                    if (remained != null && remained.size() > 0) {
-                        host = Host.of(remained.iterator().next());
-                        logger.error("retry execute command : {} host : {}", command, host);
-                    } else {
-                        throw new ExecuteException("fail after try all nodes");
-                    }
-                } catch (Throwable t) {
-                    throw new ExecuteException("fail after try all nodes");
-                }
+                logger.error("retry execute command : {} host : {}", command, address);
             }
         }
-
+        if (!success) {
+            throw new ExecuteException("fail after try all nodes");
+        }
+        
         return success;
     }