You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/01 15:26:17 UTC

[dolphinscheduler] 01/02: [DS-6640][WorkerServer] support process update host command type (#6642)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.0-release-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit cd173ac140f232a8ad0dabbaea9fefc93ae1b08b
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Sun Oct 31 21:10:51 2021 +0800

    [DS-6640][WorkerServer] support process update host command type (#6642)
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../java/org/apache/dolphinscheduler/remote/command/CommandType.java    | 2 +-
 .../org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java   | 2 +-
 .../dolphinscheduler/remote/command/HostUpdateResponseCommand.java      | 2 +-
 .../java/org/apache/dolphinscheduler/server/worker/WorkerServer.java    | 2 ++
 .../dolphinscheduler/server/worker/processor/HostUpdateProcessor.java   | 2 +-
 5 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 4301910..786d10c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -126,7 +126,7 @@ public enum CommandType {
     /**
      * process host update
      */
-    PROCESS_HOST_UPDATE_REQUST,
+    PROCESS_HOST_UPDATE_REQUEST,
 
     /**
      * process host update response
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
index d70124b..4fc752e 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
@@ -56,7 +56,7 @@ public class HostUpdateCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
+        command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
index ddf4fc2..b44856c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
@@ -66,7 +66,7 @@ public class HostUpdateResponseCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
+        command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 7c03f22..50d2eab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable {
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
         this.nettyRemotingServer.start();
 
         // worker registry
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
index 5be3276..8928d50 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
@@ -51,7 +51,7 @@ public class HostUpdateProcessor implements NettyRequestProcessor {
 
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUST == command.getType(), String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
         HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class);
         logger.info("received host update command : {}", updateCommand);
         taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));