You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2021/12/25 04:26:50 UTC

[dolphinscheduler] branch dev updated: to #7609 (#7611)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0d861fe  to #7609 (#7611)
0d861fe is described below

commit 0d861fe46af595156342b56ef2edf3a2f24a0e4c
Author: zwZjut <zw...@163.com>
AuthorDate: Sat Dec 25 12:26:43 2021 +0800

    to #7609 (#7611)
    
    Co-authored-by: honghuo.zw <ho...@alibaba-inc.com>
---
 .../server/worker/processor/DBTaskResponseProcessor.java       | 10 +++++++---
 .../server/worker/processor/TaskCallbackService.java           |  9 ++++-----
 .../server/worker/processor/TaskKillProcessor.java             |  2 ++
 3 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index 97a9cf5..9c0815f 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
-import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -25,11 +24,14 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+import io.netty.channel.Channel;
+
 /**
  *  db task response processor
  */
@@ -45,12 +47,14 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor {
         DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject(
                 command.getBody(), DBTaskResponseCommand.class);
 
-        if (taskResponseCommand == null){
+        if (taskResponseCommand == null) {
             return;
         }
 
-        if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
+        if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
             ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
+            TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
+            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId());
         }
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 96ec36b..09b2c3a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
 
 import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,7 +124,7 @@ public class TaskCallbackService {
      *
      * @param taskInstanceId taskInstanceId
      */
-    public void remove(int taskInstanceId) {
+    public static void remove(int taskInstanceId) {
         REMOTE_CHANNELS.remove(taskInstanceId);
     }
 
@@ -156,7 +155,7 @@ public class TaskCallbackService {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
                     if (future.isSuccess()) {
-                        remove(taskInstanceId);
+                        // remove(taskInstanceId);
                         return;
                     }
                 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 7a4bf89..cde1b57 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -100,6 +100,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
         taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
         TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
+        TaskCallbackService.remove(killCommand.getTaskInstanceId());
+        logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
     }
 
     /**