You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/19 12:45:56 UTC

[incubator-inlong] branch master updated: [INLONG-2491][Dataproxy] update dataproxy netty version to 4.1.72.Final and log4j to log4j2 (#2599)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f4a259  [INLONG-2491][Dataproxy] update dataproxy netty version to 4.1.72.Final and log4j to log4j2 (#2599)
5f4a259 is described below

commit 5f4a2590ec2d5fdb43f08425ed54343c33df8000
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Feb 19 20:45:51 2022 +0800

    [INLONG-2491][Dataproxy] update dataproxy netty version to 4.1.72.Final and log4j to log4j2 (#2599)
---
 inlong-dataproxy/bin/dataproxy-start.sh            |   2 +-
 inlong-dataproxy/bin/flume-ng                      |  24 +-
 inlong-dataproxy/conf/log4j.properties             | 119 ----------
 inlong-dataproxy/conf/log4j2.xml                   | 114 +++++++++
 inlong-dataproxy/dataproxy-source/pom.xml          |  12 +-
 .../inlong/dataproxy/config/pojo/PulsarConfig.java |  28 ++-
 .../inlong/dataproxy/consts/ConfigConstants.java   |   2 +
 .../apache/inlong/dataproxy/sink/PulsarSink.java   |  63 +++--
 .../dataproxy/sink/pulsar/PulsarClientService.java |  30 +--
 .../apache/inlong/dataproxy/source/BaseSource.java |  85 +++++--
 .../dataproxy/source/DefaultServiceDecoder.java    |  40 ++--
 .../dataproxy/source/ServerMessageFactory.java     |  63 ++---
 .../dataproxy/source/ServerMessageHandler.java     | 256 ++++++++++-----------
 .../inlong/dataproxy/source/ServiceDecoder.java    |  11 +-
 .../dataproxy/source/SimpleMessageHandler.java     | 234 ++++++++++---------
 .../inlong/dataproxy/source/SimpleTcpSource.java   |  94 ++++----
 .../inlong/dataproxy/source/SimpleUdpSource.java   |  37 ++-
 .../inlong/dataproxy/utils/EventLoopUtil.java      | 108 +++++++++
 inlong-dataproxy/pom.xml                           |  58 ++++-
 19 files changed, 791 insertions(+), 589 deletions(-)

diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh
index 94c780c..f2b1188 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -28,4 +28,4 @@ do
 done
 
 cd .. || exit
-nohup bin/flume-ng agent --conf conf/ -f conf/flume.conf -n agent1 --no-reload-conf  > dataproxy.log 2>&1 &
\ No newline at end of file
+nohup bin/flume-ng agent --conf conf/ -f conf/flume.conf -n agent1 --no-reload-conf  > /dev/null 2>&1 &
\ No newline at end of file
diff --git a/inlong-dataproxy/bin/flume-ng b/inlong-dataproxy/bin/flume-ng
index e0d2e30..8923353 100755
--- a/inlong-dataproxy/bin/flume-ng
+++ b/inlong-dataproxy/bin/flume-ng
@@ -226,7 +226,29 @@ run_flume() {
 
 # set default params
 FLUME_CLASSPATH=""
-JAVA_OPTS="-Xmx1024m -Xms512m -Xmn512m -XX:SurvivorRatio=6"
+
+# Memory options to be passed to the JVM (heap and direct-memory sizes)
+DATA_PROXY_MEM=${DATA_PROXY_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
+
+# Garbage collection options
+DATA_PROXY_GC=${DATA_PROXY_GC:-"-XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC"}
+
+# Garbage collection log.
+IS_JAVA_8=`java -version 2>&1 |grep version|grep '"1\.8'`
+# The version string contains spaces, so use [[ -n $PARAM ]] (no word splitting) to check that the variable is non-empty
+if [[ -n $IS_JAVA_8 ]]; then
+  DATA_PROXY_GC_LOG=${DATA_PROXY_GC_LOG:-"-Xloggc:logs/dataproxy_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=20M"}
+else
+# From JDK 9 onward, GC logging must be configured with -Xlog as shown below. Versions older than JDK 8 are not handled.
+  DATA_PROXY_GC_LOG=${DATA_PROXY_GC_LOG:-"-Xlog:gc*:logs/dataproxy_gc_%p.log:time,uptime:filecount=10,filesize=20M"}
+fi
+
+LOG_PATH=$(pwd)/logs
+# Extra options to be passed to the jvm
+DATA_PROXY_EXTRA_OPTS=${DATA_PROXY_EXTRA_OPTS:-"-Ddataproxy.log.path="$LOG_PATH" -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"}
+
+JAVA_OPTS="$DATA_PROXY_MEM $DATA_PROXY_GC $DATA_PROXY_GC_LOG $DATA_PROXY_EXTRA_OPTS"
+
 #LD_LIBRARY_PATH=""
 
 opt_conf=""
diff --git a/inlong-dataproxy/conf/log4j.properties b/inlong-dataproxy/conf/log4j.properties
deleted file mode 100644
index 53b6dd7..0000000
--- a/inlong-dataproxy/conf/log4j.properties
+++ /dev/null
@@ -1,119 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,debug,info,warn,error
-log.dir = ./logs
-log.file.name.prefix=dataproxy
-log.file.info.name.postfix=.info
-log.file.debug.name.postfix=.debug
-log.file.warn.name.postfix=.warn
-log.file.error.name.postfix=.error
-log.file.stat.name=dataproxy_stat.log
-log.file.monitors.name=dataproxy_monitors.log
-log.file.index.name=dataproxy_index.log
-
-
-log4j.logger.org.apache.inlong.commons.monitor.MonitorIndexExt = info,monitor
-log4j.additivity.org.apache.inlong.commons.monitor.MonitorIndexExt = false
-log4j.appender.monitor=org.apache.log4j.RollingFileAppender
-log4j.appender.monitor.MaxFileSize=100MB
-log4j.appender.monitor.MaxBackupIndex=10
-log4j.appender.monitor.BufferedIO=false
-log4j.appender.monitor.BufferSize=8192
-log4j.appender.monitor.layout=org.apache.log4j.PatternLayout
-log4j.appender.monitor.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
-log4j.appender.monitor.Threshold = INFO 
-log4j.appender.monitor.Append=true
-log4j.appender.monitor.File=${log.dir}/${log.file.monitors.name}
-
-log4j.logger.org.apache.inlong.commons.monitor.MonitorIndex=info,index
-log4j.additivity.org.apache.inlong.commons.monitor.MonitorIndex = false
-log4j.appender.index=org.apache.log4j.RollingFileAppender
-log4j.appender.index.MaxFileSize=100MB
-log4j.appender.index.MaxBackupIndex=10
-log4j.appender.index.BufferedIO=false
-log4j.appender.index.BufferSize=8192
-log4j.appender.index.layout=org.apache.log4j.PatternLayout
-log4j.appender.index.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
-log4j.appender.index.Threshold = INFO 
-log4j.appender.index.Append=true
-log4j.appender.index.File=${log.dir}/${log.file.index.name}
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-
-log4j.appender.stat=org.apache.log4j.RollingFileAppender
-log4j.appender.stat.MaxFileSize=100MB
-log4j.appender.stat.MaxBackupIndex=10
-log4j.appender.stat.BufferedIO=false
-log4j.appender.stat.BufferSize=8192
-log4j.appender.stat.layout=org.apache.log4j.PatternLayout
-log4j.appender.stat.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.stat.Threshold = INFO 
-log4j.appender.stat.Append=true
-log4j.appender.stat.File=${log.dir}/${log.file.stat.name}
-
-log4j.logger.info=info
-log4j.appender.info=org.apache.log4j.RollingFileAppender
-log4j.appender.info.MaxFileSize=100MB
-log4j.appender.info.MaxBackupIndex=10
-log4j.appender.info.BufferedIO=false
-log4j.appender.info.BufferSize=8192
-log4j.appender.info.layout=org.apache.log4j.PatternLayout
-log4j.appender.info.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.info.Threshold = INFO 
-log4j.appender.info.Append=true
-log4j.appender.info.File=${log.dir}/${log.file.name.prefix}${log.file.info.name.postfix}
-
-log4j.logger.debug=debug
-log4j.appender.debug=org.apache.log4j.RollingFileAppender
-log4j.appender.debug.MaxFileSize=100MB
-log4j.appender.debug.MaxBackupIndex=10
-log4j.appender.debug.BufferedIO=false
-log4j.appender.debug.BufferSize=8192
-log4j.appender.debug.layout=org.apache.log4j.PatternLayout
-log4j.appender.debug.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.debug.Threshold = DEBUG 
-log4j.appender.debug.Append=true
-log4j.appender.debug.File=${log.dir}/${log.file.name.prefix}${log.file.debug.name.postfix}
-
-log4j.logger.warn=warn
-log4j.appender.warn=org.apache.log4j.RollingFileAppender
-log4j.appender.warn.MaxFileSize=100MB
-log4j.appender.warn.MaxBackupIndex=10
-log4j.appender.warn.BufferedIO=false
-log4j.appender.warn.BufferSize=8192
-log4j.appender.warn.layout=org.apache.log4j.PatternLayout
-log4j.appender.warn.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.warn.Threshold = WARN 
-log4j.appender.warn.Append=true
-log4j.appender.warn.File=${log.dir}/${log.file.name.prefix}${log.file.warn.name.postfix}
-
-log4j.logger.error=error
-log4j.appender.error=org.apache.log4j.RollingFileAppender
-log4j.appender.error.MaxFileSize=100MB
-log4j.appender.error.MaxBackupIndex=10
-log4j.appender.error.BufferedIO=false
-log4j.appender.error.BufferSize=8192
-log4j.appender.error.layout=org.apache.log4j.PatternLayout
-log4j.appender.error.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.error.Threshold = ERROR 
-log4j.appender.error.Append=true
-log4j.appender.error.File=${log.dir}/${log.file.name.prefix}${log.file.error.name.postfix}
diff --git a/inlong-dataproxy/conf/log4j2.xml b/inlong-dataproxy/conf/log4j2.xml
new file mode 100644
index 0000000..615de41
--- /dev/null
+++ b/inlong-dataproxy/conf/log4j2.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration status="WARN" monitorInterval="30">
+    <Properties>
+        <property name="basePath">${sys:dataproxy.log.path}</property>
+        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p ${PID:-} [%15.15t] %-30.30C{1.} : %m%n</property>
+        <property name="every_file_size">1G</property>
+        <property name="output_log_level">DEBUG</property>
+        <property name="rolling_max">50</property>
+        <property name="info_fileName">${basePath}/info.log</property>
+        <property name="info_filePattern">${basePath}/info-%d{yyyy-MM-dd}-%i.log.gz</property>
+        <property name="info_max">10</property>
+        <property name="warn_fileName">${basePath}/warn.log</property>
+        <property name="warn_filePattern">${basePath}/warn-%d{yyyy-MM-dd}-%i.log.gz</property>
+        <property name="warn_max">10</property>
+        <property name="error_fileName">${basePath}/error.log</property>
+        <property name="error_filePattern">${basePath}/error-%d{yyyy-MM-dd}-%i.log.gz</property>
+        <property name="error_max">10</property>
+        <property name="console_print_level">DEBUG</property>
+        <property name="index_fileName">${basePath}/index.log</property>
+        <property name="index_filePattern">${basePath}/index-%d{yyyy-MM-dd}-%i.log.gz</property>
+        <property name="monitors_fileName">${basePath}/monitors.log</property>
+        <property name="monitors_filePattern">${basePath}/monitors-%d{yyyy-MM-dd}-%i.log.gz</property>
+    </Properties>
+
+    <appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="${log_pattern}"/>
+        </Console>
+
+        <RollingFile name="InfoFile" fileName="${info_fileName}" filePattern="${info_filePattern}">
+            <PatternLayout pattern="${log_pattern}"/>
+            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+            <DefaultRolloverStrategy max="${info_max}" />
+            <Filters>
+                <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
+                <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            </Filters>
+        </RollingFile>
+
+        <RollingFile name="IndexFile" fileName="${index_fileName}" filePattern="${index_filePattern}">
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %m%n"/>
+            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+            <DefaultRolloverStrategy max="${info_max}" />
+            <Filters>
+                <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
+                <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            </Filters>
+        </RollingFile>
+
+        <RollingFile name="MonitorFile" fileName="${monitors_fileName}" filePattern="${monitors_filePattern}">
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %m%n"/>
+            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+            <DefaultRolloverStrategy max="${info_max}" />
+            <Filters>
+                <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
+                <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            </Filters>
+        </RollingFile>
+
+        <RollingFile name="WarnFile" fileName="${warn_fileName}" filePattern="${warn_filePattern}">
+            <PatternLayout pattern="${log_pattern}"/>
+            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+            <DefaultRolloverStrategy max="${warn_max}" />
+            <Filters>
+                <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL"/>
+                <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/>
+            </Filters>
+        </RollingFile>
+
+        <RollingFile name="ErrorFile" fileName="${error_fileName}" filePattern="${error_filePattern}">
+            <PatternLayout pattern="${log_pattern}"/>
+            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+            <DefaultRolloverStrategy max="${error_max}" />
+            <Filters>
+                <ThresholdFilter level="FATAL" onMatch="DENY" onMismatch="NEUTRAL"/>
+                <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
+            </Filters>
+        </RollingFile>
+    </appenders>
+
+    <loggers>
+        <logger name="org.apache.inlong.commons.monitor.MonitorIndexExt" level="info" additivity="false">
+            <appender-ref ref="MonitorFile"/>
+        </logger>
+        <logger name="org.apache.inlong.commons.monitor.MonitorIndex" level="info" additivity="false">
+            <appender-ref ref="IndexFile"/>
+        </logger>
+        <root level="${output_log_level}">
+            <appender-ref ref="Console"/>
+            <appender-ref ref="InfoFile"/>
+            <appender-ref ref="WarnFile"/>
+            <appender-ref ref="ErrorFile"/>
+        </root>
+    </loggers>
+</configuration>
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/pom.xml b/inlong-dataproxy/dataproxy-source/pom.xml
index 6950f75..6bffcf4 100644
--- a/inlong-dataproxy/dataproxy-source/pom.xml
+++ b/inlong-dataproxy/dataproxy-source/pom.xml
@@ -39,14 +39,14 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>tubemq-client</artifactId>
-            <version>${project.version}</version>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
             <artifactId>audit-sdk</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-simple</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/PulsarConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/PulsarConfig.java
index 10ef942..a3b2d38 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/PulsarConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/PulsarConfig.java
@@ -48,6 +48,12 @@ public class PulsarConfig extends Context {
     private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
     private static final String PULSAR_IO_THREADS = "pulsar_io_threads";
     private static final String PULSAR_CONNECTIONS_PRE_BROKER = "connections_pre_broker";
+    private static final String MAX_BATCHING_BYTES = "max_batching_bytes";
+    private static final String MAX_BATCHING_PUBLISH_DELAY_MILLIS =
+            "max_batching_publish_delay_millis";
+    private static final String EVENT_QUEUE_SIZE = "event_queue_size";
+
+    private static final String BAD_EVENT_QUEUE_SIZE = "bad_event_queue_size";
     /*
      * properties for stat
      */
@@ -64,16 +70,28 @@ public class PulsarConfig extends Context {
     private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
     private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
     private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
+    private static final int DEFAULT_MAX_BATCHING_BYTES = 128 * 1024;
+    private static final long DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS = 1L;
     private static final int DEFAULT_RETRY_CNT = -1;
     private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
     private static final int DEFAULT_STAT_INTERVAL_SEC = 60;
     private static final int DEFAULT_THREAD_NUM = 4;
     private static final boolean DEFAULT_CLIENT_ID_CACHE = true;
     private static final long DEFAULT_DISK_IO_RATE_PER_SEC = 0L;
-    private static int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil
+    private static final int DEFAULT_EVENT_QUEUE_SIZE = 10000;
+    private static final int DEFAULT_BAD_EVENT_QUEUE_SIZE = 10000;
+    private static final int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil
             .getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
     private static final int DEFAULT_CONNECTIONS_PRE_BROKER = 1;
 
+    public int getEventQueueSize() {
+        return getInteger(EVENT_QUEUE_SIZE, DEFAULT_EVENT_QUEUE_SIZE);
+    }
+
+    public int getBadEventQueueSize() {
+        return getInteger(BAD_EVENT_QUEUE_SIZE, DEFAULT_BAD_EVENT_QUEUE_SIZE);
+    }
+
     public Map<String, String> getUrl2token() {
         return url2token;
     }
@@ -102,6 +120,14 @@ public class PulsarConfig extends Context {
         return getInteger(CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT_SECOND);
     }
 
+    public int getMaxBatchingBytes() {
+        return getInteger(MAX_BATCHING_BYTES, DEFAULT_MAX_BATCHING_BYTES);
+    }
+
+    public long getMaxBatchingPublishDelayMillis() {
+        return getLong(MAX_BATCHING_PUBLISH_DELAY_MILLIS, DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS);
+    }
+
     public boolean getEnableBatch() {
         return getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index fedcc80..d498b9c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -54,6 +54,8 @@ public class ConfigConstants {
 
     public static final String MAX_THREADS = "max-threads";
 
+    public static final String ENABLE_BUSY_WAIT = "enableBusyWait";
+
     public static final String STAT_INTERVAL_SEC = "stat-interval-sec";
 
     public static final String HEART_INTERVAL_SEC = "heart-interval-sec";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 7d2d90e..2c20525 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -91,9 +91,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
     /*
      * default value
      */
-    private static int BAD_EVENT_QUEUE_SIZE = 10000;
     private static int BATCH_SIZE = 10000;
-
     /*
      * for log
      */
@@ -141,12 +139,16 @@ public class PulsarSink extends AbstractSink implements Configurable,
     private static final LogCounter logPrinterB = new LogCounter(10, 100000, 60 * 1000);
     private static final LogCounter logPrinterC = new LogCounter(10, 100000, 60 * 1000);
 
-    private static int EVENT_QUEUE_SIZE = 1000;
+    private static final String SINK_THREAD_NUM = "thread-num";
+    private int eventQueueSize = 10000;
+    private int badEventQueueSize = 10000;
+    private int threadNum;
 
     /*
      * send thread pool
      */
     private Thread[] sinkThreadPool;
+    private PulsarClientService pulsarClientService;
     private LinkedBlockingQueue<Event> eventQueue;
 
     private static final String SEPARATOR = "#";
@@ -156,9 +158,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
     private SinkCounter sinkCounter;
     private ConfigManager configManager;
     private Map<String, String> topicProperties;
+
     private Map<String, String> pulsarCluster;
     private PulsarConfig pulsarConfig;
-    private PulsarClientService pulsarClientService;
 
     private static final Long PRINT_INTERVAL = 30L;
 
@@ -194,6 +196,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
      * configure
      * @param context
      */
+    @Override
     public void configure(Context context) {
         logger.info("PulsarSink started and context = {}", context.toString());
         isNewMetricOn = context.getBoolean("new-metric-on", true);
@@ -203,11 +206,15 @@ public class PulsarSink extends AbstractSink implements Configurable,
         topicProperties = configManager.getTopicProperties();
         pulsarCluster = configManager.getPulsarUrl2Token();
         pulsarConfig = configManager.getPulsarConfig(); //pulsar common config
+        pulsarClientService = new PulsarClientService(pulsarConfig);
         configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
             @Override
             public void update() {
-                diffSetPublish(new HashSet<String>(topicProperties.values()),
-                        new HashSet<String>(configManager.getTopicProperties().values()));
+                if (pulsarClientService != null) {
+                    diffSetPublish(pulsarClientService,
+                            new HashSet<String>(topicProperties.values()),
+                            new HashSet<String>(configManager.getTopicProperties().values()));
+                }
             }
         });
         configManager.getPulsarCluster().addUpdateCallback(new ConfigUpdateCallback() {
@@ -216,23 +223,24 @@ public class PulsarSink extends AbstractSink implements Configurable,
                 diffRestartPulsarClient(pulsarCluster.keySet(), configManager.getPulsarUrl2Token().keySet());
             }
         });
-        resendQueue = new LinkedBlockingQueue<EventStat>(BAD_EVENT_QUEUE_SIZE);
+        badEventQueueSize = pulsarConfig.getBadEventQueueSize();
+        resendQueue = new LinkedBlockingQueue<EventStat>(badEventQueueSize);
 
         Preconditions.checkArgument(pulsarConfig.getThreadNum() > 0, "threadNum must be > 0");
         sinkThreadPool = new Thread[pulsarConfig.getThreadNum()];
-        eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
+        eventQueueSize = pulsarConfig.getEventQueueSize();
+        eventQueue = new LinkedBlockingQueue<Event>(eventQueueSize);
 
         if (pulsarConfig.getDiskIoRatePerSec() != 0) {
             diskRateLimiter = RateLimiter.create(pulsarConfig.getDiskIoRatePerSec());
         }
-        pulsarClientService = new PulsarClientService(pulsarConfig);
 
         if (sinkCounter == null) {
             sinkCounter = new SinkCounter(getName());
         }
     }
 
-    private void initTopicSet(Set<String> topicSet) throws Exception {
+    private void initTopicSet(PulsarClientService pulsarClientService, Set<String> topicSet) {
         long startTime = System.currentTimeMillis();
         if (topicSet != null) {
             for (String topic : topicSet) {
@@ -250,8 +258,8 @@ public class PulsarSink extends AbstractSink implements Configurable,
      * @param originalSet
      * @param endSet
      */
-    public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
-
+    public void diffSetPublish(PulsarClientService pulsarClientService, Set<String> originalSet,
+            Set<String> endSet) {
         boolean changed = false;
         for (String s : endSet) {
             if (!originalSet.contains(s)) {
@@ -284,7 +292,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
             configManager.getPulsarConfig().setUrl2token(pulsarCluster);
             pulsarClientService.initCreateConnection(this);
             try {
-                initTopicSet(new HashSet<String>(topicProperties.values()));
+                initTopicSet(pulsarClientService, new HashSet<String>(topicProperties.values()));
             } catch (Exception e) {
                 logger.info("pulsar sink restart, publish topic fail.", e);
             }
@@ -314,14 +322,14 @@ public class PulsarSink extends AbstractSink implements Configurable,
         this.canSend = true;
         this.canTake = true;
 
-        try {
-            initTopicSet(new HashSet<String>(topicProperties.values()));
-        } catch (Exception e) {
-            logger.info("pulsar sink start publish topic fail.", e);
-        }
-
         for (int i = 0; i < sinkThreadPool.length; i++) {
-            sinkThreadPool[i] = new Thread(new SinkTask(), getName()
+            try {
+                initTopicSet(pulsarClientService,
+                        new HashSet<String>(topicProperties.values()));
+            } catch (Exception e) {
+                logger.info("pulsar sink start publish topic fail.", e);
+            }
+            sinkThreadPool[i] = new Thread(new SinkTask(pulsarClientService), getName()
                     + "_pulsar_sink_sender-"
                     + i);
             sinkThreadPool[i].start();
@@ -332,7 +340,6 @@ public class PulsarSink extends AbstractSink implements Configurable,
     @Override
     public void stop() {
         logger.info("pulsar sink stopping");
-        pulsarClientService.close();
         this.canTake = false;
         int waitCount = 0;
         while (eventQueue.size() != 0 && waitCount++ < 10) {
@@ -351,6 +358,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
                 logger.warn("stat runner interrupted");
             }
         }
+        if (pulsarClientService != null) {
+            pulsarClientService.close();
+        }
         if (sinkThreadPool != null) {
             for (Thread thread : sinkThreadPool) {
                 if (thread != null) {
@@ -359,6 +369,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
             }
             sinkThreadPool = null;
         }
+
         super.stop();
         if (!scheduledExecutorService.isShutdown()) {
             scheduledExecutorService.shutdown();
@@ -620,6 +631,13 @@ public class PulsarSink extends AbstractSink implements Configurable,
     }
 
     class SinkTask implements Runnable {
+
+        private PulsarClientService pulsarClientService;
+
+        public SinkTask(PulsarClientService pulsarClientService) {
+            this.pulsarClientService = pulsarClientService;
+        }
+
         @Override
         public void run() {
             logger.info("Sink task {} started.", Thread.currentThread().getName());
@@ -673,6 +691,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
                     if (logger.isDebugEnabled()) {
                         logger.debug("Event is {}, topic = {} ", event, topic);
                     }
+
                     if (event == null) {
                         continue;
                     }
@@ -714,7 +733,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
                                     getName(), clientId);
                         }
                     } else {
-                        if (clientId != null) {
+                        if (pulsarConfig.getClientIdCache() && clientId != null) {
                             agentIdCache.put(clientId, System.currentTimeMillis());
                         }
                         boolean sendResult = pulsarClientService.sendMessage(topic, event,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index d219395..b121875 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -55,7 +55,6 @@ public class PulsarClientService {
     /*
      * for pulsar client
      */
-    private String[] pulsarServerUrls;
     private Map<String, String> pulsarUrl2token;
 
     private String authType;
@@ -67,7 +66,9 @@ public class PulsarClientService {
     private boolean enableBatch = true;
     private boolean blockIfQueueFull = true;
     private int maxPendingMessages = 10000;
+    private int maxBatchingBytes = 128 * 1024;
     private int maxBatchingMessages = 1000;
+    private long maxBatchingPublishDelayMillis = 1;
     private long retryIntervalWhenSendMsgError = 30 * 1000L;
     public Map<String, List<TopicProducerInfo>> producerInfoMap;
     public Map<String, AtomicLong> topicSendIndexMap;
@@ -99,18 +100,11 @@ public class PulsarClientService {
         blockIfQueueFull = pulsarConfig.getBlockIfQueueFull();
         maxPendingMessages = pulsarConfig.getMaxPendingMessages();
         maxBatchingMessages = pulsarConfig.getMaxBatchingMessages();
+        maxBatchingBytes = pulsarConfig.getMaxBatchingBytes();
+        maxBatchingPublishDelayMillis = pulsarConfig.getMaxBatchingPublishDelayMillis();
         producerInfoMap = new ConcurrentHashMap<String, List<TopicProducerInfo>>();
         topicSendIndexMap = new ConcurrentHashMap<String, AtomicLong>();
         localIp = NetworkUtils.getLocalIp();
-
-        //        retryIntervalWhenSendMsgError = context.getLong(RETRY_INTERVAL_WHEN_SEND_ERROR_MILL,
-//                DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL);
-//        clientTimeout = context.getInteger(CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT_SECOND);
-//        enableBatch = context.getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
-//        blockIfQueueFull = context.getBoolean(BLOCK_IF_QUEUE_FULL, DEFAULT_BLOCK_IF_QUEUE_FULL);
-//        maxPendingMessages = context.getInteger(MAX_PENDING_MESSAGES, DEFAULT_MAX_PENDING_MESSAGES);
-//        maxBatchingMessages = context.getInteger(MAX_BATCHING_MESSAGES, DEFAULT_MAX_BATCHING_MESSAGES);
-
     }
 
     public void initCreateConnection(CreatePulsarClientCallBack callBack) {
@@ -157,6 +151,8 @@ public class PulsarClientService {
              * After 30s, reopen the topic check, if it is still a null value,
              *  put it back into the illegal map
              */
+            sendMessageCallBack.handleMessageSendException(topic, es, new Exception("producer is "
+                    + "null"));
             return false;
         }
 
@@ -240,12 +236,18 @@ public class PulsarClientService {
 
     public List<TopicProducerInfo> initTopicProducer(String topic) {
         List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
-            List<TopicProducerInfo> newList = new ArrayList<>();
+            List<TopicProducerInfo> newList = null;
             if (pulsarClients != null) {
+                newList = new ArrayList<>();
                 for (PulsarClient pulsarClient : pulsarClients) {
                     TopicProducerInfo info = new TopicProducerInfo(pulsarClient, topic);
                     info.initProducer();
-                    newList.add(info);
+                    if (info.isCanUseToSendMessage()) {
+                        newList.add(info);
+                    }
+                }
+                if (newList.size() == 0) {
+                    newList = null;
                 }
             }
             return newList;
@@ -258,7 +260,7 @@ public class PulsarClientService {
         AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> {
             return new AtomicLong(0);
         });
-        int maxTryToGetProducer = producerList.size();
+        int maxTryToGetProducer = producerList == null ? 0 : producerList.size();
         if (maxTryToGetProducer == 0) {
             return null;
         }
@@ -329,6 +331,8 @@ public class PulsarClientService {
                         .blockIfQueueFull(blockIfQueueFull)
                         .maxPendingMessages(maxPendingMessages)
                         .batchingMaxMessages(maxBatchingMessages)
+                        .batchingMaxBytes(maxBatchingBytes)
+                        .batchingMaxPublishDelay(maxBatchingPublishDelayMillis, TimeUnit.MILLISECONDS)
                         .create();
                 isFinishInit = true;
             } catch (PulsarClientException e) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 9e0c651..83bc962 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -18,6 +18,13 @@
 package org.apache.inlong.dataproxy.source;
 
 import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.GlobalEventExecutor;
 import java.lang.reflect.Constructor;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.ChannelSelector;
@@ -32,11 +39,6 @@ import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.commons.monitor.MonitorIndex;
 import org.apache.inlong.commons.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
-import org.jboss.netty.bootstrap.Bootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,11 +94,17 @@ public abstract class BaseSource
    * netty server
    */
 
-  protected Bootstrap serverBootstrap = null;
+  protected EventLoopGroup acceptorGroup;
+
+  protected EventLoopGroup workerGroup;
+
+  protected DefaultThreadFactory acceptorThreadFactory;
+
+  protected boolean enableBusyWait = false;
 
   protected ChannelGroup allChannels;
 
-  protected Channel nettyChannel = null;
+  protected ChannelFuture channelFuture;
 
   private static String HOST_DEFAULT_VALUE = "0.0.0.0";
 
@@ -114,13 +122,29 @@ public abstract class BaseSource
 
   private static int INTERVAL_SEC = 60;
 
+  protected static int BUFFER_SIZE_MUST_THAN = 0;
+
   protected static int DEFAULT_MAX_THREADS = 32;
 
+  protected static int RECEIVE_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+  protected static int SEND_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+  protected static int RECEIVE_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+  protected static int SEND_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+  protected int receiveBufferSize;
+
+  protected int sendBufferSize;
+
   protected int maxThreads = 32;
 
+  protected int acceptorThreads = 1;
+
   public BaseSource() {
     super();
-    allChannels = new DefaultChannelGroup();
+    allChannels = new DefaultChannelGroup("DefaultChannelGroup", GlobalEventExecutor.INSTANCE);
   }
 
   @Override
@@ -149,21 +173,12 @@ public abstract class BaseSource
       try {
         allChannels.close().awaitUninterruptibly();
       } catch (Exception e) {
-        logger.warn("Simple UDP Source netty server stop ex, {}", e);
+        logger.warn("Simple Source netty server stop ex, {}", e);
       } finally {
         allChannels.clear();
       }
     }
 
-    if (serverBootstrap != null) {
-      try {
-        serverBootstrap.releaseExternalResources();
-      } catch (Exception e) {
-        logger.warn("Simple UDP Source serverBootstrap stop ex {}", e);
-      } finally {
-        serverBootstrap = null;
-      }
-    }
     super.stop();
     if (monitorIndex != null) {
       monitorIndex.shutDown();
@@ -171,6 +186,14 @@ public abstract class BaseSource
     if (monitorIndexExt != null) {
       monitorIndexExt.shutDown();
     }
+
+    if (channelFuture != null) {
+      try {
+        channelFuture.channel().closeFuture().sync();
+      } catch (InterruptedException e) {
+        logger.warn("Simple Source netty server stop ex, {}", e);
+      }
+    }
     logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), this.getName());
   }
 
@@ -245,6 +268,22 @@ public abstract class BaseSource
               context.getString(ConfigConstants.MAX_THREADS));
     }
 
+    receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, RECEIVE_BUFFER_DEFAULT_SIZE);
+    if (receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
+      receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
+    }
+    Preconditions.checkArgument(receiveBufferSize > BUFFER_SIZE_MUST_THAN,
+            "receiveBufferSize must be > 0");
+
+    sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, SEND_BUFFER_DEFAULT_SIZE);
+    if (sendBufferSize > SEND_BUFFER_MAX_SIZE) {
+      sendBufferSize = SEND_BUFFER_MAX_SIZE;
+    }
+    Preconditions.checkArgument(sendBufferSize > BUFFER_SIZE_MUST_THAN,
+            "sendBufferSize must be > 0");
+
+    enableBusyWait = context.getBoolean(ConfigConstants.ENABLE_BUSY_WAIT, false);
+
     this.customProcessor = context.getBoolean(ConfigConstants.CUSTOM_CHANNEL_PROCESSOR, false);
   }
 
@@ -252,21 +291,21 @@ public abstract class BaseSource
    * channel factory
    * @return
    */
-  public ChannelPipelineFactory getChannelPiplineFactory() {
+  public ChannelInitializer getChannelInitializerFactory() {
     logger.info(new StringBuffer("load msgFactory=").append(msgFactoryName)
             .append(" and serviceDecoderName=").append(serviceDecoderName).toString());
-    ChannelPipelineFactory fac = null;
+    ChannelInitializer fac = null;
     try {
       ServiceDecoder serviceDecoder = (ServiceDecoder)Class.forName(serviceDecoderName).newInstance();
-      Class<? extends ChannelPipelineFactory> clazz =
-              (Class<? extends ChannelPipelineFactory>) Class.forName(msgFactoryName);
+      Class<? extends ChannelInitializer> clazz =
+              (Class<? extends ChannelInitializer>) Class.forName(msgFactoryName);
       Constructor ctor = clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
               String.class, ServiceDecoder.class, String.class, Integer.class,
               String.class, String.class, Boolean.class,
               Integer.class, Boolean.class, MonitorIndex.class,
               MonitorIndexExt.class, String.class);
       logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
-      fac = (ChannelPipelineFactory) ctor.newInstance(this, allChannels,
+      fac = (ChannelInitializer) ctor.newInstance(this, allChannels,
               this.getProtocolName(), serviceDecoder, messageHandlerName, maxMsgLength,
               topic, attr, filterEmptyMsg,
               maxConnections, isCompressed, monitorIndex,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index d2c7ff6..c79a4a8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -19,6 +19,8 @@ package org.apache.inlong.dataproxy.source;
 
 import com.google.common.base.Splitter;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -34,9 +36,6 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.MessageEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
@@ -77,14 +76,12 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * @param resultMap
      * @param cb
      * @param channel
-     * @param msgEvent
      * @param totalDataLen
      * @return
      * @throws
      */
     private Map<String, Object> extractNewBinHB(Map<String, Object> resultMap,
-            ChannelBuffer cb, Channel channel,
-            MessageEvent msgEvent, int totalDataLen) throws Exception {
+            ByteBuf cb, Channel channel, int totalDataLen) throws Exception {
         int msgHeadPos = cb.readerIndex() - 5;
 
         // check validation
@@ -119,7 +116,6 @@ public class DefaultServiceDecoder implements ServiceDecoder {
     }
 
     private void handleDateTime(Map<String, String> commonAttrMap, Channel channel,
-            MessageEvent msgEvent,
             long uniq, long dataTime, int msgCount) {
         commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
         String time = "";
@@ -134,10 +130,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
          * udp need use msgEvent get remote address
          */
         String remoteAddress = "";
-        if (channel != null && channel.getRemoteAddress() != null) {
-            remoteAddress = channel.getRemoteAddress().toString();
-        } else if (msgEvent != null && msgEvent.getRemoteAddress() != null) {
-            remoteAddress = msgEvent.getRemoteAddress().toString();
+        if (channel != null && channel.remoteAddress() != null) {
+            remoteAddress = channel.remoteAddress().toString();
         }
         sidBuilder.append(remoteAddress).append("#").append(time)
             .append("#").append(uniq);
@@ -151,7 +145,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             String.valueOf(msgCount != 0 ? msgCount : 1));
     }
 
-    private boolean handleExtMap(Map<String, String> commonAttrMap, ChannelBuffer cb,
+    private boolean handleExtMap(Map<String, String> commonAttrMap, ByteBuf cb,
         Map<String, Object> resultMap, int extendField, int msgHeadPos) {
         boolean index = false;
         if ((extendField & 0x8) == 0x8) {
@@ -177,7 +171,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
         return index;
     }
 
-    private ByteBuffer handleTrace(Channel channel, ChannelBuffer cb, int extendField,
+    private ByteBuffer handleTrace(Channel channel, ByteBuf cb, int extendField,
         int msgHeadPos, int totalDataLen, int attrLen, String strAttr, int bodyLen) {
         // whether enable trace
         boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1);
@@ -190,7 +184,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             String traceInfo;
             String strNode2Ip = null;
 
-            SocketAddress loacalSockAddr = channel.getLocalAddress();
+            SocketAddress loacalSockAddr = channel.localAddress();
             if (null != loacalSockAddr) {
                 strNode2Ip = loacalSockAddr.toString();
                 try {
@@ -237,14 +231,13 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * @param resultMap
      * @param cb
      * @param channel
-     * @param msgEvent
      * @param totalDataLen
      * @param msgType
      * @return
      * @throws Exception
      */
     private Map<String, Object> extractNewBinData(Map<String, Object> resultMap,
-            ChannelBuffer cb, Channel channel, MessageEvent msgEvent,
+            ByteBuf cb, Channel channel,
             int totalDataLen, MsgType msgType) throws Exception {
         int msgHeadPos = cb.readerIndex() - 5;
 
@@ -306,7 +299,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
         }
 
         try {
-            handleDateTime(commonAttrMap, channel, msgEvent, uniq, dataTime, msgCount);
+            handleDateTime(commonAttrMap, channel, uniq, dataTime, msgCount);
             final boolean index = handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos);
             ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos,
                 totalDataLen, attrLen, strAttr, bodyLen);
@@ -358,14 +351,12 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * @param cb
      * @param channel
      * @param totalDataLen
-     * @param msgEvent
      * @param msgType
      * @return
      * @throws Exception
      */
     private Map<String, Object> extractDefaultData(Map<String, Object> resultMap,
-            ChannelBuffer cb, Channel channel,
-            MessageEvent msgEvent,
+            ByteBuf cb, Channel channel,
             int totalDataLen, MsgType msgType) throws Exception {
         int bodyLen = cb.readInt();
         if (bodyLen == 0) {
@@ -481,8 +472,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * +--------+--------+--------+----------------+--------+----------------+------------------------+
      */
     @Override
-    public Map<String, Object> extractData(ChannelBuffer cb, Channel channel,
-            MessageEvent msgEvent) throws Exception {
+    public Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception {
         Map<String, Object> resultMap = new HashMap<String, Object>();
         if (null == cb) {
             LOG.error("cb == null");
@@ -509,14 +499,14 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             }
             // if it's bin heart beat.
             if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-                return extractNewBinHB(resultMap, cb, channel, msgEvent, totalDataLen);
+                return extractNewBinHB(resultMap, cb, channel, totalDataLen);
             }
 
             if (msgType.getValue() >= MsgType.MSG_BIN_MULTI_BODY.getValue()) {
                 resultMap.put(ConfigConstants.COMPRESS_TYPE, (compressType != 0) ? "snappy" : "");
-                return extractNewBinData(resultMap, cb, channel, msgEvent, totalDataLen, msgType);
+                return extractNewBinData(resultMap, cb, channel, totalDataLen, msgType);
             } else {
-                return extractDefaultData(resultMap, cb, channel, msgEvent, totalDataLen, msgType);
+                return extractDefaultData(resultMap, cb, channel, totalDataLen, msgType);
             }
 
         } else {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index fa42169..91dd75d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -17,6 +17,12 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 import java.lang.reflect.Constructor;
 import java.util.concurrent.TimeUnit;
 import org.apache.flume.channel.ChannelProcessor;
@@ -24,21 +30,11 @@ import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.commons.monitor.MonitorIndex;
 import org.apache.inlong.commons.monitor.MonitorIndexExt;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ServerMessageFactory implements ChannelPipelineFactory {
+public class ServerMessageFactory
+        extends ChannelInitializer<SocketChannel> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ServerMessageFactory.class);
 
@@ -56,8 +52,6 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
 
     private ChannelGroup allChannels;
 
-    private ExecutionHandler executionHandler;
-
     private String protocolType;
 
     private ServiceDecoder serviceDecoder;
@@ -82,8 +76,6 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
 
     private MonitorIndexExt monitorIndexExt;
 
-    private Timer timer = new HashedWheelTimer();
-
     /**
      * get server factory
      *
@@ -122,37 +114,24 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
         this.isCompressed = isCompressed;
         this.monitorIndex = monitorIndex;
         this.monitorIndexExt = monitorIndexExt;
-        if (protocolType.equalsIgnoreCase(ConfigConstants.UDP_PROTOCOL)) {
-            this.executionHandler = new ExecutionHandler(
-                    new OrderedMemoryAwareThreadPoolExecutor(cores * 2,
-                            MAX_CHANNEL_MEMORY_SIZE, MAX_TOTAL_MEMORY_SIZE));
-        }
     }
 
     @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline cp = Channels.pipeline();
-        return addMessageHandlersTo(cp);
-    }
-
-    /**
-     * get message handlers
-     * @param cp
-     * @return
-     */
-    public ChannelPipeline addMessageHandlersTo(ChannelPipeline cp) {
+    protected void initChannel(SocketChannel ch) throws Exception {
 
         if (this.protocolType
                 .equalsIgnoreCase(ConfigConstants.TCP_PROTOCOL)) {
-            cp.addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
-                    this.maxMsgLength, 0, MSG_LENGTH_LEN, 0, 0, true));
-            cp.addLast("readTimeoutHandler", new ReadTimeoutHandler(timer,
-                    DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
+            ch.pipeline().addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
+                    this.maxMsgLength, 0,
+                    MSG_LENGTH_LEN, 0, 0, true));
+            ch.pipeline().addLast("readTimeoutHandler",
+                    new ReadTimeoutHandler(DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
         }
 
         if (processor != null) {
             try {
-                Class<? extends SimpleChannelHandler> clazz = (Class<? extends SimpleChannelHandler>) Class
+                Class<? extends ChannelInboundHandlerAdapter> clazz
+                        = (Class<? extends ChannelInboundHandlerAdapter>) Class
                         .forName(messageHandlerName);
 
                 Constructor<?> ctor = clazz.getConstructor(
@@ -161,22 +140,16 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
                         Integer.class, Boolean.class, MonitorIndex.class,
                         MonitorIndexExt.class, String.class);
 
-                SimpleChannelHandler messageHandler = (SimpleChannelHandler) ctor
+                ChannelInboundHandlerAdapter messageHandler = (ChannelInboundHandlerAdapter) ctor
                         .newInstance(source, serviceDecoder, allChannels, topic, attr,
                                 filterEmptyMsg, maxConnections,
                                 isCompressed,  monitorIndex, monitorIndexExt, protocolType
                         );
 
-                cp.addLast("messageHandler", messageHandler);
+                ch.pipeline().addLast("messageHandler", messageHandler);
             } catch (Exception e) {
                 LOG.info("SimpleChannelHandler.newInstance  has error:" + name, e);
             }
         }
-
-        if (this.protocolType.equalsIgnoreCase(ConfigConstants.UDP_PROTOCOL)) {
-            cp.addLast("execution", executionHandler);
-        }
-
-        return cp;
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index baef74b..e433716 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -22,6 +22,12 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
 import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -52,15 +58,6 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,7 +68,7 @@ import com.google.common.base.Splitter;
  * Server message handler
  *
  */
-public class ServerMessageHandler extends SimpleChannelHandler {
+public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
     private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
 
     private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
@@ -120,8 +117,6 @@ public class ServerMessageHandler extends SimpleChannelHandler {
 
     private String defaultMXAttr = "m=3";
 
-    private final ChannelBuffer heartbeatBuffer;
-
     private final String protocolType;
 
 
@@ -148,7 +143,6 @@ public class ServerMessageHandler extends SimpleChannelHandler {
 
         this.filterEmptyMsg = filterEmptyMsg;
         this.isCompressed = isCompressed;
-        this.heartbeatBuffer = ChannelBuffers.wrappedBuffer(new byte[]{0, 0, 0, 1, 1});
         this.maxConnections = maxCons;
         this.protocolType = protocolType;
         if (source instanceof SimpleTcpSource) {
@@ -166,7 +160,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
 
     private String getRemoteIp(Channel channel, SocketAddress remoteAddress) {
         String strRemoteIp = DEFAULT_REMOTE_IP_VALUE;
-        SocketAddress remoteSocketAddress = channel.getRemoteAddress();
+        SocketAddress remoteSocketAddress = channel.remoteAddress();
         if (remoteSocketAddress == null) {
             remoteSocketAddress = remoteAddress;
         }
@@ -242,21 +236,28 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         if (allChannels.size() - 1 >= maxConnections) {
             logger.warn("refuse to connect , and connections=" + (allChannels.size() - 1)
                     + ", maxConnections="
-                    + maxConnections + ",channel is " + e.getChannel());
-            e.getChannel().disconnect();
-            e.getChannel().close();
+                    + maxConnections + ",channel is " + ctx.channel());
+            ctx.channel().disconnect();
+            ctx.channel().close();
         }
-        if (!checkBlackIp(e.getChannel())) {
+        if (!checkBlackIp(ctx.channel())) {
             logger.info("connections={},maxConnections={}", allChannels.size() - 1, maxConnections);
-            allChannels.add(e.getChannel());
-            super.channelOpen(ctx, e);
+            allChannels.add(ctx.channel());
+            ctx.fireChannelActive();
         }
     }
 
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        logger.error("channel inactive {}", ctx.channel());
+        ctx.fireChannelInactive();
+        allChannels.remove(ctx.channel());
+    }
+
     private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
             Map<String, String> attrMap, AtomicReference<String> topicInfo) {
         String groupId = message.getGroupId();
@@ -510,7 +511,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                         backBody = new byte[]{50};
                     }
                     int backTotalLen = 1 + 4 + backBody.length + 4 + backAttr.length;
-                    ChannelBuffer buffer = ChannelBuffers.buffer(4 + backTotalLen);
+                    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
                     buffer.writeInt(backTotalLen);
                     buffer.writeByte(msgType.getValue());
                     buffer.writeInt(backBody.length);
@@ -518,13 +519,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                     buffer.writeInt(backAttr.length);
                     buffer.writeBytes(backAttr);
                     if (remoteChannel.isWritable()) {
-                        remoteChannel.write(buffer, remoteSocketAddress);
+                        remoteChannel.write(buffer);
                     } else {
                         String backAttrStr = new String(backAttr, StandardCharsets.UTF_8);
                         logger.warn(
                                 "the send buffer1 is full, so disconnect it!please check remote client"
                                         + "; Connection info:"
                                         + remoteChannel + ";attr is " + backAttrStr);
+                        buffer.release();
                         throw new Exception(new Throwable(
                                 "the send buffer1 is full, so disconnect it!please check remote client"
                                         +
@@ -543,7 +545,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                     binTotalLen += backattrs.length();
                 }
 
-                ChannelBuffer binBuffer = ChannelBuffers.buffer(4 + binTotalLen);
+                ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
                 binBuffer.writeInt(binTotalLen);
                 binBuffer.writeByte(msgType.getValue());
 
@@ -564,8 +566,9 @@ public class ServerMessageHandler extends SimpleChannelHandler {
 
                 binBuffer.writeShort(0xee01);
                 if (remoteChannel.isWritable()) {
-                    remoteChannel.write(binBuffer, remoteSocketAddress);
+                    remoteChannel.write(binBuffer);
                 } else {
+                    binBuffer.release();
                     logger.warn(
                             "the send buffer2 is full, so disconnect it!please check remote client"
                                     + "; Connection info:" + remoteChannel + ";attr is "
@@ -579,129 +582,122 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         logger.debug("message received");
-        if (e == null) {
-            logger.error("get null messageevent, just skip");
-            this.addMetric(false, 0, null);
-            return;
-        }
-        ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
-        String strRemoteIP = getRemoteIp(e.getChannel(), e.getRemoteAddress());
-        SocketAddress remoteSocketAddress = e.getRemoteAddress();
-        int len = cb.readableBytes();
-        if (len == 0 && this.filterEmptyMsg) {
-            logger.warn("skip empty msg.");
-            cb.clear();
+        if (msg == null) {
+            logger.error("get null msg, just skip");
             this.addMetric(false, 0, null);
             return;
         }
-
-        Channel remoteChannel = e.getChannel();
-        Map<String, Object> resultMap = null;
+        ByteBuf cb = (ByteBuf) msg;
         try {
-            resultMap = serviceDecoder.extractData(cb, remoteChannel, e);
-        } catch (MessageIDException ex) {
-            logger.error("MessageIDException ex = {}", ex);
-            this.addMetric(false, 0, null);
-            throw new IOException(ex.getCause());
-        }
+            Channel remoteChannel = ctx.channel();
+            String strRemoteIP = getRemoteIp(remoteChannel);
+            int len = cb.readableBytes();
+            if (len == 0 && this.filterEmptyMsg) {
+                logger.warn("skip empty msg.");
+                this.addMetric(false, 0, null);
+                return;
+            }
 
-        if (resultMap == null) {
-            logger.info("result is null");
-            this.addMetric(false, 0, null);
-            return;
-        }
+            Map<String, Object> resultMap = null;
+            try {
+                resultMap = serviceDecoder.extractData(cb, remoteChannel);
+            } catch (MessageIDException ex) {
+                logger.error("MessageIDException ex = {}", ex);
+                this.addMetric(false, 0, null);
+                throw new IOException(ex.getCause());
+            }
 
-        MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
-        if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
-            remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
-            this.addMetric(false, 0, null);
-            return;
-        }
+            if (resultMap == null) {
+                logger.info("result is null");
+                this.addMetric(false, 0, null);
+                return;
+            }
 
-        if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-            this.addMetric(false, 0, null);
-            return;
-        }
+            MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
+            if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+                ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
+                heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+                remoteChannel.write(heartbeatBuffer);
+                this.addMetric(false, 0, null);
+                return;
+            }
 
-        Map<String, String> commonAttrMap =
-                (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
-        if (commonAttrMap == null) {
-            commonAttrMap = new HashMap<String, String>();
-        }
+            if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+                this.addMetric(false, 0, null);
+                return;
+            }
 
-        List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
-        if (msgList != null
-                && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
-                && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-            Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
-                    new HashMap<String, HashMap<String, List<ProxyMessage>>>(
-                            msgList.size());
-
-            updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
-
-            formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
-
-        } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
-            Map<String, String> headers = new HashMap<String, String>();
-            headers.put("msgtype", "filestatus");
-            headers.put(ConfigConstants.FILE_CHECK_DATA,
-                    "true");
-            for (ProxyMessage message : msgList) {
-                byte[] body = message.getData();
-                Event event = EventBuilder.withBody(body, headers);
-                try {
-                    processor.processEvent(event);
-                    this.addMetric(true, body.length, event);
-                } catch (Throwable ex) {
-                    logger.error("Error writing to controller,data will discard.", ex);
-                    this.addMetric(false, body.length, event);
-                    throw new ChannelException(
-                            "Process Controller Event error can't write event to channel.");
-                }
+            Map<String, String> commonAttrMap =
+                    (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+            if (commonAttrMap == null) {
+                commonAttrMap = new HashMap<String, String>();
             }
-        } else if (msgList != null && commonAttrMap
-                .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-            logger.info("i am in MINUTE_CHECK_DATA");
-            Map<String, String> headers = new HashMap<String, String>();
-            headers.put("msgtype", "measure");
-            headers.put(ConfigConstants.FILE_CHECK_DATA,
-                    "true");
-            for (ProxyMessage message : msgList) {
-                byte[] body = message.getData();
-                Event event = EventBuilder.withBody(body, headers);
-                try {
-                    processor.processEvent(event);
-                    this.addMetric(true, body.length, event);
-                } catch (Throwable ex) {
-                    logger.error("Error writing to controller,data will discard.", ex);
-                    this.addMetric(false, body.length, event);
-                    throw new ChannelException(
-                            "Process Controller Event error can't write event to channel.");
+
+            List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
+            if (msgList != null
+                    && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
+                    && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+                Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
+                        new HashMap<String, HashMap<String, List<ProxyMessage>>>(
+                                msgList.size());
+
+                updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
+
+                formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
+
+            } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
+                Map<String, String> headers = new HashMap<String, String>();
+                headers.put("msgtype", "filestatus");
+                headers.put(ConfigConstants.FILE_CHECK_DATA,
+                        "true");
+                for (ProxyMessage message : msgList) {
+                    byte[] body = message.getData();
+                    Event event = EventBuilder.withBody(body, headers);
+                    try {
+                        processor.processEvent(event);
+                        this.addMetric(true, body.length, event);
+                    } catch (Throwable ex) {
+                        logger.error("Error writing to controller,data will discard.", ex);
+                        this.addMetric(false, body.length, event);
+                        throw new ChannelException(
+                                "Process Controller Event error can't write event to channel.");
+                    }
+                }
+            } else if (msgList != null && commonAttrMap
+                    .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+                logger.info("i am in MINUTE_CHECK_DATA");
+                Map<String, String> headers = new HashMap<String, String>();
+                headers.put("msgtype", "measure");
+                headers.put(ConfigConstants.FILE_CHECK_DATA,
+                        "true");
+                for (ProxyMessage message : msgList) {
+                    byte[] body = message.getData();
+                    Event event = EventBuilder.withBody(body, headers);
+                    try {
+                        processor.processEvent(event);
+                        this.addMetric(true, body.length, event);
+                    } catch (Throwable ex) {
+                        logger.error("Error writing to controller,data will discard.", ex);
+                        this.addMetric(false, body.length, event);
+                        throw new ChannelException(
+                                "Process Controller Event error can't write event to channel.");
+                    }
                 }
             }
+            SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
+            responsePackage(commonAttrMap, resultMap, remoteChannel, remoteSocketAddress, msgType);
+        } finally {
+            cb.release();
         }
-        responsePackage(commonAttrMap, resultMap, remoteChannel, remoteSocketAddress, msgType);
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-        logger.error("exception caught", e.getCause());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        logger.error("exception caught cause = {}", cause);
         monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
-    }
-
-    @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        logger.error("channel closed {}", ctx.getChannel());
-        super.channelClosed(ctx, e);
-        try {
-            e.getChannel().disconnect();
-            e.getChannel().close();
-        } catch (Exception ex) {
-            //
-        }
-        allChannels.remove(e.getChannel());
+        ctx.fireExceptionCaught(cause);
     }
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
index 162d614..ba7778b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
@@ -17,15 +17,10 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import java.util.Map;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.MessageEvent;
-
-/**
- * decoder interface definition
- */
 public interface ServiceDecoder {
 
     int HEAD_LENGTH = 4;
@@ -37,5 +32,5 @@ public interface ServiceDecoder {
      * @return
      * @throws
      */
-    Map<String, Object> extractData(ChannelBuffer cb, Channel channel, MessageEvent e) throws Exception;
+    Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception;
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index f00f19f..caae85e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -22,6 +22,12 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
 import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -50,15 +56,6 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.Constants;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +66,7 @@ import com.google.common.base.Splitter;
  * Server message handler
  *
  */
-public class SimpleMessageHandler extends SimpleChannelHandler {
+public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger logger = LoggerFactory.getLogger(SimpleMessageHandler.class);
 
@@ -105,7 +102,6 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
     private final ServiceDecoder serviceProcessor;
     private final String defaultTopic;
     private String defaultMXAttr = "m=3";
-    private final ChannelBuffer heartbeatBuffer;
     private final String protocolType;
     private final DataProxyMetricItemSet metricItemSet;
 
@@ -138,7 +134,6 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
 
         this.filterEmptyMsg = filterEmptyMsg;
         this.isCompressed = isCompressed;
-        this.heartbeatBuffer = ChannelBuffers.wrappedBuffer(new byte[]{0, 0, 0, 1, 1});
         this.maxConnections = maxCons;
         this.protocolType = protocolType;
         if (source instanceof SimpleTcpSource) {
@@ -150,7 +145,7 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
 
     private String getRemoteIp(Channel channel) {
         String strRemoteIp = DEFAULT_REMOTE_IP_VALUE;
-        SocketAddress remoteSocketAddress = channel.getRemoteAddress();
+        SocketAddress remoteSocketAddress = channel.remoteAddress();
         if (null != remoteSocketAddress) {
             strRemoteIp = remoteSocketAddress.toString();
             try {
@@ -223,18 +218,18 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         if (allChannels.size() - 1 >= maxConnections) {
             logger.warn("refuse to connect , and connections=" + (allChannels.size() - 1)
                     + ", maxConnections="
-                    + maxConnections + ",channel is " + e.getChannel());
-            e.getChannel().disconnect();
-            e.getChannel().close();
+                    + maxConnections + ",channel is " + ctx.channel());
+            ctx.channel().disconnect();
+            ctx.channel().close();
         }
-        if (!checkBlackIp(e.getChannel())) {
+        if (!checkBlackIp(ctx.channel())) {
             logger.info("connections={},maxConnections={}", allChannels.size() - 1, maxConnections);
-            allChannels.add(e.getChannel());
-            super.channelOpen(ctx, e);
+            allChannels.add(ctx.channel());
+            ctx.fireChannelActive();
         }
     }
 
@@ -507,7 +502,7 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
                         backBody = new byte[]{50};
                     }
                     int backTotalLen = 1 + 4 + backBody.length + 4 + backAttr.length;
-                    ChannelBuffer buffer = ChannelBuffers.buffer(4 + backTotalLen);
+                    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
                     buffer.writeInt(backTotalLen);
                     buffer.writeByte(msgType.getValue());
                     buffer.writeInt(backBody.length);
@@ -515,13 +510,14 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
                     buffer.writeInt(backAttr.length);
                     buffer.writeBytes(backAttr);
                     if (remoteChannel.isWritable()) {
-                        remoteChannel.write(buffer, remoteSocketAddress);
+                        remoteChannel.write(buffer);
                     } else {
                         String backAttrStr = new String(backAttr, StandardCharsets.UTF_8);
                         logger.warn(
                                 "the send buffer1 is full, so disconnect it!please check remote client"
                                         + "; Connection info:"
                                         + remoteChannel + ";attr is " + backAttrStr);
+                        buffer.release();
                         throw new Exception(new Throwable(
                                 "the send buffer1 is full, so disconnect it!please check remote client"
                                         +
@@ -540,7 +536,7 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
                     binTotalLen += backattrs.length();
                 }
 
-                ChannelBuffer binBuffer = ChannelBuffers.buffer(4 + binTotalLen);
+                ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
                 binBuffer.writeInt(binTotalLen);
                 binBuffer.writeByte(msgType.getValue());
 
@@ -561,12 +557,13 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
 
                 binBuffer.writeShort(0xee01);
                 if (remoteChannel.isWritable()) {
-                    remoteChannel.write(binBuffer, remoteSocketAddress);
+                    remoteChannel.write(binBuffer);
                 } else {
                     logger.warn(
                             "the send buffer2 is full, so disconnect it!please check remote client"
                                     + "; Connection info:" + remoteChannel + ";attr is "
                                     + backattrs);
+                    binBuffer.release();
                     throw new Exception(new Throwable(
                             "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
                                     + remoteChannel + ";attr is " + backattrs));
@@ -576,121 +573,122 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent msgEvent) throws Exception {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         logger.info("message received");
-        if (msgEvent == null) {
+        if (msg == null) {
             logger.error("get null messageevent, just skip");
             this.addMetric(false, 0, null);
             return;
         }
-        ChannelBuffer cb = ((ChannelBuffer) msgEvent.getMessage());
-        String strRemoteIP = getRemoteIp(msgEvent.getChannel());
-        SocketAddress remoteSocketAddress = msgEvent.getRemoteAddress();
-        int len = cb.readableBytes();
-        if (len == 0 && this.filterEmptyMsg) {
-            logger.warn("skip empty msg.");
-            cb.clear();
-            this.addMetric(false, 0, null);
-            return;
-        }
-
-        Channel remoteChannel = msgEvent.getChannel();
-        Map<String, Object> resultMap = null;
+        Channel remoteChannel = ctx.channel();
+        ByteBuf cb = (ByteBuf) msg;
         try {
-            resultMap = serviceProcessor.extractData(cb, remoteChannel, msgEvent);
-        } catch (MessageIDException ex) {
-            this.addMetric(false, 0, null);
-            throw new IOException(ex.getCause());
-        }
+            int len = cb.readableBytes();
+            if (len == 0 && this.filterEmptyMsg) {
+                logger.warn("skip empty msg.");
+                cb.clear();
+                this.addMetric(false, 0, null);
+                return;
+            }
+            Map<String, Object> resultMap = null;
+            try {
+                resultMap = serviceProcessor.extractData(cb, remoteChannel);
+            } catch (MessageIDException ex) {
+                this.addMetric(false, 0, null);
+                throw new IOException(ex.getCause());
+            }
 
-        if (resultMap == null) {
-            logger.info("result is null");
-            this.addMetric(false, 0, null);
-            return;
-        }
+            if (resultMap == null) {
+                logger.info("result is null");
+                this.addMetric(false, 0, null);
+                return;
+            }
 
-        MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
-        if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
-            remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
-            this.addMetric(false, 0, null);
-            return;
-        }
+            MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
+            if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+                ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
+                heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+                remoteChannel.write(heartbeatBuffer);
+                this.addMetric(false, 0, null);
+                return;
+            }
 
-        if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-//            ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
-//            remoteChannel.write(binBuffer, remoteSocketAddress);
-            this.addMetric(false, 0, null);
-            return;
-        }
+            if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+                this.addMetric(false, 0, null);
+                return;
+            }
 
-        Map<String, String> commonAttrMap = (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
-        if (commonAttrMap == null) {
-            commonAttrMap = new HashMap<String, String>();
-        }
+            Map<String, String> commonAttrMap = (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+            if (commonAttrMap == null) {
+                commonAttrMap = new HashMap<String, String>();
+            }
 
-        List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
-        if (msgList != null
-                && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
-                && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-            Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new HashMap<>(msgList.size());
-
-            updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
-
-            formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
-
-        } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
-//            logger.info("i am in FILE_CHECK_DATA ");
-            Map<String, String> headers = new HashMap<String, String>();
-            headers.put("msgtype", "filestatus");
-            headers.put(ConfigConstants.FILE_CHECK_DATA,
-                    "true");
-            for (ProxyMessage message : msgList) {
-                byte[] body = message.getData();
-//                logger.info("data:"+new String(body));
-                Event event = EventBuilder.withBody(body, headers);
-                try {
-                    processor.processEvent(event);
-                    this.addMetric(true, body.length, event);
-                } catch (Throwable ex) {
-                    logger.error("Error writing to controller,data will discard.", ex);
-                    this.addMetric(false, body.length, event);
-                    throw new ChannelException(
-                            "Process Controller Event error can't write event to channel.");
+            List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
+            if (msgList != null
+                    && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
+                    && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+                Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new HashMap<>(msgList.size());
+                String strRemoteIP = getRemoteIp(remoteChannel);
+                updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
+
+                formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
+
+            } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
+                Map<String, String> headers = new HashMap<String, String>();
+                headers.put("msgtype", "filestatus");
+                headers.put(ConfigConstants.FILE_CHECK_DATA,
+                        "true");
+                for (ProxyMessage message : msgList) {
+                    byte[] body = message.getData();
+                    Event event = EventBuilder.withBody(body, headers);
+                    try {
+                        processor.processEvent(event);
+                        this.addMetric(true, body.length, event);
+                    } catch (Throwable ex) {
+                        logger.error("Error writing to controller,data will discard.", ex);
+                        this.addMetric(false, body.length, event);
+                        throw new ChannelException(
+                                "Process Controller Event error can't write event to channel.");
+                    }
                 }
-            }
-        } else if (msgList != null && commonAttrMap
-                .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-            logger.info("i am in MINUTE_CHECK_DATA");
-            Map<String, String> headers = new HashMap<String, String>();
-            headers.put("msgtype", "measure");
-            headers.put(ConfigConstants.FILE_CHECK_DATA,
-                    "true");
-            for (ProxyMessage message : msgList) {
-                byte[] body = message.getData();
-//                logger.info("data:"+new String(body));
-                Event event = EventBuilder.withBody(body, headers);
-                try {
-                    processor.processEvent(event);
-                    this.addMetric(true, body.length, event);
-                } catch (Throwable ex) {
-                    logger.error("Error writing to controller,data will discard.", ex);
-                    this.addMetric(false, body.length, event);
-                    throw new ChannelException(
-                            "Process Controller Event error can't write event to channel.");
+            } else if (msgList != null && commonAttrMap
+                    .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+                logger.info("i am in MINUTE_CHECK_DATA");
+                Map<String, String> headers = new HashMap<String, String>();
+                headers.put("msgtype", "measure");
+                headers.put(ConfigConstants.FILE_CHECK_DATA,
+                        "true");
+                for (ProxyMessage message : msgList) {
+                    byte[] body = message.getData();
+                    Event event = EventBuilder.withBody(body, headers);
+                    try {
+                        processor.processEvent(event);
+                        this.addMetric(true, body.length, event);
+                    } catch (Throwable ex) {
+                        logger.error("Error writing to controller,data will discard.", ex);
+                        this.addMetric(false, body.length, event);
+                        throw new ChannelException(
+                                "Process Controller Event error can't write event to channel.");
+                    }
                 }
             }
+            SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
+            responsePackage(commonAttrMap, resultMap, remoteChannel, remoteSocketAddress, msgType);
+        } finally {
+            cb.release();
         }
-        responsePackage(commonAttrMap, resultMap, remoteChannel, remoteSocketAddress, msgType);
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-        logger.error("exception caught", e.getCause());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        logger.error("exception caught cause = {}", cause);
+        ctx.fireExceptionCaught(cause);
     }
 
     @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        logger.error("channel closed {}", ctx.getChannel());
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        logger.error("channel inactive {}", ctx.channel());
+        ctx.fireChannelInactive();
     }
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index d861c8c..dd36012 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -18,6 +18,13 @@
 package org.apache.inlong.dataproxy.source;
 
 import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -28,24 +35,14 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
-import java.util.concurrent.Executors;
 import org.apache.commons.io.IOUtils;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.inlong.commons.config.metrics.MetricRegister;
-import org.apache.inlong.dataproxy.base.NamedThreadFactory;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.ThreadRenamingRunnable;
+import org.apache.inlong.dataproxy.utils.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,45 +63,31 @@ public class SimpleTcpSource extends BaseSource
 
     private static int TRAFFIC_CLASS_TYPE_96 = 96;
 
-    private static int BUFFER_SIZE_MUST_THAN = 0;
-
     private static int HIGH_WATER_MARK_DEFAULT_VALUE = 64 * 1024;
 
-    private static int RECEIVE_BUFFER_DEFAULT_SIZE = 64 * 1024;
-
-    private static int SEND_BUFFER_DEFAULT_SIZE = 64 * 1024;
-
-    private static int RECEIVE_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
-
-    private static int SEND_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
-
     private static int DEFAULT_SLEEP_TIME_MS = 5 * 1000;
 
     private static long propsLastModified;
 
     private CheckBlackListThread checkBlackListThread;
 
-    private int maxThreads = 32;
-
     private boolean tcpNoDelay = true;
 
     private boolean keepAlive = true;
 
-    private int receiveBufferSize;
-
     private int highWaterMark;
 
-    private int sendBufferSize;
-
     private int trafficClass;
 
     protected String topic;
 
+    private ServerBootstrap bootstrap;
+
     private DataProxyMetricItemSet metricItemSet;
 
     public SimpleTcpSource() {
         super();
-        allChannels = new DefaultChannelGroup();
+
     }
 
     /**
@@ -118,7 +101,7 @@ public class SimpleTcpSource extends BaseSource
             while (it.hasNext()) {
                 Channel channel = it.next();
                 String strRemoteIP = null;
-                SocketAddress remoteSocketAddress = channel.getRemoteAddress();
+                SocketAddress remoteSocketAddress = channel.remoteAddress();
                 if (null != remoteSocketAddress) {
                     strRemoteIP = remoteSocketAddress.toString();
                     try {
@@ -203,43 +186,48 @@ public class SimpleTcpSource extends BaseSource
         MetricRegister.register(metricItemSet);
         checkBlackListThread = new CheckBlackListThread();
         checkBlackListThread.start();
-        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
-        ChannelFactory factory =
-                new NioServerSocketChannelFactory(
-                        Executors.newCachedThreadPool(
-                                new NamedThreadFactory("tcpSource-nettyBoss-threadGroup")), 1,
-                        Executors.newCachedThreadPool(
-                                new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")), maxThreads);
+//        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+
         logger.info("Set max workers : {} ;", maxThreads);
 
-        serverBootstrap = new ServerBootstrap(factory);
-        serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
-        serverBootstrap.setOption("child.keepAlive", keepAlive);
-        serverBootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
-        serverBootstrap.setOption("child.sendBufferSize", sendBufferSize);
-        serverBootstrap.setOption("child.trafficClass", trafficClass);
-        serverBootstrap.setOption("child.writeBufferHighWaterMark", highWaterMark);
+        acceptorThreadFactory = new DefaultThreadFactory("tcpSource-nettyBoss-threadGroup");
+
+        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
+                acceptorThreads, false, acceptorThreadFactory);
+
+        this.workerGroup = EventLoopUtil
+                .newEventLoopGroup(maxThreads, enableBusyWait,
+                        new DefaultThreadFactory("tcpSource-nettyWorker-threadGroup"));
+
+        bootstrap = new ServerBootstrap();
+
+        bootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
+        bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
+        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
+        bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize);
+        bootstrap.childOption(ChannelOption.SO_SNDBUF, sendBufferSize);
+//        serverBootstrap.childOption("child.trafficClass", trafficClass);
+        bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, highWaterMark);
+        bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
+        EventLoopUtil.enableTriggeredMode(bootstrap);
+        bootstrap.group(acceptorGroup, workerGroup);
         logger.info("load msgFactory=" + msgFactoryName
                 + " and serviceDecoderName=" + serviceDecoderName);
 
-        ChannelPipelineFactory fac = this.getChannelPiplineFactory();
-        serverBootstrap.setPipelineFactory(fac);
-
+        ChannelInitializer fac = this.getChannelInitializerFactory();
+        bootstrap.childHandler(fac);
         try {
             if (host == null) {
-                nettyChannel = ((ServerBootstrap)serverBootstrap).bind(new InetSocketAddress(port));
+                channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
             } else {
-                nettyChannel = ((ServerBootstrap)serverBootstrap).bind(new InetSocketAddress(host, port));
+                channelFuture = bootstrap.bind(new InetSocketAddress(host, port)).sync();
             }
         } catch (Exception e) {
-            logger.error("Simple TCP Source error bind host {} port {},program will exit!", host, port);
+            logger.error("Simple TCP Source error bind host {} port {},program will exit! e = {}",
+                    host, port, e);
             System.exit(-1);
         }
-
-        allChannels.add(nettyChannel);
-
         logger.info("Simple TCP Source started at host {}, port {}", host, port);
-
     }
 
     @Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index 7c09c30..5c17451 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -19,19 +19,15 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.nio.NioDatagramChannel;
 import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
-import org.apache.inlong.dataproxy.base.NamedThreadFactory;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.ThreadRenamingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +40,8 @@ public class SimpleUdpSource
 
     private static int UPD_BUFFER_DEFAULT_SIZE = 8192;
 
+    private Bootstrap bootstrap;
+
     public SimpleUdpSource() {
         super();
     }
@@ -51,24 +49,20 @@ public class SimpleUdpSource
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public void startSource() {
-        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
         // setup Netty server
-        serverBootstrap = new ConnectionlessBootstrap(
-                new NioDatagramChannelFactory(Executors
-                        .newCachedThreadPool(new NamedThreadFactory("udpSource-nettyWorker"
-                                + "-threadGroup")), maxThreads));
+        bootstrap = new Bootstrap();
         logger.info("Set max workers : {} ;",maxThreads);
-        serverBootstrap.setOption("receiveBufferSizePredictorFactory",
-                new FixedReceiveBufferSizePredictorFactory(UPD_BUFFER_DEFAULT_SIZE));
-        ChannelPipelineFactory fac = this.getChannelPiplineFactory();
-        serverBootstrap.setPipelineFactory(fac);
+        bootstrap.channel(NioDatagramChannel.class);
+        bootstrap.option(ChannelOption.SO_RCVBUF, receiveBufferSize);
+        bootstrap.option(ChannelOption.SO_SNDBUF, sendBufferSize);
+        ChannelInitializer fac = this.getChannelInitializerFactory();
+        bootstrap.handler(fac);
         try {
             if (host == null) {
-                nettyChannel =
-                        ((ConnectionlessBootstrap)serverBootstrap).bind(new InetSocketAddress(port));
+                channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
             } else {
-                nettyChannel =
-                        ((ConnectionlessBootstrap)serverBootstrap).bind(new InetSocketAddress(host, port));
+
+                channelFuture = bootstrap.bind(new InetSocketAddress(host, port)).sync();
             }
         } catch (Exception e) {
             logger.error("Simple UDP Source error bind host {} port {}, program will exit!",
@@ -76,7 +70,6 @@ public class SimpleUdpSource
             System.exit(-1);
             //throw new FlumeException(e.getMessage());
         }
-        allChannels.add(nettyChannel);
         logger.info("Simple UDP Source started at host {}, port {}", host, port);
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/EventLoopUtil.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/EventLoopUtil.java
new file mode 100644
index 0000000..679a30b
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/EventLoopUtil.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SelectStrategy;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollMode;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Static helpers for choosing between Netty's native epoll transport and the NIO
+ * transport, and for adapting Netty futures to {@link CompletableFuture}.
+ */
+public class EventLoopUtil {
+
+    /**
+     * Public no-arg constructor retained for compatibility; every member is static.
+     */
+    public EventLoopUtil() {
+    }
+
+    /**
+     * Creates an event loop group backed by epoll when it is available, otherwise by NIO.
+     *
+     * @param nThreads       thread count passed to the group constructor
+     * @param enableBusyWait when epoll is available, install the busy-wait select strategy
+     * @param threadFactory  thread factory passed to the group constructor
+     * @return a {@link NioEventLoopGroup} when epoll is unavailable, otherwise an
+     *         {@link EpollEventLoopGroup}
+     */
+    public static EventLoopGroup newEventLoopGroup(int nThreads, boolean enableBusyWait, ThreadFactory threadFactory) {
+        if (!Epoll.isAvailable()) {
+            return new NioEventLoopGroup(nThreads, threadFactory);
+        }
+        if (!enableBusyWait) {
+            return new EpollEventLoopGroup(nThreads, threadFactory);
+        }
+        // SelectStrategy.BUSY_WAIT is the named constant for the raw -3 used here before.
+        return new EpollEventLoopGroup(nThreads, threadFactory,
+                () -> (selectSupplier, hasTasks) -> SelectStrategy.BUSY_WAIT);
+    }
+
+    /**
+     * Picks the client socket channel class matching the transport of the given group.
+     *
+     * @param eventLoopGroup group whose type decides the result
+     * @return {@link EpollSocketChannel} for an {@link EpollEventLoopGroup}, otherwise
+     *         {@link NioSocketChannel}
+     */
+    public static Class<? extends SocketChannel> getClientSocketChannelClass(EventLoopGroup eventLoopGroup) {
+        return eventLoopGroup instanceof EpollEventLoopGroup
+                ? EpollSocketChannel.class : NioSocketChannel.class;
+    }
+
+    /**
+     * Picks the server socket channel class matching the transport of the given group.
+     *
+     * @param eventLoopGroup group whose type decides the result
+     * @return {@link EpollServerSocketChannel} for an {@link EpollEventLoopGroup}, otherwise
+     *         {@link NioServerSocketChannel}
+     */
+    public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
+        return eventLoopGroup instanceof EpollEventLoopGroup
+                ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
+    }
+
+    /**
+     * Picks the datagram channel class matching the transport of the given group.
+     *
+     * @param eventLoopGroup group whose type decides the result
+     * @return {@link EpollDatagramChannel} for an {@link EpollEventLoopGroup}, otherwise
+     *         {@link NioDatagramChannel}
+     */
+    public static Class<? extends DatagramChannel> getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
+        return eventLoopGroup instanceof EpollEventLoopGroup
+                ? EpollDatagramChannel.class : NioDatagramChannel.class;
+    }
+
+    /**
+     * Sets the child-channel epoll mode to level-triggered when epoll is available;
+     * does nothing otherwise.
+     *
+     * @param bootstrap server bootstrap to configure
+     */
+    public static void enableTriggeredMode(ServerBootstrap bootstrap) {
+        if (Epoll.isAvailable()) {
+            bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
+        }
+    }
+
+    /**
+     * Starts a graceful shutdown of the group and exposes its outcome as a {@link CompletableFuture}.
+     *
+     * @param eventLoopGroup group to shut down
+     * @return future that tracks the group's shutdown future
+     */
+    public static CompletableFuture<Void> shutdownGracefully(EventLoopGroup eventLoopGroup) {
+        return toCompletableFutureVoid(eventLoopGroup.shutdownGracefully());
+    }
+
+    /**
+     * Adapts a Netty future to a {@link CompletableFuture} that carries no value.
+     *
+     * @param future Netty future to observe
+     * @return future that completes normally when {@code future} succeeds, or exceptionally
+     *         with its cause otherwise
+     * @throws NullPointerException if {@code future} is null
+     */
+    public static CompletableFuture<Void> toCompletableFutureVoid(Future<?> future) {
+        Objects.requireNonNull(future, "future cannot be null");
+
+        CompletableFuture<Void> adapter = new CompletableFuture<>();
+        if (future.isDone()) {
+            // Already finished: settle the adapter right away, no listener needed.
+            settle(adapter, future);
+        } else {
+            future.addListener(finished -> settle(adapter, finished));
+        }
+        return adapter;
+    }
+
+    /**
+     * Copies the outcome of a finished Netty future onto the adapter.
+     */
+    private static void settle(CompletableFuture<Void> adapter, Future<?> finished) {
+        if (finished.isSuccess()) {
+            adapter.complete(null);
+        } else {
+            adapter.completeExceptionally(finished.cause());
+        }
+    }
+}
diff --git a/inlong-dataproxy/pom.xml b/inlong-dataproxy/pom.xml
index 78feae2..3d0e413 100644
--- a/inlong-dataproxy/pom.xml
+++ b/inlong-dataproxy/pom.xml
@@ -42,7 +42,7 @@
         <maven.compiler.target>8</maven.compiler.target>
         <flume.version>1.9.0</flume.version>
         <plugin.assembly.version>3.2.0</plugin.assembly.version>
-        <netty.version>3.8.0.Final</netty.version>
+        <netty.version>4.1.72.Final</netty.version>
         <codec.version>1.15</codec.version>
         <servlet.version>2.5-20110124</servlet.version>
         <inlong-common.version>1.3.4</inlong-common.version>
@@ -52,6 +52,8 @@
         <guava.version>19.0</guava.version>
         <powermock.version>2.0.9</powermock.version>
         <simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
+        <log4j2.version>2.17.1</log4j2.version>
+        <slf4j.version>1.7.36</slf4j.version>
     </properties>
 
     <dependencies>
@@ -64,6 +66,12 @@
             <groupId>org.apache.flume</groupId>
             <artifactId>flume-ng-node</artifactId>
             <version>${flume.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.flume</groupId>
@@ -77,7 +85,27 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+            <artifactId>netty-transport</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
             <version>${netty.version}</version>
         </dependency>
         <dependency>
@@ -94,6 +122,12 @@
             <groupId>org.apache.inlong</groupId>
             <artifactId>tubemq-client</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
@@ -144,6 +178,26 @@
             <version>${powermock.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j2.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j2.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j2.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file