You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/09/17 02:07:09 UTC
[incubator-dolphinscheduler] branch 1.3.3-release updated:
[Fix-3616][Server] when worker akc/response master exception ,
async retry (#3748)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch 1.3.3-release
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.3-release by this push:
new 29f1123 [Fix-3616][Server] when worker akc/response master exception , async retry (#3748)
29f1123 is described below
commit 29f1123d1d95ddaae7b531d704fb2cc372d0cda2
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Thu Sep 17 10:06:59 2020 +0800
[Fix-3616][Server] when worker akc/response master exception , async retry (#3748)
---
.github/workflows/ci_e2e.yml | 10 +-
.../dolphinscheduler/common/enums/Event.java | 23 +++++
.../remote/command/CommandType.java | 111 ++++++++++++++++++++-
.../remote/command/DBTaskAckCommand.java | 72 +++++++++++++
.../remote/command/DBTaskResponseCommand.java | 71 +++++++++++++
.../server/master/processor/TaskAckProcessor.java | 25 +----
.../master/processor/TaskResponseProcessor.java | 26 +----
.../master/processor/queue/TaskResponseEvent.java | 33 +++++-
.../processor/queue/TaskResponseService.java | 55 +++++++---
.../server/worker/WorkerServer.java | 11 ++
.../server/worker/cache/ResponceCache.java | 94 +++++++++++++++++
.../worker/processor/DBTaskAckProcessor.java | 56 +++++++++++
.../worker/processor/DBTaskResponseProcessor.java | 58 +++++++++++
.../worker/processor/NettyRemoteChannel.java | 8 ++
.../worker/processor/TaskCallbackService.java | 31 +++---
.../worker/processor/TaskExecuteProcessor.java | 10 +-
.../worker/runner/RetryReportTaskStatusThread.java | 92 +++++++++++++++++
.../server/worker/runner/TaskExecuteThread.java | 13 ++-
.../processor/queue/TaskResponseServiceTest.java | 4 +-
19 files changed, 705 insertions(+), 98 deletions(-)
diff --git a/.github/workflows/ci_e2e.yml b/.github/workflows/ci_e2e.yml
index 82c81ef..ebd5364 100644
--- a/.github/workflows/ci_e2e.yml
+++ b/.github/workflows/ci_e2e.yml
@@ -58,7 +58,9 @@ jobs:
wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
sudo dpkg -i google-chrome*.deb
sudo apt-get install -f -y
- wget -N https://chromedriver.storage.googleapis.com/83.0.4103.39/chromedriver_linux64.zip
+ google-chrome -version
+ googleVersion=$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE)
+ wget -N https://chromedriver.storage.googleapis.com/${googleVersion}/chromedriver_linux64.zip
unzip chromedriver_linux64.zip
sudo mv -f chromedriver /usr/local/share/chromedriver
sudo ln -s /usr/local/share/chromedriver /usr/local/bin/chromedriver
@@ -66,9 +68,7 @@ jobs:
run: cd ./e2e && mvn -B clean test
- name: Collect logs
if: failure()
- uses: actions/upload-artifact@v1
+ uses: actions/upload-artifact@v2
with:
name: dslogs
- path: /var/lib/docker/volumes/docker-swarm_dolphinscheduler-logs/_data
-
-
+ path: ${{ github.workspace }}/docker/docker-swarm/dolphinscheduler-logs
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
new file mode 100644
index 0000000..9cec276
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+public enum Event {
+ ACK,
+ RESULT;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index d1ffc65..17c1e44 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1,110 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+public enum CommandType {
+
+ /**
+ * remove task log request,
+ */
+ REMOVE_TAK_LOG_REQUEST,
+
+ /**
+ * remove task log response
+ */
+ REMOVE_TAK_LOG_RESPONSE,
+
+ /**
+ * roll view log request
+ */
+ ROLL_VIEW_LOG_REQUEST,
+
+ /**
+ * roll view log response
+ */
+ ROLL_VIEW_LOG_RESPONSE,
+
+ /**
+ * view whole log request
+ */
+ VIEW_WHOLE_LOG_REQUEST,
+
+ /**
+ * view whole log response
+ */
+ VIEW_WHOLE_LOG_RESPONSE,
+
+ /**
+ * get log bytes request
+ */
+ GET_LOG_BYTES_REQUEST,
+
+ /**
+ * get log bytes response
+ */
+ GET_LOG_BYTES_RESPONSE,
+
+
+ WORKER_REQUEST,
+ MASTER_RESPONSE,
+
+ /**
+ * execute task request
+ */
+ TASK_EXECUTE_REQUEST,
+
+ /**
+ * execute task ack
+ */
+ TASK_EXECUTE_ACK,
+
+ /**
+ * execute task response
+ */
+ TASK_EXECUTE_RESPONSE,
+
+ /**
+ * db task ack
+ */
+ DB_TASK_ACK,
+
+ /**
+ * db task response
+ */
+ DB_TASK_RESPONSE,
+
+ /**
+ * kill task
+ */
+ TASK_KILL_REQUEST,
+
+ /**
+ * kill task response
+ */
+ TASK_KILL_RESPONSE,
+
+ /**
+ * ping
+ */
+ PING,
+
+ /**
+ * pong
+ */
+ PONG;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
new file mode 100644
index 0000000..f37eb97
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * db task ack request command
+ */
+public class DBTaskAckCommand implements Serializable {
+
+ private int taskInstanceId;
+ private int status;
+
+ public DBTaskAckCommand(int status,int taskInstanceId) {
+ this.status = status;
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ public int getTaskInstanceId() {
+ return taskInstanceId;
+ }
+
+ public void setTaskInstanceId(int taskInstanceId) {
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ /**
+ * package response command
+ * @return command
+ */
+ public Command convert2Command(){
+ Command command = new Command();
+ command.setType(CommandType.DB_TASK_ACK);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+
+
+ @Override
+ public String toString() {
+ return "DBTaskAckCommand{" +
+ "taskInstanceId=" + taskInstanceId +
+ ", status=" + status +
+ '}';
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
new file mode 100644
index 0000000..a640298
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * db task final result response command
+ */
+public class DBTaskResponseCommand implements Serializable {
+
+ private int taskInstanceId;
+ private int status;
+
+ public DBTaskResponseCommand(int status,int taskInstanceId) {
+ this.status = status;
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public int getTaskInstanceId() {
+ return taskInstanceId;
+ }
+
+ public void setTaskInstanceId(int taskInstanceId) {
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ /**
+ * package response command
+ * @return command
+ */
+ public Command convert2Command(){
+ Command command = new Command();
+ command.setType(CommandType.DB_TASK_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+
+ @Override
+ public String toString() {
+ return "DBTaskResponseCommand{" +
+ "taskInstanceId=" + taskInstanceId +
+ ", status=" + status +
+ '}';
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 7bc3759..9ca9645 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -34,11 +31,9 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.dolphinscheduler.common.Constants.*;
/**
* task ack processor
@@ -57,16 +52,9 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
-
- /**
- * processService
- */
- private ProcessService processService;
-
public TaskAckProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
- this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**
@@ -92,19 +80,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
- taskAckCommand.getTaskInstanceId());
+ taskAckCommand.getTaskInstanceId(),
+ channel);
taskResponseService.addResponse(taskResponseEvent);
-
- while (Stopper.isRunning()){
- TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
-
- if (taskInstance != null && ackStatus.typeIsRunning()){
- break;
- }
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
- }
-
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 721b146..5982f6c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -33,11 +30,9 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.dolphinscheduler.common.Constants.*;
/**
* task response processor
@@ -56,15 +51,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
- /**
- * processService
- */
- private ProcessService processService;
-
public TaskResponseProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
- this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**
@@ -83,25 +72,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
- ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus());
-
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
- responseCommand.getTaskInstanceId());
-
+ responseCommand.getTaskInstanceId(),
+ channel);
taskResponseService.addResponse(taskResponseEvent);
-
- while (Stopper.isRunning()){
- TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-
- if (taskInstance != null && responseStatus.typeIsFinished()){
- break;
- }
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
- }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 9e8813f..494d95f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import java.util.Date;
@@ -76,7 +78,18 @@ public class TaskResponseEvent {
*/
private Event event;
- public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){
+ /**
+ * channel
+ */
+ private Channel channel;
+
+ public static TaskResponseEvent newAck(ExecutionStatus state,
+ Date startTime,
+ String workerAddress,
+ String executePath,
+ String logPath,
+ int taskInstanceId,
+ Channel channel){
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setStartTime(startTime);
@@ -85,10 +98,16 @@ public class TaskResponseEvent {
event.setLogPath(logPath);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.ACK);
+ event.setChannel(channel);
return event;
}
- public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){
+ public static TaskResponseEvent newResult(ExecutionStatus state,
+ Date endTime,
+ int processId,
+ String appIds,
+ int taskInstanceId,
+ Channel channel){
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
@@ -96,6 +115,7 @@ public class TaskResponseEvent {
event.setAppIds(appIds);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.RESULT);
+ event.setChannel(channel);
return event;
}
@@ -179,8 +199,11 @@ public class TaskResponseEvent {
this.event = event;
}
- public enum Event{
- ACK,
- RESULT;
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public void setChannel(Channel channel) {
+ this.channel = channel;
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index b9772ca..f365db7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -17,7 +17,13 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,23 +127,48 @@ public class TaskResponseService {
* @param taskResponseEvent taskResponseEvent
*/
private void persist(TaskResponseEvent taskResponseEvent){
- TaskResponseEvent.Event event = taskResponseEvent.getEvent();
+ Event event = taskResponseEvent.getEvent();
+ Channel channel = taskResponseEvent.getChannel();
switch (event){
case ACK:
- processService.changeTaskState(taskResponseEvent.getState(),
- taskResponseEvent.getStartTime(),
- taskResponseEvent.getWorkerAddress(),
- taskResponseEvent.getExecutePath(),
- taskResponseEvent.getLogPath(),
- taskResponseEvent.getTaskInstanceId());
+ try {
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
+ if (taskInstance != null){
+ processService.changeTaskState(taskResponseEvent.getState(),
+ taskResponseEvent.getStartTime(),
+ taskResponseEvent.getWorkerAddress(),
+ taskResponseEvent.getExecutePath(),
+ taskResponseEvent.getLogPath(),
+ taskResponseEvent.getTaskInstanceId());
+ }
+ // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
+ DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskAckCommand.convert2Command());
+ }catch (Exception e){
+ logger.error("worker ack master error",e);
+ DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1);
+ channel.writeAndFlush(taskAckCommand.convert2Command());
+ }
break;
case RESULT:
- processService.changeTaskState(taskResponseEvent.getState(),
- taskResponseEvent.getEndTime(),
- taskResponseEvent.getProcessId(),
- taskResponseEvent.getAppIds(),
- taskResponseEvent.getTaskInstanceId());
+ try {
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
+ if (taskInstance != null){
+ processService.changeTaskState(taskResponseEvent.getState(),
+ taskResponseEvent.getEndTime(),
+ taskResponseEvent.getProcessId(),
+ taskResponseEvent.getAppIds(),
+ taskResponseEvent.getTaskInstanceId());
+ }
+ // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
+ DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskResponseCommand.convert2Command());
+ }catch (Exception e){
+ logger.error("worker response master error",e);
+ DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1);
+ channel.writeAndFlush(taskResponseCommand.convert2Command());
+ }
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index f0833cb..6895de3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -22,9 +22,12 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +73,9 @@ public class WorkerServer {
@Autowired
private SpringApplicationContext springApplicationContext;
+ @Autowired
+ private RetryReportTaskStatusThread retryReportTaskStatusThread;
+
/**
* worker server startup
*
@@ -95,11 +101,16 @@ public class WorkerServer {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.start();
// worker registry
this.workerRegistry.registry();
+ // retry report task status
+ this.retryReportTaskStatusThread.start();
+
/**
* register hooks, which are called before the process exits
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
new file mode 100644
index 0000000..3639b8e
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.cache;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.remote.command.Command;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Response cache : caches the commands the worker sends to the master
+ */
+public class ResponceCache {
+
+ private static final ResponceCache instance = new ResponceCache();
+
+ private ResponceCache(){}
+
+ public static ResponceCache get(){
+ return instance;
+ }
+
+ private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
+ private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
+
+
+ /**
+ * cache response
+ * @param taskInstanceId taskInstanceId
+ * @param command command
+ * @param event event ACK/RESULT
+ */
+ public void cache(Integer taskInstanceId, Command command, Event event){
+ switch (event){
+ case ACK:
+ ackCache.put(taskInstanceId,command);
+ break;
+ case RESULT:
+ responseCache.put(taskInstanceId,command);
+ break;
+ default:
+ throw new IllegalArgumentException("invalid event type : " + event);
+ }
+ }
+
+
+ /**
+ * remove ack cache
+ * @param taskInstanceId taskInstanceId
+ */
+ public void removeAckCache(Integer taskInstanceId){
+ ackCache.remove(taskInstanceId);
+ }
+
+ /**
+ * remove response cache
+ * @param taskInstanceId taskInstanceId
+ */
+ public void removeResponseCache(Integer taskInstanceId){
+ responseCache.remove(taskInstanceId);
+ }
+
+ /**
+ * getAckCache
+ * @return getAckCache
+ */
+ public Map<Integer,Command> getAckCache(){
+ return ackCache;
+ }
+
+ /**
+ * getResponseCache
+ * @return getResponseCache
+ */
+ public Map<Integer,Command> getResponseCache(){
+ return responseCache;
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
new file mode 100644
index 0000000..ff0fd8e
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.remote.command.*;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * db task ack processor : handles DB_TASK_ACK commands and drops the cached ack of the task when the status is SUCCESS
+ */
+public class DBTaskAckProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class);
+
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
+
+ DBTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
+ command.getBody(), DBTaskAckCommand.class);
+
+ if (taskAckCommand == null){ // the body did not deserialize to a command: nothing to act on
+ return;
+ }
+
+ if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ // any other status keeps the entry cached
+ ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
+ }
+ }
+
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
new file mode 100644
index 0000000..126defd
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * db task response processor : handles DB_TASK_RESPONSE commands and drops the cached response when the status is SUCCESS
+ */
+public class DBTaskResponseProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class);
+
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
+
+ DBTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize(
+ command.getBody(), DBTaskResponseCommand.class);
+
+ if (taskResponseCommand == null){ // the body did not deserialize to a command: nothing to act on
+ return;
+ }
+
+ if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ // any other status keeps the entry cached
+ ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
+ }
+ }
+
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
index cbb8972..9762b10 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
+import java.util.Random;
+
/**
* callback channel
*/
@@ -50,6 +52,12 @@ public class NettyRemoteChannel {
this.opaque = opaque;
}
+ public NettyRemoteChannel(Channel channel) { // overload for callers that have a channel but no request opaque
+ this.channel = channel;
+ this.host = ChannelUtils.toAddress(channel);
+ this.opaque = -1; // no opaque is passed to this overload, so the field is set to the fixed value -1
+ }
+
public Channel getChannel() {
return channel;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 6a23a9e..ca7d3c6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -86,20 +86,19 @@ public class TaskCallbackService {
* @return callback channel
*/
private NettyRemoteChannel getRemoteChannel(int taskInstanceId){
+ Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
- if(nettyRemoteChannel == null){
- throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first");
- }
- if(nettyRemoteChannel.isActive()){
- return nettyRemoteChannel;
- }
- Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
- if(newChannel != null){
- return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+ if(nettyRemoteChannel != null){
+ if(nettyRemoteChannel.isActive()){
+ return nettyRemoteChannel;
+ }
+ newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
+ if(newChannel != null){
+ return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+ }
+
}
- logger.warn("original master : {} for task : {} is not reachable, random select master",
- nettyRemoteChannel.getHost(),
- taskInstanceId);
+
Set<String> masterNodes = null;
int ntries = 0;
while (Stopper.isRunning()) {
@@ -119,7 +118,7 @@ public class TaskCallbackService {
for (String masterNode : masterNodes) {
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
if (newChannel != null) {
- return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+ return getRemoteChannel(newChannel,taskInstanceId);
}
}
masterNodes = null;
@@ -141,6 +140,12 @@ public class TaskCallbackService {
return remoteChannel;
}
+ private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId){ // variant without an opaque
+ NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel); // single-arg constructor, no opaque
+ addRemoteChannel(taskInstanceId, remoteChannel); // hand the wrapper to addRemoteChannel for this task before returning it
+ return remoteChannel;
+ }
+
/**
* remove callback channels
* @param taskInstanceId taskInstanceId
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 4ca110f..5ecc2c7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@@ -36,6 +37,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -101,12 +103,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
- try {
- this.doAck(taskExecutionContext);
- }catch (Exception e){
- ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
- this.doAck(taskExecutionContext);
- }
+ this.doAck(taskExecutionContext);
// submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
@@ -115,6 +112,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private void doAck(TaskExecutionContext taskExecutionContext){
// tell master that task is in executing
TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
+ ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),ackCommand.convert2Command(),Event.ACK); // cached before sending: RetryReportTaskStatusThread re-sends it until DBTaskAckProcessor removes it
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
new file mode 100644
index 0000000..ea9bb03
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.thread.Stopper;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * Retry Report Task Status Thread
+ */
+@Component
+public class RetryReportTaskStatusThread implements Runnable {
+
+ private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
+
+ /**
+ * every 5 minutes
+ */
+ private static long RETRY_REPORT_TASK_STATUS_TIME = 5 * 60 * 1000L;
+ /**
+ * task callback service
+ */
+ private final TaskCallbackService taskCallbackService;
+
+ public void start(){
+ Thread thread = new Thread(this,"RetryReportTaskStatusThread");
+ thread.start();
+ }
+
+ public RetryReportTaskStatusThread(){
+ this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
+ }
+
+ /**
+ * retry ack/response
+ */
+ @Override
+ public void run() {
+ ResponceCache responceCache = ResponceCache.get();
+
+ while (Stopper.isRunning()){
+ try {
+ if (!responceCache.getAckCache().isEmpty()){
+ Map<Integer,Command> ackCache = responceCache.getAckCache();
+ for (Map.Entry<Integer, Command> entry : ackCache.entrySet()){
+ Integer taskInstanceId = entry.getKey();
+ Command ackCommand = entry.getValue();
+ taskCallbackService.sendAck(taskInstanceId,ackCommand);
+ }
+ }
+
+ if (!responceCache.getResponseCache().isEmpty()){
+ Map<Integer,Command> responseCache = responceCache.getResponseCache();
+ for (Map.Entry<Integer, Command> entry : responseCache.entrySet()){
+ Integer taskInstanceId = entry.getKey();
+ Command responseCommand = entry.getValue();
+ taskCallbackService.sendAck(taskInstanceId,responseCommand);
+ }
+ }
+ }catch (Exception e){
+ logger.warn("retry report task status error", e);
+ }
+
+ ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_TIME);
+ }
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 592060b..e224896 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
@@ -144,13 +146,10 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
} finally {
- try {
- taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
- }catch (Exception e){
- ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
- taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
- }
+ taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT);
+ taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index dcba832..a22663e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -44,7 +44,7 @@ public class TaskResponseServiceTest {
@Test
public void testAdd(){
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
- "", "", "", 1);
+ "", "", "", 1,null);
taskResponseService.addResponse(taskResponseEvent);
Assert.assertTrue(taskResponseService.getEventQueue().size() == 1);
try {
@@ -58,7 +58,7 @@ public class TaskResponseServiceTest {
@Test
public void testStop(){
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
- "", "", "", 1);
+ "", "", "", 1,null);
taskResponseService.addResponse(taskResponseEvent);
taskResponseService.stop();
Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);