You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/10/12 15:29:53 UTC
[incubator-dolphinscheduler] branch dev updated:
[FIX_BUG_1.3#3789][remote]support netty heart beat (#3868)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 08dc691 [FIX_BUG_1.3#3789][remote]support netty heart beat (#3868)
08dc691 is described below
commit 08dc6913dcfa9205bab3ccbffd23902e326bdc4e
Author: Kirs <ac...@163.com>
AuthorDate: Mon Oct 12 23:29:38 2020 +0800
[FIX_BUG_1.3#3789][remote]support netty heart beat (#3868)
* [FIX_BUG_1.3#3789][remote]support netty heart beat
* netty heart beat
* code style
* code style
* code style
* code style
* code style
* delete code docs
* client fail-close
---
.../remote/NettyRemotingClient.java | 51 ++++++----
.../remote/NettyRemotingServer.java | 12 ++-
.../remote/command/CommandType.java | 106 ++++++++++++++++++++-
.../remote/handler/NettyClientHandler.java | 75 ++++++++++-----
.../remote/handler/NettyServerHandler.java | 67 ++++++++-----
.../dolphinscheduler/remote/utils/Constants.java | 5 +-
6 files changed, 239 insertions(+), 77 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 38f00fb..c1aea90 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -17,17 +17,6 @@
package org.apache.dolphinscheduler.remote;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
@@ -41,19 +30,40 @@ import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.InetSocketAddress;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
/**
* remoting netty client
*/
@@ -162,11 +172,10 @@ public class NettyRemotingClient {
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
- new NettyDecoder(),
- clientHandler,
- encoder);
+ public void initChannel(SocketChannel ch) {
+ ch.pipeline()
+ .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
+ .addLast(new NettyDecoder(), clientHandler, encoder);
}
});
this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index ad5c95b..867cf4d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,11 +40,11 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
/**
* remoting netty server
@@ -183,10 +184,11 @@ public class NettyRemotingServer {
* @param ch socket channel
*/
private void initNettyChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("encoder", encoder);
- pipeline.addLast("decoder", new NettyDecoder());
- pipeline.addLast("handler", serverHandler);
+ ch.pipeline()
+ .addLast("encoder", encoder)
+ .addLast("decoder", new NettyDecoder())
+ .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
+ .addLast("handler", serverHandler);
}
/**
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index d1ffc65..4f477fb 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1,105 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+public enum CommandType {
+
+ /**
+ * remove task log request,
+ */
+ REMOVE_TAK_LOG_REQUEST,
+
+ /**
+ * remove task log response
+ */
+ REMOVE_TAK_LOG_RESPONSE,
+
+ /**
+ * roll view log request
+ */
+ ROLL_VIEW_LOG_REQUEST,
+
+ /**
+ * roll view log response
+ */
+ ROLL_VIEW_LOG_RESPONSE,
+
+ /**
+ * view whole log request
+ */
+ VIEW_WHOLE_LOG_REQUEST,
+
+ /**
+ * view whole log response
+ */
+ VIEW_WHOLE_LOG_RESPONSE,
+
+ /**
+ * get log bytes request
+ */
+ GET_LOG_BYTES_REQUEST,
+
+ /**
+ * get log bytes response
+ */
+ GET_LOG_BYTES_RESPONSE,
+
+
+ WORKER_REQUEST,
+ MASTER_RESPONSE,
+
+ /**
+ * execute task request
+ */
+ TASK_EXECUTE_REQUEST,
+
+ /**
+ * execute task ack
+ */
+ TASK_EXECUTE_ACK,
+
+ /**
+ * execute task response
+ */
+ TASK_EXECUTE_RESPONSE,
+
+ /**
+ * kill task
+ */
+ TASK_KILL_REQUEST,
+
+ /**
+ * kill task response
+ */
+ TASK_KILL_RESPONSE,
+
+ /**
+ * HEART_BEAT
+ */
+ HEART_BEAT,
+
+ /**
+ * ping
+ */
+ PING,
+
+ /**
+ * pong
+ */
+ PONG;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index 48d78d9..a988acf 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.remote.handler;
-import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -25,16 +25,24 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+
/**
- * netty client request handler
+ * netty client request handler
*/
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@@ -42,12 +50,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/**
- * netty client
+ * netty client
*/
private final NettyRemotingClient nettyRemotingClient;
+ private static byte[] heartBeatData = "heart_beat".getBytes();
+
/**
- * callback thread executor
+ * callback thread executor
*/
private final ExecutorService callbackExecutor;
@@ -57,19 +67,19 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors;
/**
- * default executor
+ * default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
- public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
+ public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor;
this.processors = new ConcurrentHashMap();
}
/**
- * When the current channel is not active,
- * the current channel has reached the end of its life cycle
+ * When the current channel is not active,
+ * the current channel has reached the end of its life cycle
*
* @param ctx channel handler context
* @throws Exception
@@ -81,7 +91,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
}
/**
- * The current channel reads data from the remote
+ * The current channel reads data from the remote
*
* @param ctx channel handler context
* @param msg message
@@ -89,55 +99,55 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- processReceived(ctx.channel(), (Command)msg);
+ processReceived(ctx.channel(), (Command) msg);
}
/**
* register processor
*
* @param commandType command type
- * @param processor processor
+ * @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
/**
- * register processor
+ * register processor
*
* @param commandType command type
- * @param processor processor
- * @param executor thread executor
+ * @param processor processor
+ * @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
- if(executorRef == null){
+ if (executorRef == null) {
executorRef = defaultExecutor;
}
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
}
/**
- * process received logic
+ * process received logic
*
* @param command command
*/
private void processReceived(final Channel channel, final Command command) {
ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
- if(future != null){
+ if (future != null) {
future.setResponseCommand(command);
future.release();
- if(future.getInvokeCallback() != null){
+ if (future.getInvokeCallback() != null) {
this.callbackExecutor.submit(new Runnable() {
@Override
public void run() {
future.executeInvokeCallback();
}
});
- } else{
+ } else {
future.putResponse(command);
}
- } else{
+ } else {
processByCommandType(channel, command);
}
}
@@ -163,9 +173,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
}
/**
- * caught exception
- * @param ctx channel handler context
- * @param cause cause
+ * caught exception
+ *
+ * @param ctx channel handler context
+ * @param cause cause
* @throws Exception
*/
@Override
@@ -175,4 +186,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
ctx.channel().close();
}
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent) {
+ Command heartBeat = new Command();
+ heartBeat.setType(CommandType.HEART_BEAT);
+ heartBeat.setBody(heartBeatData);
+ ctx.writeAndFlush(heartBeat)
+ .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
index da2a6ff..09e41e9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -17,22 +17,30 @@
package org.apache.dolphinscheduler.remote.handler;
-import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+
+
/**
- * netty server request handler
+ * netty server request handler
*/
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@@ -40,22 +48,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
/**
- * netty remote server
+ * netty remote server
*/
private final NettyRemotingServer nettyRemotingServer;
/**
- * server processors queue
+ * server processors queue
*/
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
- public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
+ public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
this.nettyRemotingServer = nettyRemotingServer;
}
/**
- * When the current channel is not active,
- * the current channel has reached the end of its life cycle
+ * When the current channel is not active,
+ * the current channel has reached the end of its life cycle
+ *
* @param ctx channel handler context
* @throws Exception
*/
@@ -73,38 +82,39 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- processReceived(ctx.channel(), (Command)msg);
+ processReceived(ctx.channel(), (Command) msg);
}
/**
* register processor
*
* @param commandType command type
- * @param processor processor
+ * @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
/**
- * register processor
+ * register processor
*
* @param commandType command type
- * @param processor processor
- * @param executor thread executor
+ * @param processor processor
+ * @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
- if(executorRef == null){
+ if (executorRef == null) {
executorRef = nettyRemotingServer.getDefaultExecutor();
}
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
}
/**
- * process received logic
+ * process received logic
+ *
* @param channel channel
- * @param msg message
+ * @param msg message
*/
private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType();
@@ -132,22 +142,22 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
}
/**
- * caught exception
+ * caught exception
*
- * @param ctx channel handler context
+ * @param ctx channel handler context
* @param cause cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.error("exceptionCaught : {}",cause.getMessage(), cause);
+ logger.error("exceptionCaught : {}", cause.getMessage(), cause);
ctx.channel().close();
}
/**
- * channel write changed
+ * channel write changed
*
- * @param ctx channel handler context
+ * @param ctx channel handler context
* @throws Exception
*/
@Override
@@ -158,16 +168,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (!ch.isWritable()) {
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}",
- ch, config.getWriteBufferHighWaterMark());
+ ch, config.getWriteBufferHighWaterMark());
}
config.setAutoRead(false);
} else {
if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}",
- ch, config.getWriteBufferLowWaterMark());
+ ch, config.getWriteBufferLowWaterMark());
}
config.setAutoRead(true);
}
}
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent) {
+ ctx.channel().close();
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 91d4ac2..866ebb6 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.remote.utils;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
-
/**
* constant
*/
@@ -30,6 +29,10 @@ public class Constants {
public static final String SLASH = "/";
+ public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000;
+
+ public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60;
+
/**
* charset
*/