You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/21 01:22:11 UTC

[incubator-inlong] branch master updated: [INLONG-1950][DataProxy] DataProxy add supporting to udp protocol for reporting data (#2185)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 897c895  [INLONG-1950][DataProxy] DataProxy add supporting to udp protocol for reporting data (#2185)
897c895 is described below

commit 897c8958385f33a9ebb25e5cfb8cc1d5388e9e38
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Jan 21 09:22:03 2022 +0800

    [INLONG-1950][DataProxy] DataProxy add supporting to udp protocol for reporting data (#2185)
---
 .../apache/inlong/dataproxy/source/BaseSource.java | 283 +++++++++++++++++++++
 .../inlong/dataproxy/source/ConfStringUtils.java   |  48 ++++
 .../dataproxy/source/DefaultServiceDecoder.java    |  46 +++-
 .../dataproxy/source/ServerMessageFactory.java     |  65 +++--
 .../dataproxy/source/ServerMessageHandler.java     | 195 ++++++++------
 .../inlong/dataproxy/source/ServiceDecoder.java    |   3 +-
 .../dataproxy/source/SimpleMessageHandler.java     |  14 +-
 .../inlong/dataproxy/source/SimpleTcpSource.java   | 196 ++++----------
 .../inlong/dataproxy/source/SimpleUdpSource.java   | 103 ++++++++
 .../inlong/dataproxy/source/UdpSourceTest.java     |  43 ++++
 10 files changed, 740 insertions(+), 256 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
new file mode 100644
index 0000000..5ac6121
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.source;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.commons.monitor.MonitorIndex;
+import org.apache.inlong.commons.monitor.MonitorIndexExt;
+import org.jboss.netty.bootstrap.Bootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * source base clase
+ *
+ */
+public abstract class BaseSource
+        extends AbstractSource
+        implements EventDrivenSource, Configurable {
+  private static final Logger logger = LoggerFactory.getLogger(BaseSource.class);
+
+  protected Context context;
+
+  protected int port;
+
+  protected String host = null;
+
+  protected String msgFactoryName;
+
+  protected String serviceDecoderName;
+
+  protected String messageHandlerName;
+
+  protected int maxMsgLength;
+
+  protected boolean isCompressed;
+
+  protected String topic;
+
+  protected String attr;
+
+  protected boolean filterEmptyMsg;
+
+  private int statIntervalSec;
+
+  protected int pkgTimeoutSec;
+
+  protected int maxConnections = Integer.MAX_VALUE;
+
+  private static final String CONNECTIONS = "connections";
+
+  protected boolean customProcessor = false;
+
+  /*
+   * monitor
+   */
+  private MonitorIndex monitorIndex;
+
+  private MonitorIndexExt monitorIndexExt;
+
+  /*
+   * netty server
+   */
+
+  protected Bootstrap serverBootstrap = null;
+
+  protected ChannelGroup allChannels;
+
+  protected Channel nettyChannel = null;
+
+  private static String HOST_DEFAULT_VALUE = "0.0.0.0";
+
+  private static int maxMonitorCnt = 300000;
+
+  private static int DEFAULT_MAX_CONNECTIONS = 5000;
+
+  private static int STAT_INTERVAL_MUST_THAN = 0;
+
+  private static int PKG_TIMEOUT_DEFAULT_SEC = 3;
+
+  private static int MSG_MIN_LENGTH = 4;
+
+  private static int MAX_MSG_DEFAULT_LENGTH = 1024 * 64;
+
+  private static int INTERVAL_SEC = 60;
+
+  protected static int DEFAULT_MAX_THREADS = 32;
+
+  protected int maxThreads = 32;
+
+  public BaseSource() {
+    super();
+    allChannels = new DefaultChannelGroup();
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    /*
+     * init monitor logic
+     */
+    monitorIndex = new MonitorIndex("Source",INTERVAL_SEC, maxMonitorCnt);
+    monitorIndexExt = new MonitorIndexExt("DataProxy_monitors#"
+            + this.getProtocolName(),INTERVAL_SEC, maxMonitorCnt);
+    startSource();
+  }
+
+  @Override
+  public synchronized void stop() {
+    logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(), this.getName());
+    if (!allChannels.isEmpty()) {
+      try {
+        allChannels.close().awaitUninterruptibly();
+      } catch (Exception e) {
+        logger.warn("Simple UDP Source netty server stop ex, {}", e);
+      } finally {
+        allChannels.clear();
+      }
+    }
+
+    if (serverBootstrap != null) {
+      try {
+        serverBootstrap.releaseExternalResources();
+      } catch (Exception e) {
+        logger.warn("Simple UDP Source serverBootstrap stop ex {}", e);
+      } finally {
+        serverBootstrap = null;
+      }
+    }
+    super.stop();
+    if (monitorIndex != null) {
+      monitorIndex.shutDown();
+    }
+    if (monitorIndexExt != null) {
+      monitorIndexExt.shutDown();
+    }
+    logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), this.getName());
+  }
+
+  @Override
+  public void configure(Context context) {
+    this.context = context;
+
+    port = context.getInteger(ConfigConstants.CONFIG_PORT);
+
+    host = context.getString(ConfigConstants.CONFIG_HOST, HOST_DEFAULT_VALUE);
+
+    Configurables.ensureRequiredNonNull(context, ConfigConstants.CONFIG_PORT);
+
+    Preconditions.checkArgument(ConfStringUtils.isValidIp(host), "ip config not valid");
+    Preconditions.checkArgument(ConfStringUtils.isValidPort(port), "port config not valid");
+
+    msgFactoryName =
+            context.getString(ConfigConstants.MSG_FACTORY_NAME,
+                    "org.apache.inlong.dataproxy.source.ServerMessageFactory");
+    msgFactoryName = msgFactoryName.trim();
+    Preconditions.checkArgument(StringUtils.isNotBlank(msgFactoryName),
+            "msgFactoryName is empty");
+
+    serviceDecoderName =
+            context.getString(ConfigConstants.SERVICE_PROCESSOR_NAME,
+                    "org.apache.inlong.dataproxy.source.DefaultServiceDecoder");
+    serviceDecoderName = serviceDecoderName.trim();
+    Preconditions.checkArgument(StringUtils.isNotBlank(serviceDecoderName),
+            "serviceProcessorName is empty");
+
+    messageHandlerName =
+            context.getString(ConfigConstants.MESSAGE_HANDLER_NAME,
+                    "org.apache.inlong.dataproxy.source.ServerMessageHandler");
+    messageHandlerName = messageHandlerName.trim();
+    Preconditions.checkArgument(StringUtils.isNotBlank(messageHandlerName),
+            "messageHandlerName is empty");
+
+    maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, MAX_MSG_DEFAULT_LENGTH);
+    Preconditions.checkArgument(
+            (maxMsgLength >= MSG_MIN_LENGTH && maxMsgLength <= ConfigConstants.MSG_MAX_LENGTH_BYTES),
+            "maxMsgLength must be >= 4 and <= " + ConfigConstants.MSG_MAX_LENGTH_BYTES);
+    isCompressed = context.getBoolean(ConfigConstants.MSG_COMPRESSED, true);
+
+    filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
+
+    topic = context.getString(ConfigConstants.TOPIC);
+    attr = context.getString(ConfigConstants.ATTR);
+    Configurables.ensureRequiredNonNull(context, ConfigConstants.TOPIC, ConfigConstants.ATTR);
+
+    topic = topic.trim();
+    Preconditions.checkArgument(!topic.isEmpty(), "topic is empty");
+    attr = attr.trim();
+    Preconditions.checkArgument(!attr.isEmpty(), "attr is empty");
+
+    filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
+
+    statIntervalSec = context.getInteger(ConfigConstants.STAT_INTERVAL_SEC, INTERVAL_SEC);
+    Preconditions.checkArgument((statIntervalSec >= STAT_INTERVAL_MUST_THAN), "statIntervalSec must be >= 0");
+
+    pkgTimeoutSec = context.getInteger(ConfigConstants.PACKAGE_TIMEOUT_SEC, PKG_TIMEOUT_DEFAULT_SEC);
+
+    try {
+      maxConnections = context.getInteger(CONNECTIONS, DEFAULT_MAX_CONNECTIONS);
+    } catch (NumberFormatException e) {
+      logger.warn("BaseSource\'s \"connections\" property must specify an integer value.",
+              context.getString(CONNECTIONS));
+    }
+
+    try {
+      maxThreads = context.getInteger(ConfigConstants.MAX_THREADS, DEFAULT_MAX_THREADS);
+    } catch (NumberFormatException e) {
+      logger.warn("Simple TCP Source max-threads property must specify an integer value. {}",
+              context.getString(ConfigConstants.MAX_THREADS));
+    }
+
+    this.customProcessor = context.getBoolean(ConfigConstants.CUSTOM_CHANNEL_PROCESSOR, false);
+  }
+
+  /**
+   * channel factory
+   * @return
+   */
+  public ChannelPipelineFactory getChannelPiplineFactory() {
+    logger.info(new StringBuffer("load msgFactory=").append(msgFactoryName)
+            .append(" and serviceDecoderName=").append(serviceDecoderName).toString());
+    ChannelPipelineFactory fac = null;
+    try {
+      ServiceDecoder serviceDecoder = (ServiceDecoder)Class.forName(serviceDecoderName).newInstance();
+      Class<? extends ChannelPipelineFactory> clazz =
+              (Class<? extends ChannelPipelineFactory>) Class.forName(msgFactoryName);
+      Constructor ctor = clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
+              String.class, ServiceDecoder.class, String.class, Integer.class,
+              String.class, String.class, Boolean.class,
+              Integer.class, Boolean.class, MonitorIndex.class,
+              MonitorIndexExt.class, String.class);
+      logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
+      fac = (ChannelPipelineFactory) ctor.newInstance(this, allChannels,
+              this.getProtocolName(), serviceDecoder, messageHandlerName, maxMsgLength,
+              topic, attr, filterEmptyMsg,
+              maxConnections, isCompressed, monitorIndex,
+              monitorIndexExt, this.getProtocolName());
+    } catch (Exception e) {
+      logger.error(
+              "Simple {} Source start error, fail to construct ChannelPipelineFactory with name "
+                      + "{}, ex {}",this.getProtocolName(), msgFactoryName, e);
+      stop();
+      throw new FlumeException(e.getMessage());
+    }
+    return fac;
+  }
+
+  public Context getContext() {
+    return context;
+  }
+
+  public abstract String getProtocolName();
+
+  public abstract void startSource();
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ConfStringUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ConfStringUtils.java
new file mode 100644
index 0000000..618cde6
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ConfStringUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.source;
+
+public class ConfStringUtils {
+    public static boolean isValidIp(String ip) {
+        if (ip == null || ip.trim().isEmpty()) {
+            return false;
+        }
+        boolean b = false;
+        ip = ip.trim();
+        if (ip.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
+            String[] s = ip.split("\\.");
+            if (Integer.parseInt(s[0]) < 255) {
+                if (Integer.parseInt(s[1]) < 255) {
+                    if (Integer.parseInt(s[2]) < 255) {
+                        if (Integer.parseInt(s[3]) < 255) {
+                            b = true;
+                        }
+                    }
+                }
+            }
+        }
+        return b;
+    }
+
+    public static boolean isValidPort(int port) {
+        if (port < 0 || port > 65535) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index 28c4681..d2c7ff6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -36,6 +36,7 @@ import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.MessageEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
@@ -76,13 +77,14 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * @param resultMap
      * @param cb
      * @param channel
+     * @param msgEvent
      * @param totalDataLen
      * @return
      * @throws
      */
     private Map<String, Object> extractNewBinHB(Map<String, Object> resultMap,
-                                                ChannelBuffer cb, Channel channel,
-                                                int totalDataLen) throws Exception {
+            ChannelBuffer cb, Channel channel,
+            MessageEvent msgEvent, int totalDataLen) throws Exception {
         int msgHeadPos = cb.readerIndex() - 5;
 
         // check validation
@@ -95,7 +97,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
                 + attrLen + BIN_HB_FORMAT_SIZE)) || (msgMagic != BIN_MSG_MAGIC)) {
 
             LOG.error("err msg, bodyLen + attrLen > totalDataLen, "
-                            + "and bodyLen={},attrLen={},totalDataLen={},magic={};Connection info:{}",
+                            + "and bodyLen={},attrLen={},totalDataLen={},magic={};Connection "
+                            + "info:{}",
                     bodyLen, attrLen, totalDataLen, Integer.toHexString(msgMagic), channel.toString());
 
             return resultMap;
@@ -116,7 +119,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
     }
 
     private void handleDateTime(Map<String, String> commonAttrMap, Channel channel,
-        long uniq, long dataTime, int msgCount) {
+            MessageEvent msgEvent,
+            long uniq, long dataTime, int msgCount) {
         commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
         String time = "";
         if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
@@ -126,7 +130,16 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             time = String.valueOf(dataTime);
         }
         StringBuilder sidBuilder = new StringBuilder();
