You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2020/06/08 07:55:49 UTC

[incubator-dolphinscheduler] branch dev-1.3.0 updated: fix oom when no master is active in dev-1.3.0 (#2918)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
     new 186ba3b  fix oom when no master is active in dev-1.3.0 (#2918)
186ba3b is described below

commit 186ba3baab15dcb75ade4d8e5eafb8fda1bcd39d
Author: dailidong <da...@gmail.com>
AuthorDate: Mon Jun 8 15:55:39 2020 +0800

    fix oom when no master is active in dev-1.3.0 (#2918)
    
    * fix worker group config no effect
    
    * remove codehaus janino jar
    the license of janino may not be compatible with Apache v2
    
    * Merge remote-tracking branch 'upstream/dev-1.3.0' into dev-1.3.0
    
    # Conflicts:
    #	dolphinscheduler-server/src/main/resources/config/install_config.conf
    
    * datasource config
    
    * Update datasource.properties
    
    * fix RunConfig bug
    
    * remove param monitor server state
    
    * fix table T_DS_ALERT
    
    * update h2 database
    
    * fix #2910 master server shows exceptions for some time when it restarts
    
    * fix oom when no master is active
    
    * fix worker oom when master server restart
    
    * fix oom
    
    * fix
    
    * add UT
    
    * fix worker group config no effect
---
 .../remote/NettyRemotingClient.java                |  2 +-
 .../dispatch/host/LowerWeightHostManager.java      |  2 +-
 .../worker/processor/TaskCallbackService.java      | 19 ++++++++++--
 .../worker/processor/TaskCallbackServiceTest.java  | 34 ++++++++++------------
 .../dolphinscheduler/service/zk/ZKServer.java      |  5 ++--
 .../dolphinscheduler/service/zk/ZKServerTest.java  | 12 ++------
 6 files changed, 39 insertions(+), 35 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 10f729d..10e62d8 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -349,7 +349,7 @@ public class NettyRemotingClient {
                 return channel;
             }
         } catch (Exception ex) {
-            logger.info("connect to {} error  {}", host, ex);
+            logger.warn(String.format("connect to %s error", host), ex);
         }
         return null;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 8d29eb5..1872ae0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -87,7 +87,7 @@ public class LowerWeightHostManager extends CommonHostManager {
         this.workerHostWeightsMap = new ConcurrentHashMap<>();
         this.lock = new ReentrantLock();
         this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
-        this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 40, TimeUnit.SECONDS);
+        this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 5, TimeUnit.SECONDS);
         this.roundRobinHostManager = new RoundRobinHostManager();
         this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager());
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index c308f3b..6a23a9e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -46,6 +46,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 public class TaskCallbackService {
 
     private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
+    private static final int [] RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200 };
 
     /**
      *  remote channels
@@ -58,6 +59,7 @@ public class TaskCallbackService {
     @Autowired
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
+
     /**
      * netty remoting client
      */
@@ -99,14 +101,19 @@ public class TaskCallbackService {
                 nettyRemoteChannel.getHost(),
                 taskInstanceId);
         Set<String> masterNodes = null;
+        int ntries = 0;
         while (Stopper.isRunning()) {
             masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
             if (CollectionUtils.isEmpty(masterNodes)) {
+                logger.info("try {} times but not find any master for task : {}.",
+                        ntries + 1,
+                        taskInstanceId);
                 masterNodes = null;
-                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+                ThreadUtils.sleep(pause(ntries++));
                 continue;
             }
-            logger.info("find {} masters for task : {}.",
+            logger.info("try {} times to find {} masters for task : {}.",
+                    ntries + 1,
                     masterNodes.size(),
                     taskInstanceId);
             for (String masterNode : masterNodes) {
@@ -116,12 +123,18 @@ public class TaskCallbackService {
                 }
             }
             masterNodes = null;
-            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+            ThreadUtils.sleep(pause(ntries++));
         }
 
         throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: {}", masterNodes, taskInstanceId));
     }
 
+
+    public int pause(int ntries){
+        return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
+    }
+
+
     private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId){
         NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
         addRemoteChannel(taskInstanceId, remoteChannel);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 78ba3a6..c38ca3e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
@@ -46,6 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
+import java.io.IOException;
 import java.util.Date;
 
 /**
@@ -91,12 +93,8 @@ public class TaskCallbackServiceTest {
         ackCommand.setStartTime(new Date());
         taskCallbackService.sendAck(1, ackCommand.convert2Command());
 
-        Thread.sleep(5000);
-
         Stopper.stop();
 
-        Thread.sleep(5000);
-
         nettyRemotingServer.close();
         nettyRemotingClient.close();
     }
@@ -140,8 +138,13 @@ public class TaskCallbackServiceTest {
         Stopper.stop();
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testSendAckWithIllegalStateException1(){
+    @Test
+    public void testPause(){
+        Assert.assertEquals(5000, taskCallbackService.pause(3));;
+    }
+
+    @Test
+    public void testSendAck1(){
         masterRegistry.registry();
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
@@ -153,27 +156,20 @@ public class TaskCallbackServiceTest {
         NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
         Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
         taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
-        channel.close();
+//        channel.close();
+
         TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
         ackCommand.setTaskInstanceId(1);
         ackCommand.setStartTime(new Date());
 
-        nettyRemotingServer.close();
-
         taskCallbackService.sendAck(1, ackCommand.convert2Command());
-        try {
-            Thread.sleep(5000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
+
+        Assert.assertEquals(true, channel.isOpen());
 
         Stopper.stop();
 
-        try {
-            Thread.sleep(5000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
+        nettyRemotingServer.close();
+        nettyRemotingClient.close();
     }
 
 //    @Test(expected = IllegalStateException.class)
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
index 9633140..3cdc9ab 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
@@ -95,8 +95,9 @@ public class ZKServer {
      * @param port The port to listen on
      */
     public static void startLocalZkServer(final int port) {
-
-        startLocalZkServer(port, System.getProperty("user.dir") +"/zookeeper_data", ZooKeeperServer.DEFAULT_TICK_TIME,"20");
+        String zkDataDir = System.getProperty("user.dir") +"/zookeeper_data";
+        logger.info("zk server starting, data dir path:{}" , zkDataDir);
+        startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60");
     }
 
     /**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
index 48cde32..42b942b 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
@@ -16,22 +16,16 @@
  */
 package org.apache.dolphinscheduler.service.zk;
 
-import org.junit.Ignore;
+import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
-@Ignore
+// ZKServer is a process, can't unit test
 public class ZKServerTest {
 
-    @Test
-    public void start() {
-        //ZKServer is a process, can't unit test
-    }
 
     @Test
     public void isStarted() {
-
+        Assert.assertEquals(false, ZKServer.isStarted());
     }
 
     @Test