You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/19 13:57:22 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
Refactor worker (#1979)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new ac93d78 Refactor worker (#1979)
ac93d78 is described below
commit ac93d783796e4819ac6a46b1bab7efa272e79b47
Author: Tboy <gu...@immomo.com>
AuthorDate: Wed Feb 19 21:57:16 2020 +0800
Refactor worker (#1979)
* updates
* move FetchTaskThread logic to WorkerNettyRequestProcessor
* add NettyRemotingClient to scheduler thread
* refactor TaskScheduleThread, add TaskInstanceCallbackService
---
.../remote/command/CommandType.java | 2 +-
.../remote/command/ExecuteTaskAckCommand.java | 1 +
.../remote/command/ExecuteTaskResponseCommand.java | 2 +-
.../apache/dolphinscheduler/remote/utils/Pair.java | 4 ++
.../server/worker/processor/CallbackChannel.java | 38 +++++------
.../processor/TaskInstanceCallbackService.java | 76 ++++++++++++++++++++++
.../processor/WorkerNettyRequestProcessor.java | 8 ++-
.../server/worker/runner/TaskScheduleThread.java | 73 +++++++++++----------
8 files changed, 146 insertions(+), 58 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index b1b24d3..79ef2d9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
EXECUTE_TASK_REQUEST,
/**
* execute task response
*/
EXECUTE_TASK_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
/**
* command types; a value is assigned to a Command through Command#setType
*/
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
/**
* worker request
* NOTE(review): no usage is visible in this change - confirm the intended semantics
*/
WORKER_REQUEST,
/**
* master response
* NOTE(review): no usage is visible in this change - confirm the intended semantics
*/
MASTER_RESPONSE,
/**
* execute task request
*/
EXECUTE_TASK_REQUEST,
/**
* execute task ack
*/
EXECUTE_TASK_ACK,
/**
* execute task response
*/
EXECUTE_TASK_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
new file mode 100644
index 0000000..24ab68f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
request command
*/
public class ExecuteTaskAckCommand implements Serializable {
private int taskInstanceId;
private Date startTime;
private String host;
private int status;
private String logPath;
private String executePath;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = lo
gPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.EXECUTE_TASK_ACK);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
index 7e35fa6..3e6d5c1 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong
;
/**
* execute task response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
/**
* task id
*/
private String taskId;
/**
* attempt id
*/
private String attemptId;
/**
* return result
*/
private Object result;
/**
* received time
*/
private long receivedTime;
/**
* execute count
*/
private int executeCount;
/**
* execute time
*/
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(long opaque){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public Command convert2Command(){
Command command = new Command();
com
mand.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
index 2042191..33bf8ca 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
@@ -50,4 +50,8 @@ public class Pair<L, R> {
public void setRight(R right) {
this.right = right;
}
+
+    /**
+     * create a pair holding the given values
+     *
+     * @param left left value
+     * @param right right value
+     * @param <L> L generic type
+     * @param <R> R generic type
+     * @return new pair of left and right
+     */
+    public static <L, R> Pair<L, R> of(L left, R right){
+        return new Pair<>(left, right);
+    }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
similarity index 60%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
copy to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
index 2042191..95345c0 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
@@ -15,39 +15,35 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.remote.utils;
+package org.apache.dolphinscheduler.server.worker.processor;
+import io.netty.channel.Channel;
-/**
- * key value pair
- *
- * @param <L> L generic type
- * @param <R> R generic type
- */
-public class Pair<L, R> {
- private L left;
+public class CallbackChannel {
+
+ private Channel channel;
- private R right;
+ private long opaque;
- public Pair(L left, R right) {
- this.left = left;
- this.right = right;
+ public CallbackChannel(Channel channel, long opaque) {
+ this.channel = channel;
+ this.opaque = opaque;
}
- public L getLeft() {
- return left;
+ public Channel getChannel() {
+ return channel;
}
- public void setLeft(L left) {
- this.left = left;
+ public void setChannel(Channel channel) {
+ this.channel = channel;
}
- public R getRight() {
- return right;
+ public long getOpaque() {
+ return opaque;
}
- public void setRight(R right) {
- this.right = right;
+ public void setOpaque(long opaque) {
+ this.opaque = opaque;
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java
new file mode 100644
index 0000000..0480d94
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Keeps the callback channel of each task instance and uses it to send
+ * the ack command and the result command of that task instance.
+ */
+public class TaskInstanceCallbackService {
+
+    /**
+     * callback channels, keyed by task instance id
+     * (static, so the entries are shared by every instance of this service)
+     */
+    private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
+
+    /**
+     * register the callback channel of a task instance, replacing any previous entry
+     *
+     * @param taskInstanceId task instance id
+     * @param channel callback channel
+     */
+    public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){
+        CALL_BACK_CHANNELS.put(taskInstanceId, channel);
+    }
+
+    /**
+     * get the callback channel of a task instance; when its channel is missing or
+     * no longer active, a new channel is created and stored in the same entry
+     *
+     * @param taskInstanceId task instance id
+     * @return callback channel whose channel is non-null
+     * @throws IllegalStateException if no callback channel is registered for the task instance,
+     *                               or a replacement channel could not be created
+     */
+    public CallbackChannel getCallbackChannel(int taskInstanceId){
+        CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId);
+        if(callbackChannel == null){
+            // fail with a clear message instead of a NullPointerException below
+            throw new IllegalStateException(String.format("no callback channel registered for task instance id : %s", taskInstanceId));
+        }
+        Channel channel = callbackChannel.getChannel();
+        if(channel != null && channel.isActive()){
+            return callbackChannel;
+        }
+        Channel newChannel = createChannel();
+        if(newChannel == null){
+            // never store a null channel: it would only move the failure to the next write
+            throw new IllegalStateException(String.format("callback channel of task instance id : %s is inactive and could not be re-created", taskInstanceId));
+        }
+        // the registered instance is updated in place, so the map needs no re-put
+        callbackChannel.setChannel(newChannel);
+        return callbackChannel;
+    }
+
+    /**
+     * remove the callback channel of a task instance
+     *
+     * @param taskInstanceId task instance id
+     */
+    public void remove(int taskInstanceId){
+        CALL_BACK_CHANNELS.remove(taskInstanceId);
+    }
+
+    /**
+     * send the ack command of a task instance through its callback channel,
+     * using the opaque stored with that channel
+     *
+     * @param taskInstanceId task instance id
+     * @param ackCommand ack command
+     */
+    public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
+        CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
+        callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
+    }
+
+    /**
+     * send the result command of a task instance through its callback channel;
+     * the callback channel is removed once the write has succeeded
+     *
+     * @param taskInstanceId task instance id
+     * @param responseCommand response command
+     */
+    public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
+        CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
+        callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                // the entry is kept when the write fails
+                // NOTE(review): no retry is visible here - confirm how failed results are re-sent
+                if(future.isSuccess()){
+                    remove(taskInstanceId);
+                }
+            }
+        });
+    }
+
+    /**
+     * create a new channel
+     * TODO not implemented yet, always returns null
+     *
+     * @return new channel, currently always null
+     */
+    private Channel createChannel(){
+        return null;
+    }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
index c0db034..2e5ea99 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
@@ -51,8 +51,11 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
private final WorkerConfig workerConfig;
+ private final TaskInstanceCallbackService taskInstanceCallbackService;
+
public WorkerNettyRequestProcessor(ProcessService processService){
this.processService = processService;
+ this.taskInstanceCallbackService = new TaskInstanceCallbackService();
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
}
@@ -62,6 +65,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
logger.debug("received command : {}", command);
TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class);
+ //TODO remove the tenant/queue lookup that starts here and move it into the master.
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
@@ -73,6 +77,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+ //TODO end of the section that should be moved into the master.
// local execute path
String execLocalPath = getExecLocalPath(taskInstance);
logger.info("task instance local execute path : {} ", execLocalPath);
@@ -84,7 +89,8 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
// submit task
- workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
+ taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque()));
+ workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService));
}
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index a69cffd..96cb0c2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -24,17 +24,21 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.server.worker.processor.TaskInstanceCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@@ -74,22 +78,31 @@ public class TaskScheduleThread implements Runnable {
private AbstractTask task;
/**
+ * task instance callback service
+ */
+ private TaskInstanceCallbackService taskInstanceCallbackService;
+
+ /**
* constructor
*
* @param taskInstance task instance
* @param processService process dao
*/
- public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService){
+ public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskInstanceCallbackService taskInstanceCallbackService){
this.processService = processService;
this.taskInstance = taskInstance;
+ this.taskInstanceCallbackService = taskInstanceCallbackService;
}
@Override
public void run() {
+ ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
+
try {
- // update task state is running according to task type
- updateTaskState(taskInstance.getTaskType());
+ // tell master that task is in executing
+ ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType());
+ taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
logger.info("script path : {}", taskInstance.getExecutePath());
// task node
@@ -148,22 +161,21 @@ public class TaskScheduleThread implements Runnable {
// task result process
task.after();
+ // record the task's exit status and end time in the response command
+ responseCommand.setStatus(task.getExitStatus().getCode());
+ responseCommand.setEndTime(new Date());
+ logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus());
+
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
- // update task instance state
- processService.changeTaskState(ExecutionStatus.FAILURE,
- new Date(),
- taskInstance.getId());
+ responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
+ responseCommand.setEndTime(new Date());
+
+ } finally {
+ taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
}
- logger.info("task instance id : {},task final status : {}",
- taskInstance.getId(),
- task.getExitStatus());
- // update task instance state
- processService.changeTaskState(task.getExitStatus(),
- new Date(),
- taskInstance.getId());
}
/**
@@ -182,29 +194,22 @@ public class TaskScheduleThread implements Runnable {
}
return globalParamsMap;
}
-
/**
- * update task state according to task type
+ * build ack command
* @param taskType
*/
- private void updateTaskState(String taskType) {
- // update task status is running
- if(taskType.equals(TaskType.SQL.name()) ||
- taskType.equals(TaskType.PROCEDURE.name())){
- processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- null,
- getTaskLogPath(),
- taskInstance.getId());
+ private ExecuteTaskAckCommand buildAckCommand(String taskType) {
+ ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
+ ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
+ ackCommand.setLogPath(getTaskLogPath());
+ ackCommand.setHost("localhost");
+ ackCommand.setStartTime(new Date());
+ if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
+ ackCommand.setExecutePath(null);
}else{
- processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- taskInstance.getExecutePath(),
- getTaskLogPath(),
- taskInstance.getId());
+ ackCommand.setExecutePath(taskInstance.getExecutePath());
}
+ return ackCommand;
}
/**