-        sidBuilder.append(channel.getRemoteAddress().toString()).append("#").append(time)
+        /*
+         * udp need use msgEvent get remote address
+         */
+        String remoteAddress = "";
+        if (channel != null && channel.getRemoteAddress() != null) {
+            remoteAddress = channel.getRemoteAddress().toString();
+        } else if (msgEvent != null && msgEvent.getRemoteAddress() != null) {
+            remoteAddress = msgEvent.getRemoteAddress().toString();
+        }
+        sidBuilder.append(remoteAddress).append("#").append(time)
             .append("#").append(uniq);
         commonAttrMap.put(AttributeConstants.SEQUENCE_ID, new String(sidBuilder));
 
@@ -224,14 +237,15 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * @param resultMap
      * @param cb
      * @param channel
+     * @param msgEvent
      * @param totalDataLen
      * @param msgType
      * @return
      * @throws Exception
      */
     private Map<String, Object> extractNewBinData(Map<String, Object> resultMap,
-                                                  ChannelBuffer cb, Channel channel,
-                                                  int totalDataLen, MsgType msgType) throws Exception {
+            ChannelBuffer cb, Channel channel, MessageEvent msgEvent,
+            int totalDataLen, MsgType msgType) throws Exception {
         int msgHeadPos = cb.readerIndex() - 5;
 
         int bodyLen = cb.getInt(msgHeadPos + BIN_MSG_BODYLEN_OFFSET);
@@ -292,7 +306,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
         }
 
         try {
-            handleDateTime(commonAttrMap, channel, uniq, dataTime, msgCount);
+            handleDateTime(commonAttrMap, channel, msgEvent, uniq, dataTime, msgCount);
             final boolean index = handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos);
             ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos,
                 totalDataLen, attrLen, strAttr, bodyLen);
