You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/30 03:55:44 UTC
[dolphinscheduler] branch 2.0.1-prepare updated: [cherry-pick][DS-7004][MasterServer]fix master still work when it lose zk connection (#7058)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.1-prepare by this push:
new 19cb123 [cherry-pick][DS-7004][MasterServer]fix master still work when it lose zk connection (#7058)
19cb123 is described below
commit 19cb1237b36d3de0d5ae29ee99142ef28fbe8242
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Nov 30 11:55:38 2021 +0800
[cherry-pick][DS-7004][MasterServer]fix master still work when it lose zk connection (#7058)
---
.../master/registry/MasterRegistryClient.java | 39 ++++++++++++++++------
.../master/registry/MasterRegistryClientTest.java | 13 +++++++-
2 files changed, 40 insertions(+), 12 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index ee6349a..318a06f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -290,20 +290,20 @@ public class MasterRegistryClient {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (workerHost == null
- || !checkOwner
- || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+ || !checkOwner
+ || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
// only failover the task owned myself if worker down.
if (processInstance == null) {
logger.error("failover error, the process {} of task {} do not exists.",
- taskInstance.getProcessInstanceId(), taskInstance.getId());
+ taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
@@ -364,16 +364,33 @@ public class MasterRegistryClient {
registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
- registryClient.addConnectionStateListener(newState -> {
- if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
- registryClient.persistEphemeral(localNodePath, "");
- }
- });
+ registryClient.addConnectionStateListener(this::handleConnectionState);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
+ public void handleConnectionState(ConnectionState state) {
+ switch (state) {
+ case CONNECTED:
+ logger.debug("registry connection state is {}", state);
+ break;
+ case SUSPENDED:
+ logger.warn("registry connection state is {}, ready to stop myself", state);
+ registryClient.getStoppable().stop("registry connection state is SUSPENDED, stop myself");
+ break;
+ case RECONNECTED:
+ logger.debug("registry connection state is {}, clean the node info", state);
+ registryClient.persistEphemeral(localNodePath, "");
+ break;
+ case DISCONNECTED:
+ logger.warn("registry connection state is {}, ready to stop myself", state);
+ registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
+ break;
+ default:
+ }
+ }
+
public void deregister() {
try {
String address = getLocalAddress();
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index c643818..65d6b89 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -48,7 +49,7 @@ import org.springframework.test.util.ReflectionTestUtils;
* MasterRegistryClientTest
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
+@PrepareForTest({RegistryClient.class})
@PowerMockIgnore({"javax.management.*"})
public class MasterRegistryClientTest {
@@ -72,6 +73,9 @@ public class MasterRegistryClientTest {
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
+ given(registryClient.getStoppable()).willReturn(cause -> {
+
+ });
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
@@ -102,6 +106,13 @@ public class MasterRegistryClientTest {
}
@Test
+ public void handleConnectionStateTest() {
+ masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
+ masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
+ masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED);
+ }
+
+ @Test
public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);