You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/10/12 15:29:53 UTC

[incubator-dolphinscheduler] branch dev updated: [FIX_BUG_1.3#3789][remote]support netty heart beat (#3868)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 08dc691  [FIX_BUG_1.3#3789][remote]support netty heart beat (#3868)
08dc691 is described below

commit 08dc6913dcfa9205bab3ccbffd23902e326bdc4e
Author: Kirs <ac...@163.com>
AuthorDate: Mon Oct 12 23:29:38 2020 +0800

    [FIX_BUG_1.3#3789][remote]support netty heart beat (#3868)
    
    * [FIX_BUG_1.3#3789][remote]support netty heart beat
    
    * netty heart beat
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * delete code docs
    
    * client fail-close
---
 .../remote/NettyRemotingClient.java                |  51 ++++++----
 .../remote/NettyRemotingServer.java                |  12 ++-
 .../remote/command/CommandType.java                | 106 ++++++++++++++++++++-
 .../remote/handler/NettyClientHandler.java         |  75 ++++++++++-----
 .../remote/handler/NettyServerHandler.java         |  67 ++++++++-----
 .../dolphinscheduler/remote/utils/Constants.java   |   5 +-
 6 files changed, 239 insertions(+), 77 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 38f00fb..c1aea90 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -17,17 +17,6 @@
 
 package org.apache.dolphinscheduler.remote;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
 import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
 import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -41,19 +30,40 @@ import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
 import org.apache.dolphinscheduler.remote.future.ResponseFuture;
 import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.remote.utils.NettyUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
 /**
  * remoting netty client
  */
@@ -162,11 +172,10 @@ public class NettyRemotingClient {
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
-                public void initChannel(SocketChannel ch) throws Exception {
-                    ch.pipeline().addLast(
-                        new NettyDecoder(),
-                        clientHandler,
-                        encoder);
+                public void initChannel(SocketChannel ch) {
+                    ch.pipeline()
+                        .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
+                        .addLast(new NettyDecoder(), clientHandler, encoder);
                 }
             });
         this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index ad5c95b..867cf4d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.remote.utils.NettyUtils;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -39,11 +40,11 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 
 /**
  * remoting netty server
@@ -183,10 +184,11 @@ public class NettyRemotingServer {
      * @param ch socket channel
      */
     private void initNettyChannel(SocketChannel ch) {
-        ChannelPipeline pipeline = ch.pipeline();
-        pipeline.addLast("encoder", encoder);
-        pipeline.addLast("decoder", new NettyDecoder());
-        pipeline.addLast("handler", serverHandler);
+        ch.pipeline()
+            .addLast("encoder", encoder)
+            .addLast("decoder", new NettyDecoder())
+            .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
+            .addLast("handler", serverHandler);
     }
 
     /**
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index d1ffc65..4f477fb 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1,105 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    /**
     * remove task log request,
     */
    REMOVE_TAK_LOG_REQUEST,

    /**
     * remove task log respons
 e
     */
    REMOVE_TAK_LOG_RESPONSE,

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
     */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    TASK_EXECUTE_REQUEST,

    /**
     * execute task ack
     */
    TASK_EXECUTE_ACK,

    /**
     * execute task response
     */
    TASK_EXECUTE_RESPONSE,

    /**
     * kill task
     */
    TASK_KILL_REQUEST,

    /**
     * kill task response
     */
    TASK_KILL_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+public enum CommandType {
+
+    /**
+     * remove task log request,
+     */
+    REMOVE_TAK_LOG_REQUEST,
+
+    /**
+     * remove task log response
+     */
+    REMOVE_TAK_LOG_RESPONSE,
+
+    /**
+     *  roll view log request
+     */
+    ROLL_VIEW_LOG_REQUEST,
+
+    /**
+     *  roll view log response
+     */
+    ROLL_VIEW_LOG_RESPONSE,
+
+    /**
+     * view whole log request
+     */
+    VIEW_WHOLE_LOG_REQUEST,
+
+    /**
+     * view whole log response
+     */
+    VIEW_WHOLE_LOG_RESPONSE,
+
+    /**
+     * get log bytes request
+     */
+    GET_LOG_BYTES_REQUEST,
+
+    /**
+     * get log bytes response
+     */
+    GET_LOG_BYTES_RESPONSE,
+
+
+    WORKER_REQUEST,
+    MASTER_RESPONSE,
+
+    /**
+     * execute task request
+     */
+    TASK_EXECUTE_REQUEST,
+
+    /**
+     * execute task ack
+     */
+    TASK_EXECUTE_ACK,
+
+    /**
+     * execute task response
+     */
+    TASK_EXECUTE_RESPONSE,
+
+    /**
+     * kill task
+     */
+    TASK_KILL_REQUEST,
+
+    /**
+     * kill task response
+     */
+    TASK_KILL_RESPONSE,
+
+    /**
+     * HEART_BEAT
+     */
+    HEART_BEAT,
+
+    /**
+     * ping
+     */
+    PING,
+
+    /**
+     *  pong
+     */
+    PONG;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index 48d78d9..a988acf 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.handler;
 
-import io.netty.channel.*;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -25,16 +25,24 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+
 /**
- *  netty client request handler
+ * netty client request handler
  */
 @ChannelHandler.Sharable
 public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@@ -42,12 +50,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
 
     /**
-     *  netty client
+     * netty client
      */
     private final NettyRemotingClient nettyRemotingClient;
 
+    private static byte[] heartBeatData = "heart_beat".getBytes();
+
     /**
-     *  callback thread executor
+     * callback thread executor
      */
     private final ExecutorService callbackExecutor;
 
@@ -57,19 +67,19 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors;
 
     /**
-     *  default executor
+     * default executor
      */
     private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
 
-    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
+    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
         this.nettyRemotingClient = nettyRemotingClient;
         this.callbackExecutor = callbackExecutor;
         this.processors = new ConcurrentHashMap();
     }
 
     /**
-     *  When the current channel is not active,
-     *  the current channel has reached the end of its life cycle
+     * When the current channel is not active,
+     * the current channel has reached the end of its life cycle
      *
      * @param ctx channel handler context
      * @throws Exception
@@ -81,7 +91,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  The current channel reads data from the remote
+     * The current channel reads data from the remote
      *
      * @param ctx channel handler context
      * @param msg message
@@ -89,55 +99,55 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        processReceived(ctx.channel(), (Command)msg);
+        processReceived(ctx.channel(), (Command) msg);
     }
 
     /**
      * register processor
      *
      * @param commandType command type
-     * @param processor processor
+     * @param processor   processor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
         this.registerProcessor(commandType, processor, null);
     }
 
     /**
-     *  register processor
+     * register processor
      *
      * @param commandType command type
-     * @param processor processor
-     * @param executor thread executor
+     * @param processor   processor
+     * @param executor    thread executor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
         ExecutorService executorRef = executor;
-        if(executorRef == null){
+        if (executorRef == null) {
             executorRef = defaultExecutor;
         }
         this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
     }
 
     /**
-     *  process received logic
+     * process received logic
      *
      * @param command command
      */
     private void processReceived(final Channel channel, final Command command) {
         ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
-        if(future != null){
+        if (future != null) {
             future.setResponseCommand(command);
             future.release();
-            if(future.getInvokeCallback() != null){
+            if (future.getInvokeCallback() != null) {
                 this.callbackExecutor.submit(new Runnable() {
                     @Override
                     public void run() {
                         future.executeInvokeCallback();
                     }
                 });
-            } else{
+            } else {
                 future.putResponse(command);
             }
-        } else{
+        } else {
             processByCommandType(channel, command);
         }
     }
@@ -163,9 +173,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  caught exception
-     * @param ctx channel handler context
-     * @param cause  cause
+     * caught exception
+     *
+     * @param ctx   channel handler context
+     * @param cause cause
      * @throws Exception
      */
     @Override
@@ -175,4 +186,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
         ctx.channel().close();
     }
 
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            Command heartBeat = new Command();
+            heartBeat.setType(CommandType.HEART_BEAT);
+            heartBeat.setBody(heartBeatData);
+            ctx.writeAndFlush(heartBeat)
+                .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
index da2a6ff..09e41e9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -17,22 +17,30 @@
 
 package org.apache.dolphinscheduler.remote.handler;
 
-import io.netty.channel.*;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+
+
 /**
- *  netty server request handler
+ * netty server request handler
  */
 @ChannelHandler.Sharable
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@@ -40,22 +48,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
     private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
 
     /**
-     *  netty remote server
+     * netty remote server
      */
     private final NettyRemotingServer nettyRemotingServer;
 
     /**
-     *  server processors queue
+     * server processors queue
      */
     private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
 
-    public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
+    public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
         this.nettyRemotingServer = nettyRemotingServer;
     }
 
     /**
-     *  When the current channel is not active,
-     *  the current channel has reached the end of its life cycle
+     * When the current channel is not active,
+     * the current channel has reached the end of its life cycle
+     *
      * @param ctx channel handler context
      * @throws Exception
      */
@@ -73,38 +82,39 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
      */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        processReceived(ctx.channel(), (Command)msg);
+        processReceived(ctx.channel(), (Command) msg);
     }
 
     /**
      * register processor
      *
      * @param commandType command type
-     * @param processor processor
+     * @param processor   processor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
         this.registerProcessor(commandType, processor, null);
     }
 
     /**
-     *  register processor
+     * register processor
      *
      * @param commandType command type
-     * @param processor processor
-     * @param executor thread executor
+     * @param processor   processor
+     * @param executor    thread executor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
         ExecutorService executorRef = executor;
-        if(executorRef == null){
+        if (executorRef == null) {
             executorRef = nettyRemotingServer.getDefaultExecutor();
         }
         this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
     }
 
     /**
-     *  process received logic
+     * process received logic
+     *
      * @param channel channel
-     * @param msg message
+     * @param msg     message
      */
     private void processReceived(final Channel channel, final Command msg) {
         final CommandType commandType = msg.getType();
@@ -132,22 +142,22 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  caught exception
+     * caught exception
      *
-     * @param ctx channel handler context
+     * @param ctx   channel handler context
      * @param cause cause
      * @throws Exception
      */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        logger.error("exceptionCaught : {}",cause.getMessage(), cause);
+        logger.error("exceptionCaught : {}", cause.getMessage(), cause);
         ctx.channel().close();
     }
 
     /**
-     *  channel write changed
+     * channel write changed
      *
-     * @param ctx  channel handler context
+     * @param ctx channel handler context
      * @throws Exception
      */
     @Override
@@ -158,16 +168,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
         if (!ch.isWritable()) {
             if (logger.isWarnEnabled()) {
                 logger.warn("{} is not writable, over high water level : {}",
-                        ch, config.getWriteBufferHighWaterMark());
+                    ch, config.getWriteBufferHighWaterMark());
             }
 
             config.setAutoRead(false);
         } else {
             if (logger.isWarnEnabled()) {
                 logger.warn("{} is writable, to low water : {}",
-                        ch, config.getWriteBufferLowWaterMark());
+                    ch, config.getWriteBufferLowWaterMark());
             }
             config.setAutoRead(true);
         }
     }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            ctx.channel().close();
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 91d4ac2..866ebb6 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.remote.utils;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
-
 /**
  * constant
  */
@@ -30,6 +29,10 @@ public class Constants {
 
     public static final String SLASH = "/";
 
+    public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000;
+
+    public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60;
+
     /**
      * charset
      */