You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2021/12/25 04:26:50 UTC
[dolphinscheduler] branch dev updated: to #7609 (#7611)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0d861fe to #7609 (#7611)
0d861fe is described below
commit 0d861fe46af595156342b56ef2edf3a2f24a0e4c
Author: zwZjut <zw...@163.com>
AuthorDate: Sat Dec 25 12:26:43 2021 +0800
to #7609 (#7611)
Co-authored-by: honghuo.zw <ho...@alibaba-inc.com>
---
.../server/worker/processor/DBTaskResponseProcessor.java | 10 +++++++---
.../server/worker/processor/TaskCallbackService.java | 9 ++++-----
.../server/worker/processor/TaskKillProcessor.java | 2 ++
3 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index 97a9cf5..9c0815f 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
-import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
@@ -25,11 +24,14 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+
/**
* db task response processor
*/
@@ -45,12 +47,14 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor {
DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject(
command.getBody(), DBTaskResponseCommand.class);
- if (taskResponseCommand == null){
+ if (taskResponseCommand == null) {
return;
}
- if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
+ if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
+ TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
+ logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId());
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 96ec36b..09b2c3a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,7 +124,7 @@ public class TaskCallbackService {
*
* @param taskInstanceId taskInstanceId
*/
- public void remove(int taskInstanceId) {
+ public static void remove(int taskInstanceId) {
REMOTE_CHANNELS.remove(taskInstanceId);
}
@@ -156,7 +155,7 @@ public class TaskCallbackService {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
- remove(taskInstanceId);
+ // remove(taskInstanceId);
return;
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 7a4bf89..cde1b57 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -100,6 +100,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
+ TaskCallbackService.remove(killCommand.getTaskInstanceId());
+ logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
}
/**