@@ -330,6 +344,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
                 resultMap.put(ConfigConstants.MSG_LIST, msgList);
             }
         } catch (Exception ex) {
+            LOG.error("extractNewBinData has error! ex = {}", ex);
             cb.clear();
             throw new MessageIDException(uniq, ErrorCode.OTHER_ERROR, ex.getCause());
         }
@@ -343,13 +358,15 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * @param cb
      * @param channel
      * @param totalDataLen
+     * @param msgEvent
      * @param msgType
      * @return
      * @throws Exception
      */
     private Map<String, Object> extractDefaultData(Map<String, Object> resultMap,
-                                                   ChannelBuffer cb, Channel channel,
-                                                   int totalDataLen, MsgType msgType) throws Exception {
+            ChannelBuffer cb, Channel channel,
+            MessageEvent msgEvent,
+            int totalDataLen, MsgType msgType) throws Exception {
         int bodyLen = cb.readInt();
         if (bodyLen == 0) {
             throw new Exception(new Throwable("err msg,  bodyLen is empty" + ";"
@@ -464,7 +481,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
      * +--------+--------+--------+----------------+--------+----------------+------------------------+
      */
     @Override
-    public Map<String, Object> extractData(ChannelBuffer cb, Channel channel) throws Exception {
+    public Map<String, Object> extractData(ChannelBuffer cb, Channel channel,
+            MessageEvent msgEvent) throws Exception {
         Map<String, Object> resultMap = new HashMap<String, Object>();
         if (null == cb) {
             LOG.error("cb == null");
@@ -491,14 +509,14 @@ public class DefaultServiceDecoder implements ServiceDecoder {
             }
             // if it's bin heart beat.
             if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-                return extractNewBinHB(resultMap, cb, channel, totalDataLen);
+                return extractNewBinHB(resultMap, cb, channel, msgEvent, totalDataLen);
             }
 
             if (msgType.getValue() >= MsgType.MSG_BIN_MULTI_BODY.getValue()) {
                 resultMap.put(ConfigConstants.COMPRESS_TYPE, (compressType != 0) ? "snappy" : "");
-                return extractNewBinData(resultMap, cb, channel, totalDataLen, msgType);
+                return extractNewBinData(resultMap, cb, channel, msgEvent, totalDataLen, msgType);
             } else {
-                return extractDefaultData(resultMap, cb, channel, totalDataLen, msgType);
+                return extractDefaultData(resultMap, cb, channel, msgEvent, totalDataLen, msgType);
             }
 
         } else {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index 69c4856..fa42169 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -19,10 +19,11 @@ package org.apache.inlong.dataproxy.source;
 
 import java.lang.reflect.Constructor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.commons.monitor.MonitorIndex;
+import org.apache.inlong.commons.monitor.MonitorIndexExt;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
@@ -40,21 +41,47 @@ import org.slf4j.LoggerFactory;
 public class ServerMessageFactory implements ChannelPipelineFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(ServerMessageFactory.class);
+
     private static final int DEFAULT_READ_IDLE_TIME = 70 * 60 * 1000;
+
+    private static long MAX_CHANNEL_MEMORY_SIZE = 1024 * 1024;
+
+    private static long MAX_TOTAL_MEMORY_SIZE = 1024 * 1024;
+
+    private static int MSG_LENGTH_LEN = 4;
+
     private AbstractSource source;
+
     private ChannelProcessor processor;
+
     private ChannelGroup allChannels;
+
     private ExecutionHandler executionHandler;
+
     private String protocolType;
-    private ServiceDecoder serviceProcessor;
+
+    private ServiceDecoder serviceDecoder;
+
     private String messageHandlerName;
+
     private int maxConnections = Integer.MAX_VALUE;
+
     private int maxMsgLength;
+
     private boolean isCompressed;
+
     private String name;
+
     private String topic;
+
     private String attr;
+
     private boolean filterEmptyMsg;
+
+    private MonitorIndex monitorIndex;
+
+    private MonitorIndexExt monitorIndexExt;
+
     private Timer timer = new HashedWheelTimer();
 
     /**
@@ -63,21 +90,22 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
      * @param source
      * @param allChannels
      * @param protocol
-     * @param serProcessor
+     * @param serviceDecoder
      * @param messageHandlerName
-     * @param maxMsgLength
      * @param topic
      * @param attr
      * @param filterEmptyMsg
      * @param maxCons
      * @param isCompressed
+     * @param monitorIndex
+     * @param monitorIndexExt
      * @param name
      */
-    public ServerMessageFactory(AbstractSource source,
-                                ChannelGroup allChannels, String protocol, ServiceDecoder serProcessor,
-                                String messageHandlerName, Integer maxMsgLength,
-                                String topic, String attr, Boolean filterEmptyMsg, Integer maxCons,
-                                Boolean isCompressed, String name) {
+    public ServerMessageFactory(AbstractSource source, ChannelGroup allChannels, String protocol,
+            ServiceDecoder serviceDecoder, String messageHandlerName, Integer maxMsgLength,
+            String topic, String attr, Boolean filterEmptyMsg, Integer maxCons,
+            Boolean isCompressed, MonitorIndex monitorIndex, MonitorIndexExt monitorIndexExt,
+            String name) {
         this.source = source;
         this.processor = source.getChannelProcessor();
         this.allChannels = allChannels;
@@ -86,17 +114,18 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
         this.filterEmptyMsg = filterEmptyMsg;
         int cores = Runtime.getRuntime().availableProcessors();
         this.protocolType = protocol;
-        this.serviceProcessor = serProcessor;
+        this.serviceDecoder = serviceDecoder;
         this.messageHandlerName = messageHandlerName;
         this.name = name;
         this.maxConnections = maxCons;
         this.maxMsgLength = maxMsgLength;
         this.isCompressed = isCompressed;
-
+        this.monitorIndex = monitorIndex;
+        this.monitorIndexExt = monitorIndexExt;
         if (protocolType.equalsIgnoreCase(ConfigConstants.UDP_PROTOCOL)) {
             this.executionHandler = new ExecutionHandler(
                     new OrderedMemoryAwareThreadPoolExecutor(cores * 2,
-                            1024 * 1024, 1024 * 1024));
+                            MAX_CHANNEL_MEMORY_SIZE, MAX_TOTAL_MEMORY_SIZE));
         }
     }
 
@@ -116,7 +145,7 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
         if (this.protocolType
                 .equalsIgnoreCase(ConfigConstants.TCP_PROTOCOL)) {
             cp.addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
-                    this.maxMsgLength, 0, 4, 0, 0, true));
+                    this.maxMsgLength, 0, MSG_LENGTH_LEN, 0, 0, true));
             cp.addLast("readTimeoutHandler", new ReadTimeoutHandler(timer,
                     DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
         }
@@ -128,12 +157,14 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
 
                 Constructor<?> ctor = clazz.getConstructor(
                         AbstractSource.class, ServiceDecoder.class, ChannelGroup.class,
-                        String.class, String.class, Boolean.class, Integer.class,
-                        Integer.class, Boolean.class, String.class);
+                        String.class, String.class, Boolean.class,
+                        Integer.class, Boolean.class, MonitorIndex.class,
+                        MonitorIndexExt.class, String.class);
 
                 SimpleChannelHandler messageHandler = (SimpleChannelHandler) ctor
-                        .newInstance(source, serviceProcessor, allChannels, topic, attr,
-                                filterEmptyMsg, maxMsgLength, maxConnections, isCompressed, protocolType
+                        .newInstance(source, serviceDecoder, allChannels, topic, attr,
+                                filterEmptyMsg, maxConnections,
+                                isCompressed,  monitorIndex, monitorIndexExt, protocolType
                         );
 
                 cp.addLast("messageHandler", messageHandler);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 8925916..2ab09f5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -22,6 +22,8 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
 import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -32,7 +34,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
@@ -48,6 +49,9 @@ import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.commons.monitor.MonitorIndex;
+import org.apache.inlong.commons.monitor.MonitorIndexExt;
+import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -60,9 +64,6 @@ import org.jboss.netty.channel.group.ChannelGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-
 /**
  * Server message handler
  *
@@ -71,10 +72,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
 
     private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
+
     private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
+
     private static final ConfigManager configManager = ConfigManager.getInstance();
+
     private static final Joiner.MapJoiner mapJoiner = Joiner.on(AttributeConstants.SEPARATOR)
             .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+
     private static final Splitter.MapSplitter mapSplitter = Splitter
             .on(AttributeConstants.SEPARATOR)
             .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
@@ -86,6 +91,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                     return new SimpleDateFormat("yyyyMMddHHmm");
                 }
             };
+
     private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer =
             new ThreadLocal<SimpleDateFormat>() {
                 @Override
@@ -94,40 +100,43 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 }
             };
     private AbstractSource source;
+
     private final ChannelGroup allChannels;
+
     private int maxConnections = Integer.MAX_VALUE;
+
     private boolean filterEmptyMsg = false;
+
     private final boolean isCompressed;
+
     private final ChannelProcessor processor;
-    private final ServiceDecoder serviceProcessor;
+
+    private final ServiceDecoder serviceDecoder;
+
     private final String defaultTopic;
+
     private String defaultMXAttr = "m=3";
+
     private final ChannelBuffer heartbeatBuffer;
+
     private final String protocolType;
+
+
+    private MonitorIndex monitorIndex;
+
+    private MonitorIndexExt monitorIndexExt;
+
     //
     private final DataProxyMetricItemSet metricItemSet;
 
-    /**
-     * ServerMessageHandler
-     * @param source
-     * @param serProcessor
-     * @param allChannels
-     * @param topic
-     * @param attr
-     * @param filterEmptyMsg
-     * @param maxMsgLength
-     * @param maxCons
-     * @param isCompressed
-     * @param protocolType
-     */
-    public ServerMessageHandler(AbstractSource source, ServiceDecoder serProcessor,
-                                ChannelGroup allChannels,
-                                String topic, String attr, Boolean filterEmptyMsg, Integer maxMsgLength,
-                                Integer maxCons,
-                                Boolean isCompressed, String protocolType) {
+    public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder,
+            ChannelGroup allChannels,
+            String topic, String attr, Boolean filterEmptyMsg,
+            Integer maxCons, Boolean isCompressed, MonitorIndex monitorIndex,
+            MonitorIndexExt monitorIndexExt, String protocolType) {
         this.source = source;
         this.processor = source.getChannelProcessor();
-        this.serviceProcessor = serProcessor;
+        this.serviceDecoder = serviceDecoder;
         this.allChannels = allChannels;
         this.defaultTopic = topic;
         if (null != attr) {
@@ -144,11 +153,20 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         } else {
             this.metricItemSet = new DataProxyMetricItemSet(this.toString());
         }
+        this.monitorIndex = monitorIndex;
+        this.monitorIndexExt = monitorIndexExt;
     }
 
     private String getRemoteIp(Channel channel) {
+        return getRemoteIp(channel, null);
+    }
+
+    private String getRemoteIp(Channel channel, SocketAddress remoteAddress) {
         String strRemoteIp = DEFAULT_REMOTE_IP_VALUE;
         SocketAddress remoteSocketAddress = channel.getRemoteAddress();
+        if (remoteSocketAddress == null) {
+            remoteSocketAddress = remoteAddress;
+        }
         if (null != remoteSocketAddress) {
             strRemoteIp = remoteSocketAddress.toString();
             try {
@@ -237,7 +255,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     }
 
     private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
-        Map<String, String> attrMap, AtomicReference<String> topicInfo) {
+            Map<String, String> attrMap, AtomicReference<String> topicInfo) {
         String groupId = message.getGroupId();
         String streamId = message.getStreamId();
         if (null != groupId) {
@@ -245,10 +263,10 @@ public class ServerMessageHandler extends SimpleChannelHandler {
             if ("dc".equals(from)) {
                 String dcInterfaceId = message.getStreamId();
                 if (StringUtils.isNotEmpty(dcInterfaceId)
-                    && configManager.getDcMappingProperties()
-                    .containsKey(dcInterfaceId.trim())) {
+                        && configManager.getDcMappingProperties()
+                        .containsKey(dcInterfaceId.trim())) {
                     groupId = configManager.getDcMappingProperties()
-                        .get(dcInterfaceId.trim()).trim();
+                            .get(dcInterfaceId.trim()).trim();
                     message.setGroupId(groupId);
                 }
             }
@@ -270,16 +288,16 @@ public class ServerMessageHandler extends SimpleChannelHandler {
             String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
 
             if (configManager.getGroupIdMappingProperties() != null
-                && configManager.getStreamIdMappingProperties() != null) {
+                    && configManager.getStreamIdMappingProperties() != null) {
                 groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
                 streamId = (configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
-                    ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+                        ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
                 if (groupId != null && streamId != null) {
                     String enableTrans =
-                        (configManager.getGroupIdEnableMappingProperties() == null)
-                            ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
+                            (configManager.getGroupIdEnableMappingProperties() == null)
+                                    ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
                     if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
-                        .equalsIgnoreCase(num2name))) {
+                            .equalsIgnoreCase(num2name))) {
                         String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
                         message.setData(newBinMsg(message.getData(), extraAttr));
                     }
@@ -299,8 +317,8 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     }
 
     private void updateMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap,
-        Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-        String strRemoteIP, MsgType msgType) {
+            Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+            String strRemoteIP, MsgType msgType) {
         for (ProxyMessage message : msgList) {
             Map<String, String> attrMap = message.getAttributeMap();
 
@@ -310,7 +328,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
             checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo);
             topic = topicInfo.get();
 
-//                if(groupId==null)groupId="b_test";//default groupId
+            //                if(groupId==null)groupId="b_test";//default groupId
 
             message.setTopic(topic);
             commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP);
@@ -327,15 +345,15 @@ public class ServerMessageHandler extends SimpleChannelHandler {
             if (groupId != null && streamId != null) {
                 String tubeSwtichKey = groupId + SEPARATOR + streamId;
                 if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) != null
-                    && "false".equals(configManager.getTubeSwitchProperties()
-                    .get(tubeSwtichKey).trim())) {
+                        && "false".equals(configManager.getTubeSwitchProperties()
+                        .get(tubeSwtichKey).trim())) {
                     continue;
                 }
             }
 
             if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))
-                && !MsgType.MSG_MULTI_BODY.equals(msgType)
-                && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
+                    && !MsgType.MSG_MULTI_BODY.equals(msgType)
+                    && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
                 byte[] data = message.getData();
                 if (data[data.length - 1] == '\n') {
                     int tripDataLen = data.length - 1;
@@ -352,16 +370,16 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 streamId = "";
             }
             HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
-                .computeIfAbsent(topic, k -> new HashMap<>());
+                    .computeIfAbsent(topic, k -> new HashMap<>());
             List<ProxyMessage> streamIdMsgList = streamIdMsgMap
-                .computeIfAbsent(streamId, k -> new ArrayList<>());
+                    .computeIfAbsent(streamId, k -> new ArrayList<>());
             streamIdMsgList.add(message);
         }
     }
 
     private void formatMessagesAndSend(Map<String, String> commonAttrMap,
-        Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-        String strRemoteIP, MsgType msgType) throws MessageIDException {
+            Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+            String strRemoteIP, MsgType msgType) throws MessageIDException {
 
         int tdMsgVer = 1;
         if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
@@ -419,7 +437,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
 
                     StringBuilder sidBuilder = new StringBuilder();
                     sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
-                        .append(SEPARATOR).append(sequenceId);
+                            .append(SEPARATOR).append(sequenceId);
                     headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
                 }
 
@@ -432,19 +450,33 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 } catch (Exception e1) {
                     long uniqVal = Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
                     throw new MessageIDException(uniqVal,
-                        ErrorCode.DT_ERROR,
-                        new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
-                            + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
-                            + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
+                            ErrorCode.DT_ERROR,
+                            new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
+                                    + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
+                                    + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
                 }
 
                 dtten = dtten / 1000 / 60 / 10;
                 dtten = dtten * 1000 * 60 * 10;
+                StringBuilder newbase = new StringBuilder();
+                newbase.append(protocolType).append(SEPARATOR)
+                        .append(topicEntry.getKey()).append(SEPARATOR)
+                        .append(streamIdEntry.getKey()).append(SEPARATOR)
+                        .append(strRemoteIP).append(SEPARATOR)
+                        .append(NetworkUtils.getLocalIp()).append(SEPARATOR)
+                        .append(new SimpleDateFormat("yyyyMMddHHmm")
+                                .format(dtten)).append(SEPARATOR).append(pkgTimeStr);
                 try {
                     processor.processEvent(event);
+                    monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
                     this.addMetric(true, data.length);
+                    monitorIndex.addAndGet(new String(newbase),
+                            Integer.parseInt(proxyMetricMsgCnt), 1, data.length, 0);
                 } catch (Throwable ex) {
                     logger.error("Error writting to channel,data will discard.", ex);
+                    monitorIndexExt.incrementAndGet("EVENT_DROPPED");
+                    monitorIndex.addAndGet(new String(newbase), 0,0,0,
+                            Integer.parseInt(proxyMetricMsgCnt));
                     this.addMetric(false, data.length);
                     throw new ChannelException("ProcessEvent error can't write event to channel.");
                 }
@@ -453,15 +485,15 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     }
 
     private void responsePackage(Map<String, String> commonAttrMap,
-        Map<String, Object> resultMap,
-        Channel remoteChannel,
-        SocketAddress remoteSocketAddress,
-        MsgType msgType) throws Exception {
+            Map<String, Object> resultMap,
+            Channel remoteChannel,
+            SocketAddress remoteSocketAddress,
+            MsgType msgType) throws Exception {
         if (!commonAttrMap.containsKey("isAck") || "true".equals(commonAttrMap.get("isAck"))) {
             if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN
-                .equals(msgType)
-                || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR
-                .equals(msgType)) {
+                    .equals(msgType)
+                    || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR
+                    .equals(msgType)) {
                 byte[] backAttr = mapJoiner.join(commonAttrMap).getBytes(StandardCharsets.UTF_8);
                 byte[] backBody = null;
 
@@ -486,14 +518,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                     } else {
                         String backAttrStr = new String(backAttr, StandardCharsets.UTF_8);
                         logger.warn(
-                            "the send buffer1 is full, so disconnect it!please check remote client"
-                                + "; Connection info:"
-                                + remoteChannel + ";attr is " + backAttrStr);
+                                "the send buffer1 is full, so disconnect it!please check remote client"
+                                        + "; Connection info:"
+                                        + remoteChannel + ";attr is " + backAttrStr);
                         throw new Exception(new Throwable(
-                            "the send buffer1 is full, so disconnect it!please check remote client"
-                                +
-                                "; Connection info:" + remoteChannel + ";attr is "
-                                + backAttrStr));
+                                "the send buffer1 is full, so disconnect it!please check remote client"
+                                        +
+                                        "; Connection info:" + remoteChannel + ";attr is "
+                                        + backAttrStr));
                     }
                 }
             } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
