You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/01 15:26:16 UTC

[dolphinscheduler] branch 2.0.0-release-prepare updated (2446976 -> 800558f)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a change to branch 2.0.0-release-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git.


    from 2446976  Optimizing SQL scripts (#6644)
     new cd173ac  [DS-6640][WorkerServer] support process update host command type (#6642)
     new 800558f  [DS-6638][MasterServer] fix task state no change when failover worker (#6639)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/dolphinscheduler/remote/command/CommandType.java  | 2 +-
 .../org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java | 2 +-
 .../dolphinscheduler/remote/command/HostUpdateResponseCommand.java    | 2 +-
 .../dolphinscheduler/server/master/registry/MasterRegistryClient.java | 4 ++--
 .../java/org/apache/dolphinscheduler/server/worker/WorkerServer.java  | 2 ++
 .../dolphinscheduler/server/worker/processor/HostUpdateProcessor.java | 2 +-
 6 files changed, 8 insertions(+), 6 deletions(-)

[dolphinscheduler] 01/02: [DS-6640][WorkerServer] support process update host command type (#6642)

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.0-release-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit cd173ac140f232a8ad0dabbaea9fefc93ae1b08b
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Sun Oct 31 21:10:51 2021 +0800

    [DS-6640][WorkerServer] support process update host command type (#6642)
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../java/org/apache/dolphinscheduler/remote/command/CommandType.java    | 2 +-
 .../org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java   | 2 +-
 .../dolphinscheduler/remote/command/HostUpdateResponseCommand.java      | 2 +-
 .../java/org/apache/dolphinscheduler/server/worker/WorkerServer.java    | 2 ++
 .../dolphinscheduler/server/worker/processor/HostUpdateProcessor.java   | 2 +-
 5 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 4301910..786d10c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -126,7 +126,7 @@ public enum CommandType {
     /**
      * process host update
      */
-    PROCESS_HOST_UPDATE_REQUST,
+    PROCESS_HOST_UPDATE_REQUEST,
 
     /**
      * process host update response
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
index d70124b..4fc752e 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
@@ -56,7 +56,7 @@ public class HostUpdateCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
+        command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
index ddf4fc2..b44856c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
@@ -66,7 +66,7 @@ public class HostUpdateResponseCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
+        command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 7c03f22..50d2eab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable {
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
         this.nettyRemotingServer.start();
 
         // worker registry
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
index 5be3276..8928d50 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
@@ -51,7 +51,7 @@ public class HostUpdateProcessor implements NettyRequestProcessor {
 
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUST == command.getType(), String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
         HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class);
         logger.info("received host update command : {}", updateCommand);
         taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));

[dolphinscheduler] 02/02: [DS-6638][MasterServer] fix task state no change when failover worker (#6639)

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.0-release-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 800558f417b745d47d0236498d19b9e52a4a6b5a
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Sun Oct 31 21:12:55 2021 +0800

    [DS-6638][MasterServer] fix task state no change when failover worker (#6639)
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../dolphinscheduler/server/master/registry/MasterRegistryClient.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 7bae6de..c157fb0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -275,6 +275,7 @@ public class MasterRegistryClient {
      *
      * @param workerHost worker host
      * @param needCheckWorkerAlive need check worker alive
+     * @param checkOwner need check process instance owner
      */
     private void failoverWorker(String workerHost, boolean needCheckWorkerAlive, boolean checkOwner) {
         logger.info("start worker[{}] failover ...", workerHost);
@@ -289,9 +290,8 @@ public class MasterRegistryClient {
             ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
             if (workerHost == null
                     || !checkOwner
-                    || processInstance.getHost().equalsIgnoreCase(workerHost)) {
+                    || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
                 // only failover the task owned myself if worker down.
-                // failover master need handle worker at the same time
                 if (processInstance == null) {
                     logger.error("failover error, the process {} of task {} do not exists.",
                             taskInstance.getProcessInstanceId(), taskInstance.getId());