You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/06/06 02:49:31 UTC
[dolphinscheduler] branch dev updated: [Fix] [Worker] Fix worker will hang if fails to start (#10342)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 516757cc74 [Fix] [Worker] Fix worker will hang if fails to start (#10342)
516757cc74 is described below
commit 516757cc7400b576ca0933dd5826005db06fa508
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Jun 6 10:49:23 2022 +0800
[Fix] [Worker] Fix worker will hang if fails to start (#10342)
* Fix worker will hang if fails to start
* Add .run to ignore
---
.gitignore | 1 +
.../plugin/datasource/hive/HiveDataSourceClient.java | 4 +++-
.../server/log/LoggerRequestProcessor.java | 16 ++++++++++------
.../dolphinscheduler/remote/NettyRemotingServer.java | 4 ++--
4 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/.gitignore b/.gitignore
index 8e55e20304..fbc74e41d1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
.DS_Store
.target
.idea/
+.run/
target/
dist/
all-dependencies.txt
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
index 08b57631ac..dd4f7e89ca 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.datasource.hive;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
@@ -60,7 +61,8 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
@Override
protected void preInit() {
logger.info("PreInit in {}", getClass().getName());
- this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor();
+ this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build());
}
@Override
diff --git a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index afc914ef05..1ab4aa13fe 100644
--- a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.commons.lang3.StringUtils;
@@ -54,6 +55,8 @@ import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* logger request process logic
*/
@@ -65,7 +68,8 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
private final ExecutorService executor;
public LoggerRequestProcessor() {
- this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1);
+ this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1,
+ new NamedThreadFactory("Log-Request-Process-Thread"));
}
@Override
@@ -80,7 +84,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
command.getBody(), GetLogBytesRequestCommand.class);
String path = getLogRequest.getPath();
if (!checkPathSecurity(path)) {
- throw new IllegalArgumentException("Illegal path");
+ throw new IllegalArgumentException("Illegal path: " + path);
}
byte[] bytes = getFileContentBytes(path);
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
@@ -91,7 +95,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
command.getBody(), ViewLogRequestCommand.class);
String viewLogPath = viewLogRequest.getPath();
if (!checkPathSecurity(viewLogPath)) {
- throw new IllegalArgumentException("Illegal path");
+ throw new IllegalArgumentException("Illegal path: " + viewLogPath);
}
String msg = LoggerUtils.readWholeFileContent(viewLogPath);
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
@@ -103,7 +107,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
String rollViewLogPath = rollViewLogRequest.getPath();
if (!checkPathSecurity(rollViewLogPath)) {
- throw new IllegalArgumentException("Illegal path");
+ throw new IllegalArgumentException("Illegal path: " + rollViewLogPath);
}
List<String> lines = readPartFileContent(rollViewLogPath,
@@ -121,7 +125,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
String taskLogPath = removeTaskLogRequest.getPath();
if (!checkPathSecurity(taskLogPath)) {
- throw new IllegalArgumentException("Illegal path");
+ throw new IllegalArgumentException("Illegal path: " + taskLogPath);
}
File taskLogFile = new File(taskLogPath);
boolean status = true;
@@ -137,7 +141,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
break;
default:
- throw new IllegalArgumentException("unknown commandType");
+ throw new IllegalArgumentException("unknown commandType: " + commandType);
}
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index 2154cb7173..098751ab65 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -103,8 +103,8 @@ public class NettyRemotingServer {
*/
public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
- ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerBossThread_%s").build();
- ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerWorkerThread_%s").build();
+ ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerBossThread_%s").build();
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerWorkerThread_%s").build();
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);