@@ -531,12 +563,12 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                     remoteChannel.write(binBuffer, remoteSocketAddress);
                 } else {
                     logger.warn(
-                        "the send buffer2 is full, so disconnect it!please check remote client"
-                            + "; Connection info:" + remoteChannel + ";attr is "
-                            + backattrs);
+                            "the send buffer2 is full, so disconnect it!please check remote client"
+                                    + "; Connection info:" + remoteChannel + ";attr is "
+                                    + backattrs);
                     throw new Exception(new Throwable(
-                        "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
-                            + remoteChannel + ";attr is " + backattrs));
+                            "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
+                                    + remoteChannel + ";attr is " + backattrs));
                 }
             }
         }
@@ -544,14 +576,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
 
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        logger.info("message received");
+        logger.debug("message received");
         if (e == null) {
             logger.error("get null messageevent, just skip");
             this.addMetric(false, 0);
             return;
         }
         ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
-        String strRemoteIP = getRemoteIp(e.getChannel());
+        String strRemoteIP = getRemoteIp(e.getChannel(), e.getRemoteAddress());
         SocketAddress remoteSocketAddress = e.getRemoteAddress();
         int len = cb.readableBytes();
         if (len == 0 && this.filterEmptyMsg) {
@@ -564,8 +596,9 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         Channel remoteChannel = e.getChannel();
         Map<String, Object> resultMap = null;
         try {
-            resultMap = serviceProcessor.extractData(cb, remoteChannel);
+            resultMap = serviceDecoder.extractData(cb, remoteChannel, e);
         } catch (MessageIDException ex) {
+            logger.error("MessageIDException ex = {}", ex);
             this.addMetric(false, 0);
             throw new IOException(ex.getCause());
         }
@@ -584,8 +617,6 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         }
 
         if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-//            ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
