You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text version.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/30 03:55:44 UTC

[dolphinscheduler] branch 2.0.1-prepare updated: [cherry-pick][DS-7004][MasterServer]fix master still work when it lose zk connection (#7058)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.1-prepare by this push:
     new 19cb123  [cherry-pick][DS-7004][MasterServer]fix master still work when it lose zk connection (#7058)
19cb123 is described below

commit 19cb1237b36d3de0d5ae29ee99142ef28fbe8242
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Nov 30 11:55:38 2021 +0800

    [cherry-pick][DS-7004][MasterServer]fix master still work when it lose zk connection (#7058)
---
 .../master/registry/MasterRegistryClient.java      | 39 ++++++++++++++++------
 .../master/registry/MasterRegistryClientTest.java  | 13 +++++++-
 2 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index ee6349a..318a06f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -290,20 +290,20 @@ public class MasterRegistryClient {
 
             ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
             if (workerHost == null
-                || !checkOwner
-                || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+                    || !checkOwner
+                    || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
                 // only failover the task owned myself if worker down.
                 if (processInstance == null) {
                     logger.error("failover error, the process {} of task {} do not exists.",
-                        taskInstance.getProcessInstanceId(), taskInstance.getId());
+                            taskInstance.getProcessInstanceId(), taskInstance.getId());
                     continue;
                 }
                 taskInstance.setProcessInstance(processInstance);
 
                 TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
-                                                                                       .buildTaskInstanceRelatedInfo(taskInstance)
-                                                                                       .buildProcessInstanceRelatedInfo(processInstance)
-                                                                                       .create();
+                        .buildTaskInstanceRelatedInfo(taskInstance)
+                        .buildProcessInstanceRelatedInfo(processInstance)
+                        .create();
                 // only kill yarn job if exists , the local thread has exited
                 ProcessUtils.killYarnJob(taskExecutionContext);
 
@@ -364,16 +364,33 @@ public class MasterRegistryClient {
             registryClient);
 
         registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
-        registryClient.addConnectionStateListener(newState -> {
-            if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
-                registryClient.persistEphemeral(localNodePath, "");
-            }
-        });
+        registryClient.addConnectionStateListener(this::handleConnectionState);
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
 
     }
 
+    public void handleConnectionState(ConnectionState state) { // registry connection-state callback, registered via addConnectionStateListener in registry()
+        switch (state) {
+            case CONNECTED: // log only; no action taken
+                logger.debug("registry connection state is {}", state);
+                break;
+            case SUSPENDED: // stops this master via registryClient.getStoppable(); NOTE(review): SUSPENDED may be a transient loss - if stop() shuts the server down, the RECONNECTED branch below may never get to run; confirm intent
+                logger.warn("registry connection state is {}, ready to stop myself", state);
+                registryClient.getStoppable().stop("registry connection state is SUSPENDED, stop myself");
+                break;
+            case RECONNECTED: // re-persists localNodePath as an ephemeral node with empty data; presumably the heartbeat task refreshes the content later - TODO confirm
+                logger.debug("registry connection state is {}, clean the node info", state);
+                registryClient.persistEphemeral(localNodePath, "");
+                break;
+            case DISCONNECTED: // stops this master via registryClient.getStoppable()
+                logger.warn("registry connection state is {}, ready to stop myself", state);
+                registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
+                break;
+            default: // any other state value is ignored
+        }
+    }
+
     public void deregister() {
         try {
             String address = getLocalAddress();
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index c643818..65d6b89 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -48,7 +49,7 @@ import org.springframework.test.util.ReflectionTestUtils;
  * MasterRegistryClientTest
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
+@PrepareForTest({RegistryClient.class})
 @PowerMockIgnore({"javax.management.*"})
 public class MasterRegistryClientTest {
 
@@ -72,6 +73,9 @@ public class MasterRegistryClientTest {
         given(registryClient.getLock(Mockito.anyString())).willReturn(true);
         given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
         given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
+        given(registryClient.getStoppable()).willReturn(cause -> {
+
+        });
         doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
         ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
 
@@ -102,6 +106,13 @@ public class MasterRegistryClientTest {
     }
 
     @Test
+    public void handleConnectionStateTest() { // smoke test: only checks that these calls do not throw; it contains no assertions
+        masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
+        masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
+        masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED); // NOTE(review): DISCONNECTED is not exercised, and the stub returned by getStoppable() is never verified to have been called
+    }
+
+    @Test
     public void removeNodePathTest() {
         masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
         masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);