You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/19 02:11:52 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
Refactor worker (#1975)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new f9500c5 Refactor worker (#1975)
f9500c5 is described below
commit f9500c58b174f7dbf1c3d073046583157c7d6673
Author: Tboy <gu...@immomo.com>
AuthorDate: Wed Feb 19 10:11:41 2020 +0800
Refactor worker (#1975)
* updates
* move FetchTaskThread logic to WorkerNettyRequestProcessor
* add NettyRemotingClient to scheduler thread
---
.../remote/command/ExecuteTaskRequestCommand.java | 2 +-
.../dolphinscheduler/remote/utils/Constants.java | 3 +
.../dolphinscheduler/remote/utils/IPUtils.java | 142 +++++++++++++++++++++
.../server/master/runner/MasterExecThread.java | 28 +++-
.../master/runner/MasterSchedulerThread.java | 13 +-
.../server/registry/ZookeeperRegistryCenter.java | 83 ++++++++++++
.../server/worker/WorkerServer.java | 49 ++++++-
.../processor/WorkerNettyRequestProcessor.java | 106 +++++++++++++++
.../server/worker/registry/WorkerRegistry.java | 77 +++++++++++
9 files changed, 494 insertions(+), 9 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
index beec055..adfcb1d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concu
rrent.atomic.AtomicLong;
/**
* execute task request command
*/
public class ExecuteTaskRequestCommand implements Serializable {
/**
* task id
*/
private String taskId;
/**
* attempt id
*/
private String attemptId;
/**
* application name
*/
private String applicationName;
/**
* group name
*/
private String groupName;
/**
* task name
*/
private String taskName;
/**
* connector port
*/
private int connectorPort;
/**
* description info
*/
private String description;
/**
* class name
*/
private String className;
/**
* method name
*/
private String methodName;
/**
* parameters
*/
private String params;
/**
* shard itemds
*/
private List<Integer> shardItems;
public List<Integer> getShardItems() {
return shardItems;
}
public void setShardItems(List<
Integer> shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
pub
lic class ExecuteTaskRequestCommand implements Serializable {
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 5733b17..99fbb94 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -38,4 +38,7 @@ public class Constants {
*/
public static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+ public static final String LOCAL_ADDRESS = IPUtils.getFirstNoLoopbackIP4Address();
+
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java
new file mode 100644
index 0000000..2fa82fd
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class IPUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(IPUtils.class);
+
+ private static String IP_REGEX = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}";
+
+ private static String LOCAL_HOST = "unknown";
+
+ static {
+ String host = System.getenv("HOSTNAME");
+ if (isNotEmpty(host)) {
+ LOCAL_HOST = host;
+ } else {
+
+ try {
+ String hostName = InetAddress.getLocalHost().getHostName();
+ if (isNotEmpty(hostName)) {
+ LOCAL_HOST = hostName;
+ }
+ } catch (UnknownHostException e) {
+ logger.error("get hostName error!", e);
+ }
+ }
+ }
+
+ public static String getLocalHost() {
+ return LOCAL_HOST;
+ }
+
+
+ public static String getFirstNoLoopbackIP4Address() {
+ Collection<String> allNoLoopbackIP4Addresses = getNoLoopbackIP4Addresses();
+ if (allNoLoopbackIP4Addresses.isEmpty()) {
+ return null;
+ }
+ return allNoLoopbackIP4Addresses.iterator().next();
+ }
+
+ public static Collection<String> getNoLoopbackIP4Addresses() {
+ Collection<String> noLoopbackIP4Addresses = new ArrayList<>();
+ Collection<InetAddress> allInetAddresses = getAllHostAddress();
+
+ for (InetAddress address : allInetAddresses) {
+ if (!address.isLoopbackAddress() && !address.isSiteLocalAddress()
+ && !Inet6Address.class.isInstance(address)) {
+ noLoopbackIP4Addresses.add(address.getHostAddress());
+ }
+ }
+ if (noLoopbackIP4Addresses.isEmpty()) {
+ for (InetAddress address : allInetAddresses) {
+ if (!address.isLoopbackAddress() && !Inet6Address.class.isInstance(address)) {
+ noLoopbackIP4Addresses.add(address.getHostAddress());
+ }
+ }
+ }
+ return noLoopbackIP4Addresses;
+ }
+
+ public static Collection<InetAddress> getAllHostAddress() {
+ try {
+ Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
+ Collection<InetAddress> addresses = new ArrayList<>();
+
+ while (networkInterfaces.hasMoreElements()) {
+ NetworkInterface networkInterface = networkInterfaces.nextElement();
+ Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ InetAddress inetAddress = inetAddresses.nextElement();
+ addresses.add(inetAddress);
+ }
+ }
+
+ return addresses;
+ } catch (SocketException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static String getIpByHostName(String host) {
+ InetAddress address = null;
+ try {
+ address = InetAddress.getByName(host);
+ } catch (UnknownHostException e) {
+ logger.error("get IP error", e);
+ }
+ if (address == null) {
+ return "";
+ }
+ return address.getHostAddress();
+
+ }
+
+ private static boolean isEmpty(final CharSequence cs) {
+ return cs == null || cs.length() == 0;
+ }
+
+ private static boolean isNotEmpty(final CharSequence cs) {
+ return !isEmpty(cs);
+ }
+
+ public static boolean isIp(String addr) {
+ if (addr.length() < 7 || addr.length() > 15 || "".equals(addr)) {
+ return false;
+ }
+
+ Pattern pat = Pattern.compile(IP_REGEX);
+
+ Matcher mat = pat.matcher(addr);
+
+ boolean ipAddress = mat.find();
+
+ return ipAddress;
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index f5e3121..be83217 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -32,6 +32,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -135,11 +140,16 @@ public class MasterExecThread implements Runnable {
private MasterConfig masterConfig;
/**
+ *
+ */
+ private NettyRemotingClient nettyRemotingClient;
+
+ /**
* constructor of MasterExecThread
* @param processInstance process instance
* @param processService process dao
*/
- public MasterExecThread(ProcessInstance processInstance, ProcessService processService){
+ public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){
this.processService = processService;
this.processInstance = processInstance;
@@ -147,6 +157,22 @@ public class MasterExecThread implements Runnable {
int masterTaskExecNum = masterConfig.getMasterExecTaskNum();
this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
masterTaskExecNum);
+ this.nettyRemotingClient = nettyRemotingClient;
+ }
+
+ //TODO
+ /**端口,默认是123456
+ * 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。
+ */
+ private void sendToWorker(){
+ final Address address = new Address("localhost", 12346);
+ ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand();
+ try {
+ Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000);
+ //结果可能为空,所以不用管,能发过去,就行。
+ } catch (InterruptedException | RemotingException ex) {
+ logger.error(String.format("send command to : %s error", address), ex);
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
index c0ddb1c..6e96164 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
@@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -70,6 +72,11 @@ public class MasterSchedulerThread implements Runnable {
*/
private MasterConfig masterConfig;
+ /**
+ * netty remoting client
+ */
+ private NettyRemotingClient nettyRemotingClient;
+
/**
* constructor of MasterSchedulerThread
@@ -83,6 +90,9 @@ public class MasterSchedulerThread implements Runnable {
this.masterExecThreadNum = masterExecThreadNum;
this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum);
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+ //
+ NettyClientConfig clientConfig = new NettyClientConfig();
+ this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
/**
@@ -123,7 +133,7 @@ public class MasterSchedulerThread implements Runnable {
processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
- masterExecService.execute(new MasterExecThread(processInstance, processService));
+ masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
}
}catch (Exception e){
logger.error("scan command error ", e);
@@ -140,6 +150,7 @@ public class MasterSchedulerThread implements Runnable {
AbstractZKClient.releaseMutex(mutex);
}
}
+ nettyRemotingClient.close();
logger.info("master server stopped...");
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
new file mode 100644
index 0000000..68c19ea
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.registry;
+
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Service
+public class ZookeeperRegistryCenter implements InitializingBean {
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ public static final String NAMESPACE = "/dolphinscheduler";
+
+ public static final String NODES = NAMESPACE + "/nodes";
+
+ public static final String MASTER_PATH = NODES + "/master";
+
+ public static final String WORKER_PATH = NODES + "/worker";
+
+ public static final String EMPTY = "";
+
+ @Autowired
+ protected ZookeeperCachedOperator zookeeperCachedOperator;
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+
+ }
+
+ public void init() {
+ if (isStarted.compareAndSet(false, true)) {
+ //TODO
+// zookeeperCachedOperator.start(NODES);
+ initNodes();
+ }
+ }
+
+ private void initNodes() {
+ zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
+ zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
+ }
+
+ public void close() {
+ if (isStarted.compareAndSet(true, false)) {
+ if (zookeeperCachedOperator != null) {
+ zookeeperCachedOperator.close();
+ }
+ }
+ }
+
+ public String getMasterPath() {
+ return MASTER_PATH;
+ }
+
+ public String getWorkerPath() {
+ return WORKER_PATH;
+ }
+
+ public ZookeeperCachedOperator getZookeeperCachedOperator() {
+ return zookeeperCachedOperator;
+ }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index ace9307..d014f1e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -29,8 +29,14 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.WorkerNettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -112,10 +118,29 @@ public class WorkerServer implements IStoppable {
@Value("${server.is-combined-server:false}")
private Boolean isCombinedServer;
+ /**
+ * worker config
+ */
@Autowired
private WorkerConfig workerConfig;
/**
+ * zookeeper registry center
+ */
+ @Autowired
+ private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+ /**
+ * netty remote server
+ */
+ private NettyRemotingServer nettyRemotingServer;
+
+ /**
+ * worker registry
+ */
+ private WorkerRegistry workerRegistry;
+
+ /**
* spring application context
* only use it for initialization
*/
@@ -141,7 +166,17 @@ public class WorkerServer implements IStoppable {
public void run(){
logger.info("start worker server...");
- zkWorkerClient.init();
+ //init remoting server
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+ this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService));
+ this.nettyRemotingServer.start();
+
+ //worker registry
+ this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
+ this.workerRegistry.registry();
+
+ this.zkWorkerClient.init();
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
@@ -167,10 +202,10 @@ public class WorkerServer implements IStoppable {
killExecutorService.execute(killProcessThread);
// new fetch task thread
- FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
-
- // submit fetch task thread
- fetchTaskExecutorService.execute(fetchTaskThread);
+// FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
+//
+// // submit fetch task thread
+// fetchTaskExecutorService.execute(fetchTaskThread);
/**
* register hooks, which are called before the process exits
@@ -217,6 +252,9 @@ public class WorkerServer implements IStoppable {
logger.warn("thread sleep exception", e);
}
+ this.nettyRemotingServer.close();
+ this.workerRegistry.unRegistry();
+
try {
heartbeatWorkerService.shutdownNow();
}catch (Exception e){
@@ -260,7 +298,6 @@ public class WorkerServer implements IStoppable {
}
}
-
/**
* heartbeat thread implement
*
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
new file mode 100644
index 0000000..c0db034
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.concurrent.ExecutorService;
+
+
+public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(WorkerNettyRequestProcessor.class);
+
+ private final ProcessService processService;
+
+ private final ExecutorService workerExecService;
+
+ private final WorkerConfig workerConfig;
+
+ public WorkerNettyRequestProcessor(ProcessService processService){
+ this.processService = processService;
+ this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
+ this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+ logger.debug("received command : {}", command);
+ TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class);
+ int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
+ Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
+ // verify tenant is null
+ if (verifyTenantIsNull(tenant, taskInstance)) {
+ processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
+ return;
+ }
+ // set queue for process instance, user-specified queue takes precedence over tenant queue
+ String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+ taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
+ taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+ // local execute path
+ String execLocalPath = getExecLocalPath(taskInstance);
+ logger.info("task instance local execute path : {} ", execLocalPath);
+ // init task
+ taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);
+ try {
+ FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode());
+ } catch (Exception ex){
+ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
+ }
+ // submit task
+ workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
+ }
+
+ private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+ if(tenant == null){
+ logger.error("tenant not exists,process instance id : {},task instance id : {}",
+ taskInstance.getProcessInstance().getId(),
+ taskInstance.getId());
+ return true;
+ }
+ return false;
+ }
+
+ private String getExecLocalPath(TaskInstance taskInstance){
+ return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+ taskInstance.getProcessDefine().getId(),
+ taskInstance.getProcessInstance().getId(),
+ taskInstance.getId());
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
new file mode 100644
index 0000000..396b009
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class WorkerRegistry {
+
+ private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
+
+ private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+ private final int port;
+
+ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
+ this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+ this.port = port;
+ }
+
+ public void registry() {
+ String address = Constants.LOCAL_ADDRESS;
+ String localNodePath = getWorkerPath();
+ zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
+ zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if(newState == ConnectionState.LOST){
+ logger.error("worker : {} connection lost from zookeeper", address);
+ } else if(newState == ConnectionState.RECONNECTED){
+ logger.info("worker : {} reconnected to zookeeper", address);
+ zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
+ } else if(newState == ConnectionState.SUSPENDED){
+ logger.warn("worker : {} connection SUSPENDED ", address);
+ }
+ }
+ });
+ logger.info("scheduler node : {} registry to ZK successfully.", address);
+ }
+
+ public void unRegistry() {
+ String address = getLocalAddress();
+ String localNodePath = getWorkerPath();
+ zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
+ logger.info("worker node : {} unRegistry to ZK.", address);
+ }
+
+ private String getWorkerPath() {
+ String address = getLocalAddress();
+ String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
+ return localNodePath;
+ }
+
+ private String getLocalAddress(){
+ return Constants.LOCAL_ADDRESS + ":" + port;
+ }
+}