-//            remoteChannel.write(binBuffer, remoteSocketAddress);
             this.addMetric(false, 0);
             return;
         }
@@ -602,21 +633,19 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
             Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
                     new HashMap<String, HashMap<String, List<ProxyMessage>>>(
-                    msgList.size());
+                            msgList.size());
 
             updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
 
             formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
 
         } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
-//            logger.info("i am in FILE_CHECK_DATA ");
             Map<String, String> headers = new HashMap<String, String>();
             headers.put("msgtype", "filestatus");
             headers.put(ConfigConstants.FILE_CHECK_DATA,
                     "true");
             for (ProxyMessage message : msgList) {
                 byte[] body = message.getData();
-//                logger.info("data:"+new String(body));
                 Event event = EventBuilder.withBody(body, headers);
                 try {
                     processor.processEvent(event);
@@ -637,7 +666,6 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                     "true");
             for (ProxyMessage message : msgList) {
                 byte[] body = message.getData();
-//                logger.info("data:"+new String(body));
                 Event event = EventBuilder.withBody(body, headers);
                 try {
                     processor.processEvent(event);
@@ -656,11 +684,20 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
         logger.error("exception caught", e.getCause());
+        monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
     }
 
     @Override
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         logger.error("channel closed {}", ctx.getChannel());
+        super.channelClosed(ctx, e);
+        try {
+            e.getChannel().disconnect();
+            e.getChannel().close();
+        } catch (Exception ex) {
+            //
+        }
+        allChannels.remove(e.getChannel());
     }
 
     /**
@@ -689,7 +726,11 @@ public class ServerMessageHandler extends SimpleChannelHandler {
 
     /**
      * addMetric
+<<<<<<< HEAD
      * 
+=======
+     *
+>>>>>>> add udp feature
      * @param result
      * @param size
      */
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
index 520ae27..162d614 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.MessageEvent;
 
 /**
  * decoder interface definition
@@ -36,5 +37,5 @@ public interface ServiceDecoder {
      * @return
      * @throws
      */
-    Map<String, Object> extractData(ChannelBuffer cb, Channel channel) throws Exception;
+    Map<String, Object> extractData(ChannelBuffer cb, Channel channel, MessageEvent e) throws Exception;
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 4512db9..a0cbe05 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -576,16 +576,16 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent msgEvent) throws Exception {
         logger.info("message received");
-        if (e == null) {
+        if (msgEvent == null) {
             logger.error("get null messageevent, just skip");
             this.addMetric(false, 0);
             return;
         }
-        ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
-        String strRemoteIP = getRemoteIp(e.getChannel());
-        SocketAddress remoteSocketAddress = e.getRemoteAddress();
+        ChannelBuffer cb = ((ChannelBuffer) msgEvent.getMessage());
+        String strRemoteIP = getRemoteIp(msgEvent.getChannel());
+        SocketAddress remoteSocketAddress = msgEvent.getRemoteAddress();
         int len = cb.readableBytes();
         if (len == 0 && this.filterEmptyMsg) {
             logger.warn("skip empty msg.");
@@ -594,10 +594,10 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
             return;
         }
 
-        Channel remoteChannel = e.getChannel();
+        Channel remoteChannel = msgEvent.getChannel();
         Map<String, Object> resultMap = null;
         try {
-            resultMap = serviceProcessor.extractData(cb, remoteChannel);
+            resultMap = serviceProcessor.extractData(cb, remoteChannel, msgEvent);
         } catch (MessageIDException ex) {
             this.addMetric(false, 0);
             throw new IOException(ex.getCause());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index af7c8da..60db247 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import com.google.common.base.Preconditions;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.text.SimpleDateFormat;
@@ -29,15 +29,10 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.concurrent.Executors;
-
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
-import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.dataproxy.base.NamedThreadFactory;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -54,45 +49,57 @@ import org.jboss.netty.util.ThreadRenamingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Simple tcp source
  *
  */
-public class SimpleTcpSource extends AbstractSource implements Configurable, EventDrivenSource {
+public class SimpleTcpSource extends BaseSource
+        implements Configurable, EventDrivenSource {
 
     private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class);
+
     public static ArrayList<String> blacklist = new ArrayList<String>();
+
     private static final String blacklistFilePath = "blacklist.properties";
+
+    private static int TRAFFIC_CLASS_TYPE_0 = 0;
+
+    private static int TRAFFIC_CLASS_TYPE_96 = 96;
+
+    private static int BUFFER_SIZE_MUST_THAN = 0;
+
+    private static int HIGH_WATER_MARK_DEFAULT_VALUE = 64 * 1024;
+
+    private static int RECEIVE_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+    private static int SEND_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+    private static int RECEIVE_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+    private static int SEND_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+    private static int DEFAULT_SLEEP_TIME_MS = 5 * 1000;
+
     private static long propsLastModified;
-    private static final String CONNECTIONS = "connections";
-
-    protected int maxConnections = Integer.MAX_VALUE;
-    private ServerBootstrap serverBootstrap = null;
-    protected ChannelGroup allChannels;
-    protected int port;
-    protected String host = null;
-    protected String msgFactoryName;
-    protected String serviceDecoderName;
-    protected String messageHandlerName;
-    protected int maxMsgLength;
-    protected boolean isCompressed;
+
     private CheckBlackListThread checkBlackListThread;
+
     private int maxThreads = 32;
 
     private boolean tcpNoDelay = true;
+
     private boolean keepAlive = true;
+
     private int receiveBufferSize;
+
     private int highWaterMark;
+
     private int sendBufferSize;
+
     private int trafficClass;
 
     protected String topic;
-    protected String attr;
-    protected boolean filterEmptyMsg;
 
-    private Channel nettyChannel = null;
     //
     private DataProxyMetricItemSet metricItemSet;
 
@@ -178,7 +185,7 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
                         logger.info("blacklist.properties:{}\n{}",
                                 formator.format(new Date(blacklistFile.lastModified())), blacklist);
                     }
-                    Thread.sleep(5 * 1000);
+                    Thread.sleep(DEFAULT_SLEEP_TIME_MS);
                     checkBlackList(blacklist, allChannels);
                 } catch (InterruptedException e) {
                     logger.info("ConfigReloader thread exit!");
@@ -191,13 +198,12 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
     }
 
     @Override
-    public synchronized void start() {
+    public synchronized void startSource() {
         logger.info("start " + this.getName());
         this.metricItemSet = new DataProxyMetricItemSet(this.getName());
         MetricRegister.register(metricItemSet);
         checkBlackListThread = new CheckBlackListThread();
         checkBlackListThread.start();
-        super.start();
 
         ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
         ChannelFactory factory =
@@ -207,7 +213,6 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
                         Executors.newCachedThreadPool(
                                 new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")), maxThreads);
         logger.info("Set max workers : {} ;", maxThreads);
-        ChannelPipelineFactory fac = null;
 
         serverBootstrap = new ServerBootstrap(factory);
         serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
@@ -218,39 +223,15 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
         serverBootstrap.setOption("child.writeBufferHighWaterMark", highWaterMark);
         logger.info("load msgFactory=" + msgFactoryName
                 + " and serviceDecoderName=" + serviceDecoderName);
-        try {
-
-            ServiceDecoder serviceDecoder =
-                    (ServiceDecoder) Class.forName(serviceDecoderName).newInstance();
-
-            Class<? extends ChannelPipelineFactory> clazz =
-                    (Class<? extends ChannelPipelineFactory>) Class.forName(msgFactoryName);
-
-            Constructor ctor =
-                    clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
-                            String.class, ServiceDecoder.class, String.class,
-                            Integer.class, String.class, String.class, Boolean.class,
-                            Integer.class, Boolean.class, String.class);
-
-            logger.info("Using channel processor:{}", this.getClass().getName());
-            fac = (ChannelPipelineFactory) ctor.newInstance(this, allChannels,
-                    "tcp", serviceDecoder, messageHandlerName,
-                    maxMsgLength, topic, attr, filterEmptyMsg, maxConnections, isCompressed, this.getName());
-
-        } catch (Exception e) {
-            logger.error("Simple Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}",
-                    msgFactoryName, e);
-            stop();
-            throw new FlumeException(e.getMessage());
-        }
 
+        ChannelPipelineFactory fac = this.getChannelPiplineFactory();
         serverBootstrap.setPipelineFactory(fac);
 
         try {
             if (host == null) {
-                nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
+                nettyChannel = ((ServerBootstrap)serverBootstrap).bind(new InetSocketAddress(port));
             } else {
-                nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
+                nettyChannel = ((ServerBootstrap)serverBootstrap).bind(new InetSocketAddress(host, port));
             }
         } catch (Exception e) {
             logger.error("Simple TCP Source error bind host {} port {},program will exit!", host, port);
@@ -265,59 +246,34 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
 
     @Override
     public synchronized void stop() {
-        logger.info("[STOP SOURCE]{} stopping...", super.getName());
         checkBlackListThread.shutdouwn();
-        if (allChannels != null && !allChannels.isEmpty()) {
-            try {
-                allChannels.unbind().awaitUninterruptibly();
-                allChannels.close().awaitUninterruptibly();
-            } catch (Exception e) {
-                logger.warn("Simple TCP Source netty server stop ex", e);
-            } finally {
-                allChannels.clear();
-                // allChannels = null;
-            }
-        }
-
-        if (serverBootstrap != null) {
-            try {
-
-                serverBootstrap.releaseExternalResources();
-            } catch (Exception e) {
-                logger.warn("Simple TCP Source serverBootstrap stop ex ", e);
-            } finally {
-                serverBootstrap = null;
-            }
-        }
-
         super.stop();
-        logger.info("[STOP SOURCE]{} stopped", super.getName());
     }
 
     @Override
     public void configure(Context context) {
         logger.info("context is {}", context);
-        port = context.getInteger(ConfigConstants.CONFIG_PORT);
-        host = context.getString(ConfigConstants.CONFIG_HOST, "0.0.0.0");
-
+        super.configure(context);
         tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true);
-
         keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true);
-        highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, 64 * 1024);
-        receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, 1024 * 64);
-        if (receiveBufferSize > 16 * 1024 * 1024) {
-            receiveBufferSize = 16 * 1024 * 1024;
+        highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, HIGH_WATER_MARK_DEFAULT_VALUE);
+        receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, RECEIVE_BUFFER_DEFAULT_SIZE);
+        if (receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
+            receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
         }
-        Preconditions.checkArgument(receiveBufferSize > 0, "receiveBufferSize must be > 0");
+        Preconditions.checkArgument(receiveBufferSize > BUFFER_SIZE_MUST_THAN,
+                "receiveBufferSize must be > 0");
 
-        sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, 1024 * 64);
-        if (sendBufferSize > 16 * 1024 * 1024) {
-            sendBufferSize = 16 * 1024 * 1024;
+        sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, SEND_BUFFER_DEFAULT_SIZE);
+        if (sendBufferSize > SEND_BUFFER_MAX_SIZE) {
+            sendBufferSize = SEND_BUFFER_MAX_SIZE;
         }
