You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2020/06/08 07:55:49 UTC
[incubator-dolphinscheduler] branch dev-1.3.0 updated: fix oom when
no master is active in dev-1.3.0 (#2918)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
new 186ba3b fix oom when no master is active in dev-1.3.0 (#2918)
186ba3b is described below
commit 186ba3baab15dcb75ade4d8e5eafb8fda1bcd39d
Author: dailidong <da...@gmail.com>
AuthorDate: Mon Jun 8 15:55:39 2020 +0800
fix oom when no master is active in dev-1.3.0 (#2918)
* fix worker group config no effect
* remove codehaus janino jar
the janino license may not be compatible with Apache v2
* Merge remote-tracking branch 'upstream/dev-1.3.0' into dev-1.3.0
# Conflicts:
# dolphinscheduler-server/src/main/resources/config/install_config.conf
* datasource config
* Update datasource.properties
* fix RunConfig bug
* remove param monitor server state
* fix table T_DS_ALERT
* update h2 database
* fix #2910: master server shows exceptions for some time when it restarts
* fix oom when no master is active
* fix worker oom when master server restart
* fix oom
* fix
* add UT
* fix worker group config no effect
---
.../remote/NettyRemotingClient.java | 2 +-
.../dispatch/host/LowerWeightHostManager.java | 2 +-
.../worker/processor/TaskCallbackService.java | 19 ++++++++++--
.../worker/processor/TaskCallbackServiceTest.java | 34 ++++++++++------------
.../dolphinscheduler/service/zk/ZKServer.java | 5 ++--
.../dolphinscheduler/service/zk/ZKServerTest.java | 12 ++------
6 files changed, 39 insertions(+), 35 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 10f729d..10e62d8 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -349,7 +349,7 @@ public class NettyRemotingClient {
return channel;
}
} catch (Exception ex) {
- logger.info("connect to {} error {}", host, ex);
+ logger.warn(String.format("connect to %s error", host), ex);
}
return null;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 8d29eb5..1872ae0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -87,7 +87,7 @@ public class LowerWeightHostManager extends CommonHostManager {
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
- this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 40, TimeUnit.SECONDS);
+ this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 5, TimeUnit.SECONDS);
this.roundRobinHostManager = new RoundRobinHostManager();
this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager());
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index c308f3b..6a23a9e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -46,6 +46,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
+ private static final int [] RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200 };
/**
* remote channels
@@ -58,6 +59,7 @@ public class TaskCallbackService {
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
/**
* netty remoting client
*/
@@ -99,14 +101,19 @@ public class TaskCallbackService {
nettyRemoteChannel.getHost(),
taskInstanceId);
Set<String> masterNodes = null;
+ int ntries = 0;
while (Stopper.isRunning()) {
masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
if (CollectionUtils.isEmpty(masterNodes)) {
+ logger.info("try {} times but not find any master for task : {}.",
+ ntries + 1,
+ taskInstanceId);
masterNodes = null;
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ ThreadUtils.sleep(pause(ntries++));
continue;
}
- logger.info("find {} masters for task : {}.",
+ logger.info("try {} times to find {} masters for task : {}.",
+ ntries + 1,
masterNodes.size(),
taskInstanceId);
for (String masterNode : masterNodes) {
@@ -116,12 +123,18 @@ public class TaskCallbackService {
}
}
masterNodes = null;
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ ThreadUtils.sleep(pause(ntries++));
}
throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: {}", masterNodes, taskInstanceId));
}
+
+ public int pause(int ntries){
+ return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
+ }
+
+
private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId){
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
addRemoteChannel(taskInstanceId, remoteChannel);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 78ba3a6..c38ca3e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
@@ -46,6 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import java.io.IOException;
import java.util.Date;
/**
@@ -91,12 +93,8 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date());
taskCallbackService.sendAck(1, ackCommand.convert2Command());
- Thread.sleep(5000);
-
Stopper.stop();
- Thread.sleep(5000);
-
nettyRemotingServer.close();
nettyRemotingClient.close();
}
@@ -140,8 +138,13 @@ public class TaskCallbackServiceTest {
Stopper.stop();
}
- @Test(expected = IllegalStateException.class)
- public void testSendAckWithIllegalStateException1(){
+ @Test
+ public void testPause(){
+ Assert.assertEquals(5000, taskCallbackService.pause(3));;
+ }
+
+ @Test
+ public void testSendAck1(){
masterRegistry.registry();
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
@@ -153,27 +156,20 @@ public class TaskCallbackServiceTest {
NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
- channel.close();
+// channel.close();
+
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(1);
ackCommand.setStartTime(new Date());
- nettyRemotingServer.close();
-
taskCallbackService.sendAck(1, ackCommand.convert2Command());
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+
+ Assert.assertEquals(true, channel.isOpen());
Stopper.stop();
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ nettyRemotingServer.close();
+ nettyRemotingClient.close();
}
// @Test(expected = IllegalStateException.class)
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
index 9633140..3cdc9ab 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
@@ -95,8 +95,9 @@ public class ZKServer {
* @param port The port to listen on
*/
public static void startLocalZkServer(final int port) {
-
- startLocalZkServer(port, System.getProperty("user.dir") +"/zookeeper_data", ZooKeeperServer.DEFAULT_TICK_TIME,"20");
+ String zkDataDir = System.getProperty("user.dir") +"/zookeeper_data";
+ logger.info("zk server starting, data dir path:{}" , zkDataDir);
+ startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60");
}
/**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
index 48cde32..42b942b 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
@@ -16,22 +16,16 @@
*/
package org.apache.dolphinscheduler.service.zk;
-import org.junit.Ignore;
+import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.*;
-
-@Ignore
+// ZKServer is a process, can't unit test
public class ZKServerTest {
- @Test
- public void start() {
- //ZKServer is a process, can't unit test
- }
@Test
public void isStarted() {
-
+ Assert.assertEquals(false, ZKServer.isStarted());
}
@Test