You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/02 14:15:16 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
refactor kill logic (#2060)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 71c11bd refactor kill logic (#2060)
71c11bd is described below
commit 71c11bd59474787da9ad49d2a30c312afad41440
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Mar 2 22:15:08 2020 +0800
refactor kill logic (#2060)
---
.../remote/command/CommandType.java | 2 +-
...kAckCommand.java => TaskExecuteAckCommand.java} | 2 +-
...Command.java => TaskExecuteRequestCommand.java} | 2 +-
...ommand.java => TaskExecuteResponseCommand.java} | 2 +-
...estCommand.java => TaskKillRequestCommand.java} | 2 +-
...seCommand.java => TaskKillResponseCommand.java} | 2 +-
.../server/master/MasterServer.java | 6 +-
.../master/cache/TaskInstanceCacheManager.java | 10 +-
.../cache/impl/TaskInstanceCacheManagerImpl.java | 16 +--
.../master/dispatch/executor/ExecutorManager.java | 8 ++
.../dispatch/executor/NettyExecutorManager.java | 20 +++-
.../master/dispatch/executor/NettyKillManager.java | 119 ---------------------
.../server/master/processor/TaskAckProcessor.java | 8 +-
.../processor/TaskKillResponseProcessor.java | 13 +--
.../master/processor/TaskResponseProcessor.java | 6 +-
.../server/master/runner/MasterTaskExecThread.java | 8 +-
.../server/worker/WorkerServer.java | 4 +-
.../worker/processor/KillTaskCallbackService.java | 116 --------------------
.../worker/processor/TaskCallbackService.java | 17 +--
.../worker/processor/TaskExecuteProcessor.java | 22 ++--
.../server/worker/processor/TaskKillProcessor.java | 44 ++++----
.../server/worker/runner/TaskExecuteThread.java | 6 +-
22 files changed, 107 insertions(+), 328 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 053b38f..c8d5659 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
EXECUTE_TASK_REQUEST,
/**
* execute task ack
*/
EXECUTE_TASK_ACK,
/**
* execute task response
*/
EXECUTE_TASK_RESPONSE,
/**
* kill task
*/
KILL_TASK_REQUEST,
/**
* kill task response
*/
KILL_TASK_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
similarity index 92%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
index 52d0a5d..0b3d901 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
request command
*/
public class ExecuteTaskAckCommand implements Serializable {
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* startTime
*/
private Date startTime;
/**
* host
*/
private String host;
/**
* status
*/
private int status;
/**
* logPath
*/
private String logPath;
/**
* executePath
*/
private String executePath;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstan
ceId) {
this.taskInstanceId = taskInstanceId;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_ACK);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskAckCommand{" +
"taskInstanceId=" + taskInstanceId +
", startTime=" + startTime +
", host='" + host + '\'' +
", status=" + status +
"
, logPath='" + logPath + '\'' +
", executePath='" + executePath + '\'' +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
request command
*/
public class TaskExecuteAckCommand implements Serializable {
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* startTime
*/
private Date startTime;
/**
* host
*/
private String host;
/**
* status
*/
private int status;
/**
* logPath
*/
private String logPath;
/**
* executePath
*/
private String executePath;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstan
ceId) {
this.taskInstanceId = taskInstanceId;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_ACK);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskExecuteAckCommand{" +
"taskInstanceId=" + taskInstanceId +
", startTime=" + startTime +
", host='" + host + '\'' +
", status=" + status +
"
, logPath='" + logPath + '\'' +
", executePath='" + executePath + '\'' +
'}';
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
similarity index 82%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
index e7564ed..637724f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
pub
lic class ExecuteTaskRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
public ExecuteTaskRequestCommand() {
}
public ExecuteTaskRequestCommand(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskRequestCommand{" +
"taskExecutionContext='" + taskExecutionContext + '\'' +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
pub
lic class TaskExecuteRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
public TaskExecuteRequestCommand() {
}
public TaskExecuteRequestCommand(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskExecuteRequestCommand{" +
"taskExecutionContext='" + taskExecutionContext + '\'' +
'}';
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
similarity index 87%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index 707bf07..deb6f5d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
/**
* processId
*/
private int processId;
/**
* appIds
*/
private String appIds;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
t
his.endTime = endTime;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
/**
* package response command
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskResponseCommand{" +
"taskInstanceId=" + taskInstanceId +
", status=" + status +
", endTime=" + endTime +
", processId=" + processId +
", appIds='" + appIds + '\'' +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
response command
*/
public class TaskExecuteResponseCommand implements Serializable {
public TaskExecuteResponseCommand() {
}
public TaskExecuteResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
/**
* processId
*/
private int processId;
/**
* appIds
*/
private String appIds;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
t
his.endTime = endTime;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
/**
* package response command
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskExecuteResponseCommand{" +
"taskInstanceId=" + taskInstanceId +
", status=" + status +
", endTime=" + endTime +
", processId=" + processId +
", appIds='" + appIds + '\'' +
'}';
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
similarity index 83%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
index b8cfd89..e5c756a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* kill task request command
*/
public
class KillTaskRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
public KillTaskRequestCommand() {
}
public KillTaskRequestCommand(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.KILL_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "KillTaskRequestCommand{" +
"taskExecutio
nContext='" + taskExecutionContext + '\'' +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* kill task request command
*/
public
class TaskKillRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
public TaskKillRequestCommand() {
}
public TaskKillRequestCommand(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.TASK_KILL_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskKillRequestCommand{" +
"taskExecutio
nContext='" + taskExecutionContext + '\'' +
'}';
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
similarity index 93%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
index 515bd07..2ca2330 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* kill task response command
*/
public class KillTaskResponseCommand implements Serializable {
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* host
*/
private String host;
/**
* status
*/
private int status;
/**
* processId
*/
private int processId;
/**
* other resource manager appId , for example : YARN etc
*/
protected List<String> appIds;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getProcessId() {
return processId;
}
public void setPr
ocessId(int processId) {
this.processId = processId;
}
public List<String> getAppIds() {
return appIds;
}
public void setAppIds(List<String> appIds) {
this.appIds = appIds;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.KILL_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "KillTaskResponseCommand{" +
"taskInstanceId=" + taskInstanceId +
", host='" + host + '\'' +
", status=" + status +
", processId=" + processId +
", appIds=" + appIds +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* kill task response command
*/
public class TaskKillResponseCommand implements Serializable {
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* host
*/
private String host;
/**
* status
*/
private int status;
/**
* processId
*/
private int processId;
/**
* other resource manager appId , for example : YARN etc
*/
protected List<String> appIds;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getProcessId() {
return processId;
}
public void setPr
ocessId(int processId) {
this.processId = processId;
}
public List<String> getAppIds() {
return appIds;
}
public void setAppIds(List<String> appIds) {
this.appIds = appIds;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.TASK_KILL_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskKillResponseCommand{" +
"taskInstanceId=" + taskInstanceId +
", host='" + host + '\'' +
", status=" + status +
", processId=" + processId +
", appIds=" + appIds +
'}';
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 4c0c3e8..9c46ad6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -126,9 +126,9 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(45678);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
//
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
index a62ee49..031d8b2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
@@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
@@ -47,14 +47,14 @@ public interface TaskInstanceCacheManager {
*
* @param taskAckCommand taskAckCommand
*/
- void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand);
+ void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand);
/**
* cache taskInstance
*
- * @param executeTaskResponseCommand executeTaskResponseCommand
+ * @param taskExecuteResponseCommand taskExecuteResponseCommand
*/
- void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand);
+ void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand);
/**
* remove taskInstance by taskInstanceId
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index dc775d8..c149ac3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -18,8 +18,8 @@ package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -86,7 +86,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
* @param taskAckCommand taskAckCommand
*/
@Override
- public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) {
+ public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
taskInstance.setStartTime(taskAckCommand.getStartTime());
@@ -99,13 +99,13 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
/**
* cache taskInstance
*
- * @param executeTaskResponseCommand executeTaskResponseCommand
+ * @param taskExecuteResponseCommand taskExecuteResponseCommand
*/
@Override
- public void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand) {
- TaskInstance taskInstance = getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId());
- taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus()));
- taskInstance.setEndTime(executeTaskResponseCommand.getEndTime());
+ public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) {
+ TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());
+ taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
+ taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
}
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index f1707df..c8cd9d1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -42,6 +42,14 @@ public interface ExecutorManager<T> {
T execute(ExecutionContext context) throws ExecuteException;
/**
+ * execute task directly without retry
+ * @param context context
+ * @return T
+ * @throws ExecuteException
+ */
+ void executeDirectly(ExecutionContext context) throws ExecuteException;
+
+ /**
* after execute
* @param context context
* @throws ExecuteException
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 544a958..286b0e6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -21,7 +21,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
@@ -68,8 +69,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
*/
- this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
- this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
}
@@ -128,13 +130,19 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
return success;
}
+ public void executeDirectly(ExecutionContext context) throws ExecuteException {
+ Command command = buildCommand(context);
+ Host host = context.getHost();
+ doExecute(host,command);
+ }
+
/**
* build command
* @param context context
* @return command
*/
private Command buildCommand(ExecutionContext context) {
- ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
+ TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
@@ -203,4 +211,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
}
return nodes;
}
+
+ public NettyRemotingClient getNettyRemotingClient() {
+ return nettyRemotingClient;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java
deleted file mode 100644
index 54d0022..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.dispatch.executor;
-
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * netty executor manager
- */
-@Service
-public class NettyKillManager extends AbstractExecutorManager<Boolean>{
-
- private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class);
- /**
- * netty remote client
- */
- private final NettyRemotingClient nettyRemotingClient;
-
- public NettyKillManager(){
- final NettyClientConfig clientConfig = new NettyClientConfig();
- this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
- /**
- * register KILL_TASK_RESPONSE command type TaskKillResponseProcessor
- */
- this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
- }
-
- /**
- * execute logic
- *
- * @param context context
- * @return result
- * @throws ExecuteException
- */
- @Override
- public Boolean execute(ExecutionContext context) throws ExecuteException {
- Host host = context.getHost();
- Command command = buildCommand(context);
- try {
- doExecute(host, command);
- return true;
- }catch (ExecuteException ex) {
- logger.error(String.format("execute context : %s error", context.getContext()), ex);
- return false;
- }
- }
-
-
- private Command buildCommand(ExecutionContext context) {
- KillTaskRequestCommand requestCommand = new KillTaskRequestCommand();
- TaskExecutionContext taskExecutionContext = context.getContext();
-
- requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
- return requestCommand.convert2Command();
- }
-
- /**
- * execute logic
- * @param host host
- * @param command command
- * @throws ExecuteException
- */
- private void doExecute(final Host host, final Command command) throws ExecuteException {
- /**
- * retry count,default retry 3
- */
- int retryCount = 3;
- boolean success = false;
- do {
- try {
- nettyRemotingClient.send(host, command);
- success = true;
- } catch (Exception ex) {
- logger.error(String.format("send command : %s to %s error", command, host), ex);
- retryCount--;
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignore) {}
- }
- } while (retryCount >= 0 && !success);
-
- if (!success) {
- throw new ExecuteException(String.format("send command : %s to %s error", command, host));
- }
- }
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index cf38579..ef2cb67 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@@ -57,12 +57,12 @@ public class TaskAckProcessor implements NettyRequestProcessor {
/**
* task ack process
* @param channel channel channel
- * @param command command ExecuteTaskAckCommand
+ * @param command command TaskExecuteAckCommand
*/
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
- ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
+ Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
+ TaskExecuteAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);
logger.info("taskAckCommand : {}", taskAckCommand);
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index d6c3f69..4986e89 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -18,19 +18,12 @@
package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,9 +43,9 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
+ Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
- KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class);
+ TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
logger.info("received command : {}", responseCommand);
logger.info("已经接受到了worker杀任务的回应");
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index ed76153..93ca4ab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@@ -63,9 +63,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
+ Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
- ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
+ TaskExecuteResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", responseCommand);
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 07f9168..995a4e8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
private TaskInstanceCacheManager taskInstanceCacheManager;
- private NettyKillManager nettyKillManager;
+ private NettyExecutorManager nettyExecutorManager;
/**
* constructor of MasterTaskExecThread
@@ -67,7 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
super(taskInstance, processInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
- this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class);
+ this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
}
/**
@@ -191,7 +191,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
host.setPort(12346);
executionContext.setHost(host);
- nettyKillManager.execute(executionContext);
+ nettyExecutorManager.executeDirectly(executionContext);
logger.info("master add kill task :{} id:{} to kill queue",
taskInstance.getName(), taskInstance.getId() );
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index ec43dd8..cfb94b5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -127,8 +127,8 @@ public class WorkerServer implements IStoppable {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start();
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java
deleted file mode 100644
index 65342bd..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.processor;
-
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * taks callback service
- */
-public class KillTaskCallbackService {
-
- private final Logger logger = LoggerFactory.getLogger(KillTaskCallbackService.class);
-
- /**
- * remote channels
- */
- private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
-
- /**
- * netty remoting client
- */
- private final NettyRemotingClient nettyRemotingClient;
-
-
- public KillTaskCallbackService(){
- final NettyClientConfig clientConfig = new NettyClientConfig();
- this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
- }
-
- /**
- * add callback channel
- * @param taskInstanceId taskInstanceId
- * @param channel channel
- */
- public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){
- REMOTE_CHANNELS.put(taskInstanceId, channel);
- }
-
- /**
- * get callback channel
- * @param taskInstanceId taskInstanceId
- * @return callback channel
- */
- public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
- NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
- if(nettyRemoteChannel.isActive()){
- return nettyRemoteChannel;
- }
- Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
- if(newChannel != null){
- NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
- addRemoteChannel(taskInstanceId, remoteChannel);
- return remoteChannel;
- }
- return null;
- }
-
- /**
- * remove callback channels
- * @param taskInstanceId taskInstanceId
- */
- public void remove(int taskInstanceId){
- REMOTE_CHANNELS.remove(taskInstanceId);
- }
-
- /**
- * send result
- *
- * @param taskInstanceId taskInstanceId
- * @param killTaskResponseCommand killTaskResponseCommand
- */
- public void sendKillResult(int taskInstanceId, KillTaskResponseCommand killTaskResponseCommand){
- NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
- if(nettyRemoteChannel == null){
- //TODO
- } else{
- nettyRemoteChannel.writeAndFlush(killTaskResponseCommand.convert2Command()).addListener(new ChannelFutureListener(){
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if(future.isSuccess()){
- remove(taskInstanceId);
- return;
- }
- }
- });
- }
- }
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 23ac7e2..02d889b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -22,17 +22,18 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
/**
* taks callback service
*/
+@Service
public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
@@ -92,14 +93,14 @@ public class TaskCallbackService {
/**
* send ack
* @param taskInstanceId taskInstanceId
- * @param ackCommand ackCommand
+ * @param command command
*/
- public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
+ public void sendAck(int taskInstanceId, Command command){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
//TODO
} else{
- nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command());
+ nettyRemoteChannel.writeAndFlush(command);
}
}
@@ -107,14 +108,14 @@ public class TaskCallbackService {
* send result
*
* @param taskInstanceId taskInstanceId
- * @param responseCommand responseCommand
+ * @param command command
*/
- public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
+ public void sendResult(int taskInstanceId, Command command){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
//TODO
} else{
- nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
+ nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 98e4e92..b53ef17 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@@ -69,18 +69,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final TaskCallbackService taskCallbackService;
public TaskExecuteProcessor(){
- this.taskCallbackService = new TaskCallbackService();
+ this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
}
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(),
+ Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
logger.info("received command : {}", command);
- ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
- command.getBody(), ExecuteTaskRequestCommand.class);
+ TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
+ command.getBody(), TaskExecuteRequestCommand.class);
String contextJson = taskRequestCommand.getTaskExecutionContext();
@@ -105,8 +105,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private void doAck(TaskExecutionContext taskExecutionContext){
// tell master that task is in executing
- ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
- taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+ TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
+ taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}
/**
@@ -134,10 +134,10 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/**
* build ack command
* @param taskExecutionContext taskExecutionContext
- * @return ExecuteTaskAckCommand
+ * @return TaskExecuteAckCommand
*/
- private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
- ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
+ private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
+ TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index c910aed..c23f199 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@@ -58,7 +58,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/**
* task callback service
*/
- private final KillTaskCallbackService killTaskCallbackService;
+ private final TaskCallbackService taskCallbackService;
/**
* taskExecutionContextCacheManager
@@ -72,7 +72,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
public TaskKillProcessor(){
- this.killTaskCallbackService = new KillTaskCallbackService();
+ this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
@@ -115,41 +115,41 @@ public class TaskKillProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
- KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
- logger.info("received command : {}", killTaskRequestCommand);
+ Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+ TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
+ logger.info("received command : {}", taskKillRequestCommand);
- String contextJson = killTaskRequestCommand.getTaskExecutionContext();
+ String contextJson = taskKillRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
Boolean killStatus = doKill(taskExecutionContext);
- killTaskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+ taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
- KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
- killTaskCallbackService.sendKillResult(killTaskResponseCommand.getTaskInstanceId(),killTaskResponseCommand);
+ TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
+ taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
}
/**
- * build KillTaskResponseCommand
+ * build TaskKillResponseCommand
*
* @param taskExecutionContext taskExecutionContext
* @param killStatus killStatus
- * @return build KillTaskResponseCommand
+ * @return build TaskKillResponseCommand
*/
- private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
+ private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
Boolean killStatus) {
- KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand();
- killTaskResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- killTaskResponseCommand.setHost(taskExecutionContext.getHost());
- killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
- killTaskResponseCommand.setProcessId(taskExecutionContext.getProcessId());
- killTaskResponseCommand.setAppIds(appIds);
-
- return killTaskResponseCommand;
+ TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
+ taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskKillResponseCommand.setHost(taskExecutionContext.getHost());
+ taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
+ taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+ taskKillResponseCommand.setAppIds(appIds);
+
+ return taskKillResponseCommand;
}
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index d3161ec..347dfb6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@@ -75,7 +75,7 @@ public class TaskExecuteThread implements Runnable {
@Override
public void run() {
- ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
+ TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// task node
@@ -134,7 +134,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
} finally {
- taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
+ taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
}