You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/02 01:20:44 UTC

[incubator-inlong] branch master updated: [INLONG-2781][DataProxy] Update netty version to 4.x (#2782)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d65c1f  [INLONG-2781][DataProxy] Update netty version to 4.x (#2782)
0d65c1f is described below

commit 0d65c1f8019347e32d01c04383179f847bc7e0ea
Author: baomingyu <ba...@163.com>
AuthorDate: Wed Mar 2 09:20:37 2022 +0800

    [INLONG-2781][DataProxy] Update netty version to 4.x (#2782)
---
 inlong-dataproxy/conf/log4j2.xml                   |  18 ++
 .../apache/inlong/dataproxy/sink/PulsarSink.java   |   2 +-
 .../inlong/dataproxy/source/SourceContext.java     |   2 +-
 .../source/tcp/InlongTcpChannelHandler.java        | 194 +++++++++++----------
 .../tcp/InlongTcpChannelPipelineFactory.java       |  60 +++----
 .../dataproxy/source/tcp/InlongTcpSource.java      | 180 +++++--------------
 6 files changed, 184 insertions(+), 272 deletions(-)

diff --git a/inlong-dataproxy/conf/log4j2.xml b/inlong-dataproxy/conf/log4j2.xml
index 34d2165..d4f70d4 100644
--- a/inlong-dataproxy/conf/log4j2.xml
+++ b/inlong-dataproxy/conf/log4j2.xml
@@ -27,6 +27,9 @@
         <property name="info_fileName">${basePath}/info.log</property>
         <property name="info_filePattern">${basePath}/info-%d{yyyy-MM-dd}-%i.log.gz</property>
         <property name="info_max">10</property>
+        <property name="debug_fileName">${basePath}/debug.log</property>
+        <property name="debug_filePattern">${basePath}/debug-%d{yyyy-MM-dd}-%i.log.gz</property>
+        <property name="debug_max">10</property>
         <property name="warn_fileName">${basePath}/warn.log</property>
         <property name="warn_filePattern">${basePath}/warn-%d{yyyy-MM-dd}-%i.log.gz</property>
         <property name="warn_max">10</property>
@@ -46,6 +49,17 @@
             <PatternLayout pattern="${log_pattern}"/>
         </Console>
 
+        <RollingFile name="DebugFile" fileName="${debug_fileName}" filePattern="${debug_filePattern}">
+            <PatternLayout pattern="${log_pattern}"/>
+            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+            <DefaultRolloverStrategy max="${debug_max}" />
+            <Filters>
+                <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
+                <ThresholdFilter level="INFO" onMatch="DENY" onMismatch="NEUTRAL"/>
+                <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
+            </Filters>
+        </RollingFile>
+
         <RollingFile name="InfoFile" fileName="${info_fileName}" filePattern="${info_filePattern}">
             <PatternLayout pattern="${log_pattern}"/>
             <SizeBasedTriggeringPolicy size="${every_file_size}"/>
@@ -104,8 +118,12 @@
         <logger name="org.apache.inlong.common.monitor.MonitorIndex" level="info" additivity="false">
             <appender-ref ref="IndexFile"/>
         </logger>
+        <logger name="org.apache.pulsar" level="info" additivity="false">
+            <appender-ref ref="InfoFile"/>
+        </logger>
         <root level="${output_log_level}">
             <appender-ref ref="Console"/>
+            <appender-ref ref="DebugFile"/>
             <appender-ref ref="InfoFile"/>
             <appender-ref ref="WarnFile"/>
             <appender-ref ref="ErrorFile"/>
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index e5a094b..965c41e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -24,6 +24,7 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.RateLimiter;
+import io.netty.handler.codec.TooLongFrameException;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -53,7 +54,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
index 58abc2d..38bd7ae 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import io.netty.channel.group.ChannelGroup;
 import java.util.Date;
 import java.util.Map;
 import java.util.Timer;
@@ -31,7 +32,6 @@ import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.jboss.netty.channel.group.ChannelGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
index 56a34f5..5be334d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
@@ -17,6 +17,11 @@
 
 package org.apache.inlong.dataproxy.source.tcp;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,21 +34,14 @@ import org.apache.inlong.sdk.commons.protocol.EventUtils;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * InlongTcpChannelHandler
  */
-public class InlongTcpChannelHandler extends SimpleChannelHandler {
+public class InlongTcpChannelHandler extends ChannelInboundHandlerAdapter {
 
     public static final Logger LOG = LoggerFactory.getLogger(InlongTcpChannelHandler.class);
     public static final int LENGTH_PARAM_OFFSET = 0;
@@ -66,78 +64,64 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
     }
 
     /**
-     * channelOpen
-     * 
-     * @param  ctx
-     * @param  e
-     * @throws Exception
-     */
-    @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        if (sourceContext.getAllChannels().size() - 1 >= sourceContext.getMaxConnections()) {
-            LOG.warn("refuse to connect , and connections=" + (sourceContext.getAllChannels().size() - 1)
-                    + ", maxConnections="
-                    + sourceContext.getMaxConnections() + ",channel is " + e.getChannel());
-            e.getChannel().disconnect();
-            e.getChannel().close();
-        }
-    }
-
-    /**
-     * messageReceived
+     * channelRead
      * 
      * @param  ctx
-     * @param  e
+     * @param  msg
      * @throws Exception
      */
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         LOG.debug("message received");
-        if (e == null) {
-            LOG.error("get null messageevent, just skip");
-            this.addMetric(false, 0, null);
-            return;
-        }
-        ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
-        int readableLength = cb.readableBytes();
-        if (readableLength == 0) {
-            LOG.warn("skip empty msg.");
-            cb.clear();
+        if (msg == null) {
+            LOG.error("get null msg, just skip");
             this.addMetric(false, 0, null);
             return;
         }
-        if (readableLength > LENGTH_PARAM_LENGTH + VERSION_PARAM_LENGTH + sourceContext.getMaxMsgLength()) {
-            this.addMetric(false, 0, null);
-            throw new Exception("err msg, MSG_MAX_LENGTH_BYTES "
-                    + "< readableLength, and readableLength=" + readableLength + ", and MSG_MAX_LENGTH_BYTES="
-                    + sourceContext.getMaxMsgLength());
-        }
-        // save index, reset it if buffer is not satisfied.
-        cb.markReaderIndex();
-        int totalPackLength = cb.readInt();
-        cb.resetReaderIndex();
-        if (readableLength < totalPackLength + LENGTH_PARAM_LENGTH) {
-            // reset index.
-            this.addMetric(false, 0, null);
-            throw new Exception("err msg, channel buffer is not satisfied, and  readableLength="
-                    + readableLength + ", and totalPackLength=" + totalPackLength);
-        }
-
-        // read version
-        int version = cb.readShort();
-        switch (version) {
-            case VERSION_1 :
-                // decode version 1
-                int bodyLength = totalPackLength - VERSION_PARAM_LENGTH;
-                decodeVersion1(cb, bodyLength, e);
-                break;
-            default :
+        ByteBuf cb = (ByteBuf) msg;
+        try {
+            int readableLength = cb.readableBytes();
+            if (readableLength == 0) {
+                LOG.warn("skip empty msg.");
+                cb.clear();
                 this.addMetric(false, 0, null);
-                throw new Exception("err version, unknown version:" + version);
+                return;
+            }
+            if (readableLength > LENGTH_PARAM_LENGTH + VERSION_PARAM_LENGTH + sourceContext.getMaxMsgLength()) {
+                this.addMetric(false, 0, null);
+                throw new Exception("err msg, MSG_MAX_LENGTH_BYTES "
+                        + "< readableLength, and readableLength=" + readableLength + ", and MSG_MAX_LENGTH_BYTES="
+                        + sourceContext.getMaxMsgLength());
+            }
+            // save index, reset it if buffer is not satisfied.
+            cb.markReaderIndex();
+            int totalPackLength = cb.readInt();
+            cb.resetReaderIndex();
+            if (readableLength < totalPackLength + LENGTH_PARAM_LENGTH) {
+                // reset index.
+                this.addMetric(false, 0, null);
+                throw new Exception("err msg, channel buffer is not satisfied, and  readableLength="
+                        + readableLength + ", and totalPackLength=" + totalPackLength);
+            }
+
+            // read version
+            int version = cb.readShort();
+            switch (version) {
+                case VERSION_1 :
+                    // decode version 1
+                    int bodyLength = totalPackLength - VERSION_PARAM_LENGTH;
+                    decodeVersion1(ctx, cb, bodyLength);
+                    break;
+                default :
+                    this.addMetric(false, 0, null);
+                    throw new Exception("err version, unknown version:" + version);
+            }
+        } finally {
+            cb.release();
         }
     }
 
-    private void decodeVersion1(ChannelBuffer cb, int bodyLength, MessageEvent e) throws Exception {
+    private void decodeVersion1(ChannelHandlerContext ctx, ByteBuf cb, int bodyLength) throws Exception {
         // read bytes
         byte[] msgBytes = new byte[bodyLength];
         cb.readBytes(msgBytes);
@@ -157,11 +141,11 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
             } catch (Throwable ex) {
                 LOG.error("Process Controller Event error can't write event to channel.", ex);
                 this.addMetric(false, event.getBody().length, event);
-                this.responsePackage(ResultCode.ERR_REJECT, e);
+                this.responsePackage(ctx, ResultCode.ERR_REJECT);
                 return;
             }
         }
-        this.responsePackage(ResultCode.SUCCUSS, e);
+        this.responsePackage(ctx, ResultCode.SUCCUSS);
     }
 
     /**
@@ -191,12 +175,12 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
 
     /**
      * responsePackage
-     * 
+     *
+     * @param ctx
      * @param  code
-     * @param  e
      * @throws Exception
      */
-    private void responsePackage(ResultCode code, MessageEvent e)
+    private void responsePackage(ChannelHandlerContext ctx, ResultCode code)
             throws Exception {
         ResponseInfo.Builder builder = ResponseInfo.newBuilder();
         builder.setResult(code);
@@ -204,15 +188,15 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
         // encode
         byte[] responseBytes = builder.build().toByteArray();
         //
-        ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(responseBytes);
-
-        Channel remoteChannel = e.getChannel();
+        ByteBuf buffer = Unpooled.wrappedBuffer(responseBytes);
+        Channel remoteChannel = ctx.channel();
         if (remoteChannel.isWritable()) {
-            remoteChannel.write(buffer, e.getRemoteAddress());
+            remoteChannel.write(buffer);
         } else {
             LOG.warn(
                     "the send buffer2 is full, so disconnect it!please check remote client"
                             + "; Connection info:" + remoteChannel);
+            buffer.release();
             throw new Exception(
                     "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
                             + remoteChannel);
@@ -223,41 +207,59 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
      * exceptionCaught
      * 
      * @param  ctx
-     * @param  e
+     * @param  cause
      * @throws Exception
      */
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-        LOG.error("exception caught", e.getCause());
-        super.exceptionCaught(ctx, e);
-        if (e.getChannel() != null) {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        LOG.error("exception caught cause = {}", cause);
+        ctx.fireExceptionCaught(cause);
+        if (ctx.channel() != null) {
             try {
-                e.getChannel().disconnect();
-                e.getChannel().close();
+                ctx.channel().disconnect();
+                ctx.channel().close();
             } catch (Exception ex) {
                 LOG.error("Close connection error!", ex);
             }
-            sourceContext.getAllChannels().remove(e.getChannel());
+            sourceContext.getAllChannels().remove(ctx.channel());
         }
     }
 
     /**
-     * channelClosed
-     * 
-     * @param  ctx
-     * @param  e
+     * channelInactive
+     *
+     * @param ctx
      * @throws Exception
      */
     @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        LOG.debug("Connection to {} disconnected.", e.getChannel());
-        super.channelClosed(ctx, e);
+    public void channelInactive(ChannelHandlerContext ctx) {
+        LOG.debug("Connection to {} disconnected.", ctx.channel());
+        ctx.fireChannelInactive();
         try {
-            e.getChannel().disconnect();
-            e.getChannel().close();
+            ctx.channel().disconnect();
+            ctx.channel().close();
         } catch (Exception ex) {
-            //
+            LOG.error("channelInactive has exception e = {}", ex);
+        }
+        sourceContext.getAllChannels().remove(ctx.channel());
+
+    }
+
+    /**
+     * channelActive
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        if (sourceContext.getAllChannels().size() - 1 >= sourceContext.getMaxConnections()) {
+            LOG.warn("refuse to connect , and connections="
+                    + (sourceContext.getAllChannels().size() - 1)
+                    + ", maxConnections="
+                    + sourceContext.getMaxConnections() + ",channel is " + ctx.channel());
+            ctx.channel().disconnect();
+            ctx.channel().close();
         }
-        sourceContext.getAllChannels().remove(e.getChannel());
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelPipelineFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelPipelineFactory.java
index 77333a2..30a3bb7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelPipelineFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelPipelineFactory.java
@@ -17,6 +17,11 @@
 
 package org.apache.inlong.dataproxy.source.tcp;
 
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 import java.lang.reflect.Constructor;
 import java.util.concurrent.TimeUnit;
 
@@ -25,14 +30,6 @@ import org.apache.flume.Context;
 import org.apache.flume.conf.Configurable;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.source.SourceContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +38,8 @@ import com.google.common.base.Preconditions;
 /**
  * InlongTcpChannelPipelineFactory
  */
-public class InlongTcpChannelPipelineFactory implements ChannelPipelineFactory, Configurable {
+public class InlongTcpChannelPipelineFactory extends ChannelInitializer<SocketChannel>
+        implements Configurable {
 
     public static final Logger LOG = LoggerFactory.getLogger(InlongTcpChannelPipelineFactory.class);
     public static final int DEFAULT_LENGTH_FIELD_OFFSET = 0;
@@ -52,54 +50,48 @@ public class InlongTcpChannelPipelineFactory implements ChannelPipelineFactory,
     private static final int DEFAULT_READ_IDLE_TIME = 70 * 60 * 1000;
     private SourceContext sourceContext;
     private String messageHandlerName;
-    private Timer timer = new HashedWheelTimer();
+    private String protocolType;
 
     /**
      * get server factory
      *
      * @param sourceContext
      */
-    public InlongTcpChannelPipelineFactory(SourceContext sourceContext) {
+    public InlongTcpChannelPipelineFactory(SourceContext sourceContext, String protocolType) {
         this.sourceContext = sourceContext;
+        this.protocolType = protocolType;
     }
 
     @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline cp = Channels.pipeline();
-        return addMessageHandlersTo(cp);
-    }
+    protected void initChannel(SocketChannel ch) {
 
-    /**
-     * get message handlers
-     * 
-     * @param  cp
-     * @return
-     */
-    @SuppressWarnings("unchecked")
-    public ChannelPipeline addMessageHandlersTo(ChannelPipeline cp) {
-        cp.addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
-                sourceContext.getMaxMsgLength(), DEFAULT_LENGTH_FIELD_OFFSET, DEFAULT_LENGTH_FIELD_LENGTH,
-                DEFAULT_LENGTH_ADJUSTMENT, DEFAULT_INITIAL_BYTES_TO_STRIP, DEFAULT_FAIL_FAST));
-        cp.addLast("readTimeoutHandler", new ReadTimeoutHandler(timer,
-                DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
+        if (StringUtils.isEmpty(protocolType) || this.protocolType
+                .equalsIgnoreCase(ConfigConstants.TCP_PROTOCOL)) {
+            ch.pipeline().addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
+                    sourceContext.getMaxMsgLength(), DEFAULT_LENGTH_FIELD_OFFSET,
+                    DEFAULT_LENGTH_FIELD_LENGTH,
+                    DEFAULT_LENGTH_ADJUSTMENT, DEFAULT_INITIAL_BYTES_TO_STRIP, DEFAULT_FAIL_FAST));
+            ch.pipeline().addLast("readTimeoutHandler",
+                    new ReadTimeoutHandler(DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
+        }
 
         if (sourceContext.getSource().getChannelProcessor() != null) {
             try {
-                Class<? extends SimpleChannelHandler> clazz = (Class<? extends SimpleChannelHandler>) Class
+                Class<? extends ChannelInboundHandlerAdapter> clazz =
+                        (Class<? extends ChannelInboundHandlerAdapter>) Class
                         .forName(messageHandlerName);
 
                 Constructor<?> ctor = clazz.getConstructor(SourceContext.class);
 
-                SimpleChannelHandler messageHandler = (SimpleChannelHandler) ctor
+                ChannelInboundHandlerAdapter messageHandler = (ChannelInboundHandlerAdapter) ctor
                         .newInstance(sourceContext);
 
-                cp.addLast("messageHandler", messageHandler);
+                ch.pipeline().addLast("messageHandler", messageHandler);
             } catch (Exception e) {
-                LOG.error("SimpleChannelHandler.newInstance  has error:" + sourceContext.getSource().getName(), e);
+                LOG.error("SimpleChannelHandler.newInstance  has error:"
+                        + sourceContext.getSource().getName(), e);
             }
         }
-
-        return cp;
     }
 
     @Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.java
index d6658d0..ed9d98a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.java
@@ -17,28 +17,17 @@
 
 package org.apache.inlong.dataproxy.source.tcp;
 
+import io.netty.channel.ChannelInitializer;
 import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
-import org.apache.flume.source.AbstractSource;
-import org.apache.inlong.dataproxy.base.NamedThreadFactory;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.source.SimpleTcpSource;
 import org.apache.inlong.dataproxy.source.SourceContext;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.ThreadRenamingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,32 +36,16 @@ import com.google.common.base.Preconditions;
 /**
  * Inlong tcp source
  */
-public class InlongTcpSource extends AbstractSource implements Configurable, EventDrivenSource {
+public class InlongTcpSource extends SimpleTcpSource
+        implements Configurable, EventDrivenSource {
 
     public static final Logger LOG = LoggerFactory.getLogger(InlongTcpSource.class);
-    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 64 * 1024;
-    public static final int MAX_RECEIVE_BUFFER_SIZE = 16 * 1024 * 1024;
-    public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
-    public static final int MAX_SEND_BUFFER_SIZE = 16 * 1024 * 1024;
 
-    protected ChannelGroup allChannels;
     protected SourceContext sourceContext;
 
-    protected ServerBootstrap serverBootstrap = null;
-    protected int port;
-    protected String host = null;
     protected String msgFactoryName;
     protected String messageHandlerName;
 
-    private boolean tcpNoDelay = true;
-    private boolean keepAlive = true;
-    private int receiveBufferSize;
-    private int highWaterMark;
-    private int sendBufferSize;
-    private int trafficClass;
-
-    private Channel nettyChannel = null;
-
     private Configurable pipelineFactoryConfigurable = null;
 
     /**
@@ -80,7 +53,6 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
      */
     public InlongTcpSource() {
         super();
-        allChannels = new DefaultChannelGroup();
     }
 
     /**
@@ -88,71 +60,8 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
      */
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public synchronized void start() {
-        try {
-            LOG.info("start " + this.getName());
-
-            ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
-            ChannelFactory factory = new NioServerSocketChannelFactory(
-                    Executors.newCachedThreadPool(
-                            new NamedThreadFactory("tcpSource-nettyBoss-threadGroup")),
-                    1,
-                    Executors.newCachedThreadPool(
-                            new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")),
-                    this.sourceContext.getMaxThreads());
-            LOG.info("Set max workers : {} ;", this.sourceContext.getMaxThreads());
-            ChannelPipelineFactory pipelineFactory = null;
-
-            serverBootstrap = new ServerBootstrap(factory);
-            serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
-            serverBootstrap.setOption("child.keepAlive", keepAlive);
-            serverBootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
-            serverBootstrap.setOption("child.sendBufferSize", sendBufferSize);
-            serverBootstrap.setOption("child.trafficClass", trafficClass);
-            serverBootstrap.setOption("child.writeBufferHighWaterMark", highWaterMark);
-            LOG.info("load msgFactory=" + msgFactoryName
-                    + " and messageHandlerName=" + messageHandlerName);
-            try {
-                Class<? extends ChannelPipelineFactory> clazz = (Class<? extends ChannelPipelineFactory>) Class
-                        .forName(msgFactoryName);
-
-                Constructor ctor = clazz.getConstructor(SourceContext.class);
-
-                LOG.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
-                pipelineFactory = (ChannelPipelineFactory) ctor.newInstance(sourceContext);
-                if (pipelineFactory instanceof Configurable) {
-                    this.pipelineFactoryConfigurable = ((Configurable) pipelineFactory);
-                    this.pipelineFactoryConfigurable.configure(sourceContext.getParentContext());
-                }
-            } catch (Exception e) {
-                LOG.error(
-                        "Inlong Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}",
-                        msgFactoryName, e);
-                stop();
-                throw new FlumeException(e.getMessage(), e);
-            }
-
-            serverBootstrap.setPipelineFactory(pipelineFactory);
-
-            try {
-                if (host == null) {
-                    nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
-                } else {
-                    nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
-                }
-            } catch (Exception e) {
-                LOG.error("Inlong TCP Source error bind host {} port {},program will exit!", host,
-                        port);
-                System.exit(-1);
-            }
-
-            allChannels.add(nettyChannel);
-
-            LOG.info("Inlong TCP Source started at host {}, port {}", host, port);
-            super.start();
-        } catch (Throwable t) {
-            LOG.error(t.getMessage(), t);
-        }
+    public synchronized void startSource() {
+        super.startSource();
     }
 
     /**
@@ -161,28 +70,6 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
     @Override
     public synchronized void stop() {
         LOG.info("[STOP SOURCE]{} stopping...", super.toString());
-        if (allChannels != null && !allChannels.isEmpty()) {
-            try {
-                allChannels.unbind().awaitUninterruptibly();
-                allChannels.close().awaitUninterruptibly();
-            } catch (Exception e) {
-                LOG.warn("Inlong TCP Source netty server stop ex", e);
-            } finally {
-                allChannels.clear();
-            }
-        }
-
-        if (serverBootstrap != null) {
-            try {
-                serverBootstrap.releaseExternalResources();
-            } catch (Exception e) {
-                LOG.warn("Inlong TCP Source serverBootstrap stop ex ", e);
-            } finally {
-                serverBootstrap = null;
-            }
-        }
-
-        LOG.info("[STOP SOURCE]{} stopped", super.getName());
         super.stop();
     }
 
@@ -195,32 +82,13 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
     public void configure(Context context) {
         try {
             LOG.info("context is {}", context);
+            super.configure(context);
             this.sourceContext = new SourceContext(this, allChannels, context);
             // start
             this.sourceContext.start();
 
-            tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true);
-            keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true);
-            highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, DEFAULT_RECEIVE_BUFFER_SIZE);
-            receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, DEFAULT_RECEIVE_BUFFER_SIZE);
-            if (receiveBufferSize > MAX_RECEIVE_BUFFER_SIZE) {
-                receiveBufferSize = MAX_RECEIVE_BUFFER_SIZE;
-            }
-            Preconditions.checkArgument(receiveBufferSize > 0, "receiveBufferSize must be > 0");
-
-            sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, DEFAULT_SEND_BUFFER_SIZE);
-            if (sendBufferSize > MAX_SEND_BUFFER_SIZE) {
-                sendBufferSize = MAX_SEND_BUFFER_SIZE;
-            }
-            Preconditions.checkArgument(sendBufferSize > 0, "sendBufferSize must be > 0");
-
-            trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, 0);
-            Preconditions.checkArgument((trafficClass == 0 || trafficClass == 96),
-                    "trafficClass must be == 0 or == 96");
-
             msgFactoryName = context.getString(ConfigConstants.MSG_FACTORY_NAME,
-                    InlongTcpChannelPipelineFactory.class.getName());
-            msgFactoryName = msgFactoryName.trim();
+                    InlongTcpChannelPipelineFactory.class.getName()).trim();
             Preconditions.checkArgument(StringUtils.isNotBlank(msgFactoryName),
                     "msgFactoryName is empty");
 
@@ -237,4 +105,36 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
             LOG.error(t.getMessage(), t);
         }
     }
+
+    public ChannelInitializer getChannelInitializerFactory() {
+        LOG.info(new StringBuffer("load msgFactory=").append(msgFactoryName)
+                .append(" and serviceDecoderName=").append(serviceDecoderName).toString());
+
+        ChannelInitializer fac = null;
+        try {
+            Class<? extends ChannelInitializer> clazz =
+                    (Class<? extends ChannelInitializer>) Class.forName(msgFactoryName);
+
+            Constructor ctor = clazz.getConstructor(SourceContext.class, String.class);
+
+            LOG.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
+            fac = (ChannelInitializer) ctor.newInstance(sourceContext,
+                    this.getProtocolName());
+            if (fac instanceof Configurable) {
+                this.pipelineFactoryConfigurable = ((Configurable) fac);
+                this.pipelineFactoryConfigurable.configure(sourceContext.getParentContext());
+            }
+        } catch (Exception e) {
+            LOG.error(
+                    "Inlong Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}",
+                    msgFactoryName, e);
+            stop();
+            throw new FlumeException(e.getMessage(), e);
+        }
+        return fac;
+    }
+
+    public String getProtocolName() {
+        return "tcp";
+    }
 }