-        Preconditions.checkArgument(sendBufferSize > 0, "sendBufferSize must be > 0");
+        Preconditions.checkArgument(sendBufferSize > BUFFER_SIZE_MUST_THAN,
+                "sendBufferSize must be > 0");
 
-        trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, 0);
-        Preconditions.checkArgument((trafficClass == 0 || trafficClass == 96),
+        trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, TRAFFIC_CLASS_TYPE_0);
+        Preconditions.checkArgument((trafficClass == TRAFFIC_CLASS_TYPE_0
+                        || trafficClass == TRAFFIC_CLASS_TYPE_96),
                 "trafficClass must be == 0 or == 96");
 
         try {
@@ -326,51 +282,6 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
             logger.warn("Simple TCP Source max-threads property must specify an integer value. {}",
                     context.getString(ConfigConstants.MAX_THREADS));
         }
-
-        try {
-            maxConnections = context.getInteger(CONNECTIONS, 5000);
-        } catch (NumberFormatException e) {
-            logger.warn("BaseSource\'s \"connections\" property must specify an integer value.",
-                    context.getString(CONNECTIONS));
-        }
-
-        topic = context.getString(ConfigConstants.TOPIC);
-        attr = context.getString(ConfigConstants.ATTR);
-        Configurables.ensureRequiredNonNull(context, ConfigConstants.TOPIC, ConfigConstants.ATTR);
-
-        topic = topic.trim();
-        Preconditions.checkArgument(!topic.isEmpty(), "topic is empty");
-        attr = attr.trim();
-        Preconditions.checkArgument(!attr.isEmpty(), "attr is empty");
-
-        filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
-
-        msgFactoryName =
-                context.getString(ConfigConstants.MSG_FACTORY_NAME,
-                        "org.apache.inlong.dataproxy.source.ServerMessageFactory");
-        msgFactoryName = msgFactoryName.trim();
-        Preconditions.checkArgument(StringUtils.isNotBlank(msgFactoryName),
-                "msgFactoryName is empty");
-
-        serviceDecoderName =
-                context.getString(ConfigConstants.SERVICE_PROCESSOR_NAME,
-                        "org.apache.inlong.dataproxy.source.DefaultServiceDecoder");
-        serviceDecoderName = serviceDecoderName.trim();
-        Preconditions.checkArgument(StringUtils.isNotBlank(serviceDecoderName),
-                "serviceProcessorName is empty");
-
-        messageHandlerName =
-                context.getString(ConfigConstants.MESSAGE_HANDLER_NAME,
-                        "org.apache.inlong.dataproxy.source.ServerMessageHandler");
-        messageHandlerName = messageHandlerName.trim();
-        Preconditions.checkArgument(StringUtils.isNotBlank(messageHandlerName),
-                "messageHandlerName is empty");
-
-        maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, 1024 * 64);
-        Preconditions.checkArgument(
-                (maxMsgLength >= 4 && maxMsgLength <= ConfigConstants.MSG_MAX_LENGTH_BYTES),
-                "maxMsgLength must be >= 4 and <= " + ConfigConstants.MSG_MAX_LENGTH_BYTES);
-        isCompressed = context.getBoolean(ConfigConstants.MSG_COMPRESSED, true);
     }
 
     /**
@@ -380,4 +291,9 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
     public DataProxyMetricItemSet getMetricItemSet() {
         return metricItemSet;
     }
+
+    @Override
+    public String getProtocolName() {
+        return "tcp";
+    }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
new file mode 100644
index 0000000..7c09c30
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.dataproxy.source;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.base.NamedThreadFactory;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleUdpSource
+        extends BaseSource
+        implements EventDrivenSource, Configurable {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(SimpleUdpSource.class);
+
+    private static int UPD_BUFFER_DEFAULT_SIZE = 8192;
+
+    public SimpleUdpSource() {
+        super();
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void startSource() {
+        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+        // setup Netty server
+        serverBootstrap = new ConnectionlessBootstrap(
+                new NioDatagramChannelFactory(Executors
+                        .newCachedThreadPool(new NamedThreadFactory("udpSource-nettyWorker"
+                                + "-threadGroup")), maxThreads));
+        logger.info("Set max workers : {} ;",maxThreads);
+        serverBootstrap.setOption("receiveBufferSizePredictorFactory",
+                new FixedReceiveBufferSizePredictorFactory(UPD_BUFFER_DEFAULT_SIZE));
+        ChannelPipelineFactory fac = this.getChannelPiplineFactory();
+        serverBootstrap.setPipelineFactory(fac);
+        try {
+            if (host == null) {
+                nettyChannel =
+                        ((ConnectionlessBootstrap)serverBootstrap).bind(new InetSocketAddress(port));
+            } else {
+                nettyChannel =
+                        ((ConnectionlessBootstrap)serverBootstrap).bind(new InetSocketAddress(host, port));
+            }
+        } catch (Exception e) {
+            logger.error("Simple UDP Source error bind host {} port {}, program will exit!",
+                    new Object[] { host, port});
+            System.exit(-1);
+            //throw new FlumeException(e.getMessage());
+        }
+        allChannels.add(nettyChannel);
+        logger.info("Simple UDP Source started at host {}, port {}", host, port);
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+    }
+
+    @Override
+    public void configure(Context context) {
+        super.configure(context);
+        try {
+            maxThreads = context.getInteger(ConfigConstants.MAX_THREADS, 32);
+        } catch (NumberFormatException e) {
+            logger.warn("Simple UDP Source max-threads property must specify an integer value.",
+                    context.getString(ConfigConstants.MAX_THREADS));
+        }
+    }
+
+    @Override
+    public String getProtocolName() {
+        return "udp";
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
new file mode 100644
index 0000000..19d0ede
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.source;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class UdpSourceTest {
+
+    @Test
+    public void configTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put(ConfigConstants.MAX_THREADS, "32");
+        map.put(ConfigConstants.CONFIG_PORT, "8080");
+        map.put(ConfigConstants.CONFIG_HOST, "127.0.0.1");
+        map.put(ConfigConstants.TOPIC, "topic");
+        map.put(ConfigConstants.ATTR, "{}");
+        Context context = new Context(map);
+        SimpleUdpSource udpSource = new SimpleUdpSource();
+        udpSource.configure(context);
+        int threadNum = udpSource.getContext().getInteger(ConfigConstants.MAX_THREADS);
+        Assert.assertEquals(threadNum, 32);
+    }
+}