You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/05/19 11:49:56 UTC
[incubator-dolphinscheduler] branch dev updated: simplify
NettyExecutorManager.execute logic making it readable (#2744)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1a78c11 simplify NettyExecutorManager.execute logic making it readable (#2744)
1a78c11 is described below
commit 1a78c1150c89d3bbea9029bcc9b2489b6460b278
Author: gabry.wu <wu...@qq.com>
AuthorDate: Tue May 19 19:49:39 2020 +0800
simplify NettyExecutorManager.execute logic making it readable (#2744)
* simplify NettyExecutorManager.execute logic making it readable
* fix NPE
* remove unused import
Co-authored-by: dailidong <da...@gmail.com>
---
.../dispatch/executor/NettyExecutorManager.java | 54 ++++++++--------------
1 file changed, 20 insertions(+), 34 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 7ded3b0..6fc3f45 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.dispatch.executor;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -36,10 +35,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
/**
* netty executor manager
@@ -87,17 +83,11 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
-
- /**
- * all nodes
- */
- Set<String> allNodes = getAllNodes(context);
-
- /**
- * fail nodes
- */
- Set<String> failNodeSet = new HashSet<>();
-
+ LinkedList<String> allNodes = new LinkedList<>();
+ Set<String> nodes = getAllNodes(context);
+ if (nodes != null) {
+ allNodes.addAll(nodes);
+ }
/**
* build command accord executeContext
*/
@@ -106,31 +96,27 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
/**
* execute task host
*/
- Host host = context.getHost();
+ String startHostAddress = context.getHost().getAddress();
+ // remove start host address and add it to head
+ allNodes.remove(startHostAddress);
+ allNodes.addFirst(startHostAddress);
+
boolean success = false;
- while (!success) {
+ for (String address : allNodes) {
try {
- doExecute(host,command);
+ Host host = Host.of(address);
+ doExecute(host, command);
success = true;
context.setHost(host);
+ break;
} catch (ExecuteException ex) {
- logger.error(String.format("execute command : %s error", command), ex);
- try {
- failNodeSet.add(host.getAddress());
- Set<String> tmpAllIps = new HashSet<>(allNodes);
- Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
- if (remained != null && remained.size() > 0) {
- host = Host.of(remained.iterator().next());
- logger.error("retry execute command : {} host : {}", command, host);
- } else {
- throw new ExecuteException("fail after try all nodes");
- }
- } catch (Throwable t) {
- throw new ExecuteException("fail after try all nodes");
- }
+ logger.error("retry execute command : {} host : {}", command, address);
}
}
-
+ if (!success) {
+ throw new ExecuteException("fail after try all nodes");
+ }
+
return success;
}