You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2021/11/18 08:34:41 UTC

[incubator-inlong] branch master updated: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796 (#1797)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e117b3c  [INLONG-1796]DataProxy support monitor indicator with JMX. #1796 (#1797)
e117b3c is described below

commit e117b3c5b422706ff782f6b9ae0925c45a37ddb5
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Nov 18 16:34:32 2021 +0800

    [INLONG-1796]DataProxy support monitor indicator with JMX. #1796 (#1797)
    
    * [Feature]DataProxy support monitor indicator with JMX. #1796
    
    * [Feature]DataProxy support monitor indicator with JMX. #1796
    
    * [Feature]DataProxy support monitor indicator with JMX. #1796
    
    * [Feature]DataProxy support monitor indicator with JMX. #1796
    
    * fix PR reviews.
    
    * fix PR reviews.
    
    * fix checkstyle problem.
    
    * add more unit test case.
    
    * add apache licenses.
    
    * fix checkstyle problem.
    
    * fix checkstyle problem.
---
 .../commons/config/metrics/MetricItemSet.java      |  20 ++
 .../commons/config/metrics/MetricRegister.java     |   4 +-
 .../config/metrics/set/DataProxyMetricItemSet.java |   1 +
 .../config/metrics/set/TestMetricItemSetMBean.java |   4 +-
 inlong-dataproxy/dataproxy-source/pom.xml          | 106 ++++----
 .../config/holder/CacheClusterConfigHolder.java    | 139 ++++++++++
 .../config/holder/CommonPropertiesHolder.java      | 116 ++++++++
 .../config/holder/IdTopicConfigHolder.java         | 161 +++++++++++
 .../config/loader/CacheClusterConfigLoader.java    |  31 +++
 .../ClassResourceCommonPropertiesLoader.java       |  68 +++++
 .../config/loader/CommonPropertiesLoader.java      |  34 +++
 .../dataproxy/config/loader/ConfigLoader.java      |  31 +++
 .../loader/ContextCacheClusterConfigLoader.java    |  76 ++++++
 .../config/loader/ContextIdTopicConfigLoader.java  |  64 +++++
 .../config/loader/IdTopicConfigLoader.java         |  30 +++
 .../loader/ManagerCacheClusterConfigLoader.java    |  65 +++++
 .../config/loader/ManagerIdTopicConfigLoader.java  |  78 ++++++
 .../dataproxy/config/pojo/CacheClusterConfig.java  |  53 ++--
 .../inlong/dataproxy/config/pojo/CacheType.java    |  57 ++--
 .../inlong/dataproxy/config/pojo/DataType.java     |  57 ++--
 .../dataproxy/config/pojo/IdTopicConfig.java       | 178 +++++++++++++
 .../dataproxy/metrics/DataProxyMetricItem.java     | 117 ++++++++
 .../dataproxy/metrics}/DataProxyMetricItemSet.java |  28 +-
 .../inlong/dataproxy/metrics/MetricItemValue.java  |  84 ++++++
 .../inlong/dataproxy/metrics/MetricListener.java   |  39 +++
 .../dataproxy/metrics/MetricListenerRunnable.java  | 132 +++++++++
 .../inlong/dataproxy/metrics/MetricObserver.java   | 114 ++++++++
 .../prometheus/PrometheusMetricListener.java       |  42 +++
 .../apache/inlong/dataproxy/node/Application.java  |   5 +
 .../org/apache/inlong/dataproxy/sink/MetaSink.java |  89 ++++++-
 .../pulsar/federation/PulsarFederationSink.java    | 140 ++++++++++
 .../federation/PulsarFederationSinkContext.java    | 198 ++++++++++++++
 .../pulsar/federation/PulsarFederationWorker.java  | 142 ++++++++++
 .../pulsar/federation/PulsarProducerCluster.java   | 295 +++++++++++++++++++++
 .../federation/PulsarProducerFederation.java       | 163 ++++++++++++
 .../dataproxy/source/ServerMessageFactory.java     |  11 +-
 .../dataproxy/source/ServerMessageHandler.java     |  68 ++++-
 .../inlong/dataproxy/source/SimpleTcpSource.java   |  25 +-
 .../apache/inlong/dataproxy/utils/BufferQueue.java | 167 ++++++++++++
 .../apache/inlong/dataproxy/utils/Constants.java   |  33 +++
 .../inlong/dataproxy/utils/SizeSemaphore.java      | 126 +++++++++
 .../TestClassResourceCommonPropertiesLoader.java   |  47 ++++
 .../TestContextCacheClusterConfigLoader.java       |  84 ++++++
 .../loader/TestContextIdTopicConfigLoader.java     |  86 ++++++
 .../metrics/TestDataProxyMetricItemSet.java        |  20 +-
 .../metrics/TestMetricListenerRunnable.java        | 103 ++++---
 .../federation/TestPulsarFederationSink.java       |  90 +++++++
 .../federation/TestPulsarProducerFederation.java   | 100 +++++++
 .../apache/inlong/dataproxy/utils/MockUtils.java   | 157 +++++++++++
 .../src/test/resources/common.properties           |  22 ++
 .../dataproxy-source/src/test/resources/flume.conf |  86 ++++++
 51 files changed, 3944 insertions(+), 242 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
index 20fea7f..0426699 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
@@ -28,9 +28,29 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public abstract class MetricItemSet<T extends MetricItem> implements MetricItemSetMBean {
 
+    protected String name;
+
     protected Map<String, T> itemMap = new ConcurrentHashMap<>();
 
     /**
+     * Constructor
+     * 
+     * @param name
+     */
+    public MetricItemSet(String name) {
+        this.name = name;
+    }
+
+    /**
+     * getName
+     * 
+     * @return
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
      * createItem
      * 
      * @return
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
index d4a5fc5..e459dc9 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
@@ -61,8 +61,8 @@ public class MetricRegister {
     public static void register(MetricItemSet<? extends MetricItem> obj) {
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         StringBuilder beanName = new StringBuilder();
-        beanName.append(MetricUtils.getDomain(obj.getClass())).append(MetricItemMBean.DOMAIN_SEPARATOR).append("type=")
-                .append(obj.getClass().toString());
+        beanName.append(MetricUtils.getDomain(obj.getClass())).append(MetricItemMBean.DOMAIN_SEPARATOR).append("name=")
+                .append(obj.getName());
         String strBeanName = beanName.toString();
         try {
             ObjectName objName = new ObjectName(strBeanName);
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
index c50e5cd..0ba892c 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
@@ -31,6 +31,7 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
      * Constructor
      */
     private DataProxyMetricItemSet() {
+        super("DataProxy");
     }
 
     /**
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
index 4e70d63..e6b7cb0 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
@@ -123,8 +123,8 @@ public class TestMetricItemSetMBean {
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         StringBuilder beanName = new StringBuilder();
         beanName.append(MetricUtils.getDomain(DataProxyMetricItemSet.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
-                .append("type=")
-                .append(DataProxyMetricItemSet.class.toString());
+                .append("name=")
+                .append(itemSet.getName());
         String strBeanName = beanName.toString();
         ObjectName objName = new ObjectName(strBeanName);
         {
diff --git a/inlong-dataproxy/dataproxy-source/pom.xml b/inlong-dataproxy/dataproxy-source/pom.xml
index 7d2f1d0..31b1c5d 100644
--- a/inlong-dataproxy/dataproxy-source/pom.xml
+++ b/inlong-dataproxy/dataproxy-source/pom.xml
@@ -1,53 +1,65 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.inlong</groupId>
-        <artifactId>inlong-dataproxy</artifactId>
-        <version>0.12.0-incubating-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <name>Apache InLong - DataProxy Source</name>
-    <artifactId>dataproxy-source</artifactId>
-
-    <properties>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
-        <guava.version>19.0</guava.version>
-    </properties>
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<groupId>org.apache.inlong</groupId>
+		<artifactId>inlong-dataproxy</artifactId>
+		<version>0.12.0-incubating-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+	<name>Apache InLong - DataProxy Source</name>
+	<artifactId>dataproxy-source</artifactId>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>tubemq-client</artifactId>
-            <version>${project.version}</version>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-    </dependencies>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<compiler.source>1.8</compiler.source>
+		<compiler.target>1.8</compiler.target>
+		<junit.version>4.13</junit.version>
+		<guava.version>19.0</guava.version>
+		<skipTests>false</skipTests>
+	</properties>
 
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.inlong</groupId>
+			<artifactId>tubemq-client</artifactId>
+			<version>${project.version}</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.powermock</groupId>
+			<artifactId>powermock-module-junit4</artifactId>
+			<version>2.0.2</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.powermock</groupId>
+			<artifactId>powermock-api-mockito2</artifactId>
+			<version>2.0.2</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${junit.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
 </project>
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
new file mode 100644
index 0000000..a792bfe
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import static org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader.CACHE_CLUSTER_CONFIG_TYPE;
+import static org.apache.inlong.dataproxy.config.loader.ConfigLoader.RELOAD_INTERVAL;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the cache cluster configs and reloads them periodically via a {@link CacheClusterConfigLoader}.
+ */
+public class CacheClusterConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CacheClusterConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private CacheClusterConfigLoader loader;
+
+    // written by the reload timer thread and read by callers, so the reference is volatile
+    private volatile List<CacheClusterConfig> configList;
+
+    /**
+     * Reads the reload interval and creates the loader, defaulting to ContextCacheClusterConfigLoader.
+     *
+     * @param context context supplying the reload interval and the loader class name
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong(RELOAD_INTERVAL, 60000L);
+        String loaderType = context.getString(CACHE_CLUSTER_CONFIG_TYPE,
+                ContextCacheClusterConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof CacheClusterConfigLoader) {
+                this.loader = (CacheClusterConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextCacheClusterConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * Loads the configuration once, then schedules the periodic reload.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stops the periodic reload; a holder that was never started is left untouched.
+     */
+    public void close() {
+        Timer timer = this.reloadTimer;
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+
+    /**
+     * Schedules a periodic reload on a daemon timer, cancelling any timer left from an earlier start.
+     */
+    private void setReloadTimer() {
+        if (reloadTimer != null) {
+            reloadTimer.cancel();
+        }
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            @Override
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
+    }
+
+    /**
+     * Replaces the config list with a fresh load; on failure the previous list is kept.
+     */
+    public void reload() {
+        try {
+            this.configList = this.loader.load();
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * get configList
+     *
+     * @return the most recently loaded list; may be null, e.g. before the first successful load
+     */
+    public List<CacheClusterConfig> getConfigList() {
+        return configList;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
new file mode 100644
index 0000000..6d9aa95
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
+import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Process-wide holder of the common properties, which are loaded lazily on first access.
+ */
+public class CommonPropertiesHolder {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CommonPropertiesHolder.class);
+    public static final String KEY_COMMON_PROPERTIES = "common-properties-loader";
+    public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
+
+    // assigned only after loading has finished, so readers never see a half-filled map
+    private static volatile Map<String, String> props;
+
+    /**
+     * Loads the properties once, guarded by the class lock instead of a shared interned string literal.
+     */
+    private static void init() {
+        synchronized (CommonPropertiesHolder.class) {
+            if (props == null) {
+                Map<String, String> loaded = new ConcurrentHashMap<>();
+                String loaderClassName = System.getenv(KEY_COMMON_PROPERTIES);
+                loaderClassName = (loaderClassName == null) ? DEFAULT_LOADER : loaderClassName;
+                try {
+                    Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
+                    Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+                    if (loaderObject instanceof CommonPropertiesLoader) {
+                        CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
+                        loaded.putAll(loader.load());
+                        LOG.info("loaderClass:{},properties:{}", loaderClassName, loaded);
+                    }
+                } catch (Throwable t) {
+                    LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
+                            loaderClassName, t.getMessage());
+                    LOG.error(t.getMessage(), t);
+                }
+                props = loaded;
+            }
+        }
+    }
+
+    /**
+     * get props
+     *
+     * @return the common properties, loaded on the first call
+     */
+    public static Map<String, String> get() {
+        if (props != null) {
+            return props;
+        }
+        init();
+        return props;
+    }
+
+    /**
+     * Gets value mapped to key, returning defaultValue if unmapped.
+     *
+     * @param  key          to be found
+     * @param  defaultValue returned if key is unmapped
+     * @return              value associated with key
+     */
+    public static String getString(String key, String defaultValue) {
+        return get().getOrDefault(key, defaultValue);
+    }
+
+    /**
+     * Gets value mapped to key, returning null if unmapped.
+     *
+     * @param  key to be found
+     * @return     value associated with key or null if unmapped
+     */
+    public static String getString(String key) {
+        return get().get(key);
+    }
+
+    /**
+     * Gets the value of key from the context, falling back to the common properties.
+     *
+     * @param  context      looked up first
+     * @param  key          to be found
+     * @param  defaultValue returned if neither the context nor the common properties map the key
+     * @return              value associated with key
+     */
+    public static String getStringFromContext(Context context, String key, String defaultValue) {
+        String value = context.getString(key);
+        // go through get() so the properties are loaded even when this is the first access
+        return (value != null) ? value : get().getOrDefault(key, defaultValue);
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
new file mode 100644
index 0000000..acb653c
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import static org.apache.inlong.dataproxy.config.loader.ConfigLoader.RELOAD_INTERVAL;
+import static org.apache.inlong.dataproxy.config.loader.IdTopicConfigLoader.IDTOPIC_CONFIG_TYPE;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.IdTopicConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the id-to-topic configs and reloads them periodically via an {@link IdTopicConfigLoader}.
+ */
+public class IdTopicConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(IdTopicConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private IdTopicConfigLoader loader;
+
+    // both references are swapped by the reload timer thread and read by callers, hence volatile
+    private volatile List<IdTopicConfig> configList = new ArrayList<>();
+    private volatile Map<String, IdTopicConfig> configMap = new ConcurrentHashMap<>();
+
+    /**
+     * Reads the reload interval and creates the loader, defaulting to ContextIdTopicConfigLoader.
+     *
+     * @param context context supplying the reload interval and the loader class name
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong(RELOAD_INTERVAL, 60000L);
+        String loaderType = context.getString(IDTOPIC_CONFIG_TYPE, ContextIdTopicConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof IdTopicConfigLoader) {
+                this.loader = (IdTopicConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextIdTopicConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * Loads the configuration once, then schedules the periodic reload.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stops the periodic reload; a holder that was never started is left untouched.
+     */
+    public void close() {
+        Timer timer = this.reloadTimer;
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+
+    /**
+     * Schedules a periodic reload on a daemon timer, cancelling any timer left from an earlier start.
+     */
+    private void setReloadTimer() {
+        if (reloadTimer != null) {
+            reloadTimer.cancel();
+        }
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            @Override
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
+    }
+
+    /**
+     * Loads a fresh config list and rebuilds the uid index; on failure the previous data is kept.
+     */
+    public void reload() {
+        try {
+            List<IdTopicConfig> newConfigList = this.loader.load();
+            Map<String, IdTopicConfig> newConfigMap = new ConcurrentHashMap<>();
+            for (IdTopicConfig config : newConfigList) {
+                newConfigMap.put(config.getUid(), config);
+            }
+            this.configList = newConfigList;
+            this.configMap = newConfigMap;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * get configList
+     *
+     * @return the most recently loaded list
+     */
+    public List<IdTopicConfig> getConfigList() {
+        return configList;
+    }
+
+    /**
+     * Looks up the topic configured for a uid.
+     *
+     * @param  uid key of the config, may be null
+     * @return     the topic name, or null when uid is null or unknown
+     */
+    public String getTopic(String uid) {
+        if (uid == null) {
+            return null;
+        }
+        IdTopicConfig config = this.configMap.get(uid);
+        return (config != null) ? config.getTopicName() : null;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CacheClusterConfigLoader.java
new file mode 100644
index 0000000..f437eec
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CacheClusterConfigLoader.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+
+/**
+ * Loader of {@link CacheClusterConfig} entries that can be configured from a flume context.
+ * CACHE_CLUSTER_CONFIG_TYPE is the context key that names the loader implementation class.
+ */
+public interface CacheClusterConfigLoader extends ConfigLoader<CacheClusterConfig>, Configurable {
+
+    String CACHE_CLUSTER_CONFIG = "cacheClusterConfig";
+    String CACHE_CLUSTER_CONFIG_TYPE = "cacheClusterConfig.type";
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
new file mode 100644
index 0000000..d029442
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * FileCommonPropertiesLoader
+ */
+public class ClassResourceCommonPropertiesLoader implements CommonPropertiesLoader {
+
+    public static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public Map<String, String> load() {
+        return this.loadProperties("common.properties");
+    }
+
+    /**
+     * loadProperties
+     * 
+     * @param  fileName
+     * @return
+     */
+    protected Map<String, String> loadProperties(String fileName) {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        try (InputStream inStream = getClass().getClassLoader().getResource(fileName).openStream()) {
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
+        } catch (Exception e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
+        }
+        return result;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
new file mode 100644
index 0000000..fcf839d
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.Map;
+
+/**
+ * CommonPropertiesLoader
+ * Contract for components that load the common properties as string key/value pairs.
+ */
+public interface CommonPropertiesLoader {
+
+    /**
+     * Loads the common properties.
+     * 
+     * @return the loaded properties, keyed by property name
+     */
+    Map<String, String> load();
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ConfigLoader.java
new file mode 100644
index 0000000..536256a
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ConfigLoader.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.List;
+
+/**
+ * ConfigLoader: loads a list of configuration objects of type T through load().
+ * RELOAD_INTERVAL is the key "reloadInterval" (NOTE(review): its unit and reader are not visible here).
+ */
+public interface ConfigLoader<T> {
+
+    String RELOAD_INTERVAL = "reloadInterval";
+
+    List<T> load();
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextCacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextCacheClusterConfigLoader.java
new file mode 100644
index 0000000..ff6ac98
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextCacheClusterConfigLoader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+/**
+ * ContextCacheClusterConfigLoader
+ * Creates cache cluster configs from the properties held by a flume Context.
+ */
+public class ContextCacheClusterConfigLoader implements CacheClusterConfigLoader {
+
+    private Context context;
+
+    /**
+     * Reads the cluster names stored under CACHE_CLUSTER_CONFIG and creates one config per
+     * distinct name, with params taken from the "cacheClusterConfig.{name}." sub-properties.
+     * @return the created configs; an empty list when no cluster name is configured
+     */
+    @Override
+    public List<CacheClusterConfig> load() {
+        String configuredNames = context.getString(CACHE_CLUSTER_CONFIG);
+        if (StringUtils.isBlank(configuredNames)) {
+            return new ArrayList<>();
+        }
+        // split(String) separates on whitespace; the set drops repeated names
+        Set<String> distinctNames = new HashSet<>();
+        distinctNames.addAll(Arrays.asList(StringUtils.split(configuredNames)));
+        // one config per name; iteration follows the set, so no order is guaranteed
+        List<CacheClusterConfig> loaded = new ArrayList<>(distinctNames.size());
+        for (String name : distinctNames) {
+            String prefix = "cacheClusterConfig." + name + ".";
+            Map<String, String> clusterParams = context.getSubProperties(prefix);
+            CacheClusterConfig cluster = new CacheClusterConfig();
+            cluster.setClusterName(name);
+            cluster.setParams(clusterParams);
+            loaded.add(cluster);
+        }
+        return loaded;
+    }
+
+    /**
+     * Stores the context that load() reads from.
+     * 
+     * @param context the flume context holding the cache cluster properties
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextIdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextIdTopicConfigLoader.java
new file mode 100644
index 0000000..56fd240
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextIdTopicConfigLoader.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+
+/**
+ * ContextIdTopicConfigLoader
+ * Creates id-topic configs from the "idTopicConfig." sub-properties of a flume Context.
+ */
+public class ContextIdTopicConfigLoader implements IdTopicConfigLoader {
+
+    private Context context;
+
+    /**
+     * Maps every sub-property to a config: the key becomes the inlongGroupId, the value the topic name.
+     * NOTE(review): the key "idTopicConfig.type" shares this prefix; confirm it is absent here.
+     * @return one config per "idTopicConfig." sub-property of the context
+     */
+    @Override
+    public List<IdTopicConfig> load() {
+        Map<String, String> topicById = context.getSubProperties("idTopicConfig.");
+        List<IdTopicConfig> loaded = new ArrayList<>(topicById.size());
+        for (Entry<String, String> idAndTopic : topicById.entrySet()) {
+            IdTopicConfig item = new IdTopicConfig();
+            item.setTopicName(idAndTopic.getValue());
+            item.setInlongGroupId(idAndTopic.getKey());
+            loaded.add(item);
+        }
+        return loaded;
+    }
+
+    /**
+     * Stores the context that load() reads from.
+     * 
+     * @param context the flume context holding the id-topic properties
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/IdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/IdTopicConfigLoader.java
new file mode 100644
index 0000000..548c9e3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/IdTopicConfigLoader.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+
+/**
+ * IdTopicConfigLoader: a ConfigLoader of IdTopicConfig items that is also a flume Configurable.
+ * IDTOPIC_CONFIG_TYPE is the key "idTopicConfig.type"; presumably it selects the loader type (verify).
+ */
+public interface IdTopicConfigLoader extends ConfigLoader<IdTopicConfig>, Configurable {
+
+    String IDTOPIC_CONFIG_TYPE = "idTopicConfig.type";
+}
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerCacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerCacheClusterConfigLoader.java
new file mode 100644
index 0000000..6eb5c2e
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerCacheClusterConfigLoader.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.inlong.commons.pojo.dataproxy.CacheClusterObject;
+import org.apache.inlong.commons.pojo.dataproxy.DataProxyCluster;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+
+/**
+ * ManagerCacheClusterConfigLoader
+ * Creates cache cluster configs from the cluster config currently held by RemoteConfigManager.
+ */
+public class ManagerCacheClusterConfigLoader implements CacheClusterConfigLoader {
+
+    /**
+     * Converts each cache cluster of the current config; absent params leave the config's empty map.
+     * @return the converted configs; empty when no cluster config or cache cluster set is available
+     */
+    @Override
+    public List<CacheClusterConfig> load() {
+        List<CacheClusterConfig> configList = new ArrayList<>();
+        DataProxyCluster dataProxyCluster = RemoteConfigManager.getInstance().getCurrentClusterConfig();
+        if (dataProxyCluster == null || dataProxyCluster.getCacheClusterSet() == null) {
+            return configList;
+        }
+        for (CacheClusterObject obj : dataProxyCluster.getCacheClusterSet().getCacheClusters()) {
+            CacheClusterConfig config = new CacheClusterConfig();
+            config.setClusterName(obj.getName());
+            if (obj.getParams() != null) {
+                config.getParams().putAll(obj.getParams());
+            }
+            configList.add(config);
+        }
+        return configList;
+    }
+
+    /**
+     * No-op: this loader takes everything from RemoteConfigManager.
+     * @param context unused
+     */
+    @Override
+    public void configure(Context context) {
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerIdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerIdTopicConfigLoader.java
new file mode 100644
index 0000000..b48fe05
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerIdTopicConfigLoader.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.inlong.commons.pojo.dataproxy.DataProxyCluster;
+import org.apache.inlong.commons.pojo.dataproxy.InLongIdObject;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.pojo.DataType;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+
+/**
+ * ManagerIdTopicConfigLoader
+ * Creates id-topic configs from the cluster config currently held by RemoteConfigManager.
+ */
+public class ManagerIdTopicConfigLoader implements IdTopicConfigLoader {
+
+    /**
+     * Converts each inlong id of the current config; an id of the form "group.stream" is split in two.
+     * @return the converted configs; empty when no cluster config or proxy cluster is available
+     */
+    @Override
+    public List<IdTopicConfig> load() {
+        List<IdTopicConfig> configList = new ArrayList<>();
+        DataProxyCluster dataProxyCluster = RemoteConfigManager.getInstance().getCurrentClusterConfig();
+        if (dataProxyCluster == null || dataProxyCluster.getProxyCluster() == null) {
+            return configList;
+        }
+        for (InLongIdObject obj : dataProxyCluster.getProxyCluster().getInlongIds()) {
+            IdTopicConfig config = new IdTopicConfig();
+            String id = obj.getInlongId();
+            String[] ids = id.split("\\.");
+            if (ids.length == 2) {
+                config.setInlongGroupId(ids[0]);
+                config.setInlongStreamid(ids[1]);
+            } else {
+                config.setInlongGroupId(id);
+            }
+            config.setTopicName(obj.getTopic());
+            Map<String, String> params = obj.getParams();
+            if (params != null) { // without params the IdTopicConfig defaults stay in place
+                config.setDataType(DataType.convert(params.getOrDefault("dataType", DataType.TEXT.value())));
+                config.setFieldDelimiter(params.getOrDefault("fieldDelimiter", "|"));
+                config.setFileDelimiter(params.getOrDefault("fileDelimiter", "\n"));
+            }
+            configList.add(config);
+        }
+        return configList;
+    }
+
+    /**
+     * No-op: this loader takes everything from RemoteConfigManager.
+     * @param context unused
+     */
+    @Override
+    public void configure(Context context) {
+    }
+
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
similarity index 52%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
copy to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
index c50e5cd..099e731 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
@@ -15,49 +15,54 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.commons.config.metrics.set;
+package org.apache.inlong.dataproxy.config.pojo;
 
-import org.apache.inlong.commons.config.metrics.MetricItemSet;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * 
- * DataProxyMetricItemSet
+ * CacheClusterConfig
  */
-public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+public class CacheClusterConfig {
 
-    private static DataProxyMetricItemSet instance;
+    private String clusterName;
+    private Map<String, String> params = new HashMap<>();
 
     /**
-     * Constructor
+     * get clusterName
+     * 
+     * @return the clusterName
+     */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    /**
+     * set clusterName
+     * 
+     * @param clusterName the clusterName to set
      */
-    private DataProxyMetricItemSet() {
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
     }
 
     /**
-     * getInstance
+     * get params
      * 
-     * @return
+     * @return the params
      */
-    public static DataProxyMetricItemSet getInstance() {
-        if (instance != null) {
-            return instance;
-        }
-        synchronized (DataProxyMetricItemSet.class) {
-            if (instance == null) {
-                instance = new DataProxyMetricItemSet();
-            }
-        }
-        return instance;
+    public Map<String, String> getParams() {
+        return params;
     }
 
     /**
-     * createItem
+     * set params
      * 
-     * @return
+     * @param params the params to set
      */
-    @Override
-    protected DataProxyMetricItem createItem() {
-        return new DataProxyMetricItem();
+    public void setParams(Map<String, String> params) {
+        this.params = params;
     }
 
 }
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
similarity index 56%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
copy to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
index c50e5cd..b224777 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
@@ -15,49 +15,56 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.commons.config.metrics.set;
-
-import org.apache.inlong.commons.config.metrics.MetricItemSet;
+package org.apache.inlong.dataproxy.config.pojo;
 
 /**
- * 
- * DataProxyMetricItemSet
+ * cache cluster type
  */
-public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+public enum CacheType {
+
+    TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), N("n");
 
-    private static DataProxyMetricItemSet instance;
+    private final String value;
 
     /**
+     * 
      * Constructor
+     * 
+     * @param value
      */
-    private DataProxyMetricItemSet() {
+    private CacheType(String value) {
+        this.value = value;
     }
 
     /**
-     * getInstance
-     * 
+     * value
+     *
      * @return
      */
-    public static DataProxyMetricItemSet getInstance() {
-        if (instance != null) {
-            return instance;
-        }
-        synchronized (DataProxyMetricItemSet.class) {
-            if (instance == null) {
-                instance = new DataProxyMetricItemSet();
-            }
-        }
-        return instance;
+    public String value() {
+        return this.value;
     }
 
     /**
-     * createItem
-     * 
-     * @return
+     * toString
      */
     @Override
-    protected DataProxyMetricItem createItem() {
-        return new DataProxyMetricItem();
+    public String toString() {
+        return this.name() + ":" + this.value;
     }
 
+    /**
+     * convert
+     *
+     * @param  value
+     * @return
+     */
+    public static CacheType convert(String value) {
+        for (CacheType v : values()) {
+            if (v.value().equals(value)) {
+                return v;
+            }
+        }
+        return N;
+    }
 }
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
similarity index 56%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
copy to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
index c50e5cd..c09433d 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
@@ -15,49 +15,56 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.commons.config.metrics.set;
-
-import org.apache.inlong.commons.config.metrics.MetricItemSet;
+package org.apache.inlong.dataproxy.config.pojo;
 
 /**
- * 
- * DataProxyMetricItemSet
+ * data content type
  */
-public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+public enum DataType {
+
+    TEXT("text"), PB("pb"), JCE("jce"), N("n");
 
-    private static DataProxyMetricItemSet instance;
+    private final String value;
 
     /**
+     * 
      * Constructor
+     * 
+     * @param value
      */
-    private DataProxyMetricItemSet() {
+    private DataType(String value) {
+        this.value = value;
     }
 
     /**
-     * getInstance
-     * 
+     * value
+     *
      * @return
      */
-    public static DataProxyMetricItemSet getInstance() {
-        if (instance != null) {
-            return instance;
-        }
-        synchronized (DataProxyMetricItemSet.class) {
-            if (instance == null) {
-                instance = new DataProxyMetricItemSet();
-            }
-        }
-        return instance;
+    public String value() {
+        return this.value;
     }
 
     /**
-     * createItem
-     * 
-     * @return
+     * toString
      */
     @Override
-    protected DataProxyMetricItem createItem() {
-        return new DataProxyMetricItem();
+    public String toString() {
+        return this.name() + ":" + this.value;
     }
 
+    /**
+     * convert
+     *
+     * @param  value
+     * @return
+     */
+    public static DataType convert(String value) {
+        for (DataType v : values()) {
+            if (v.value().equals(value)) {
+                return v;
+            }
+        }
+        return N;
+    }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
new file mode 100644
index 0000000..c8a1fa3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.pojo;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * IdTopicConfig
+ * Binds an inlong group/stream id pair to a topic name and to the format of its data.
+ */
+public class IdTopicConfig {
+
+    private String uid;
+    private String inlongGroupId;
+    private String inlongStreamid;
+    private String topicName;
+    private DataType dataType = DataType.TEXT;
+    private String fieldDelimiter = "|";
+    private String fileDelimiter = "\n";
+
+    /**
+     * Returns the uid, which is recomputed whenever the group id or the stream id is set.
+     * 
+     * @return the uid; null until one of the two ids has been set
+     */
+    public String getUid() {
+        return uid;
+    }
+
+    /**
+     * Builds the uid of a group/stream pair: blank parts are left out, two parts are joined by ".".
+     * 
+     * @param  inlongGroupId  the group id, may be blank
+     * @param  inlongStreamId the stream id, may be blank
+     * @return                "" when both are blank, the only non-blank part, or "group.stream"
+     */
+    public static String generateUid(String inlongGroupId, String inlongStreamId) {
+        boolean groupMissing = StringUtils.isBlank(inlongGroupId);
+        boolean streamMissing = StringUtils.isBlank(inlongStreamId);
+        if (groupMissing && streamMissing) {
+            return "";
+        }
+        if (groupMissing) {
+            return inlongStreamId;
+        }
+        if (streamMissing) {
+            return inlongGroupId;
+        }
+        // both parts are present
+        return inlongGroupId + "." + inlongStreamId;
+    }
+
+    /**
+     * Returns the inlong group id.
+     * 
+     * @return the inlongGroupId
+     */
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    /**
+     * Sets the inlong group id and refreshes the uid.
+     * 
+     * @param inlongGroupId the inlongGroupId to set
+     */
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+        uid = generateUid(inlongGroupId, this.inlongStreamid);
+    }
+
+    /**
+     * Returns the inlong stream id.
+     * 
+     * @return the inlongStreamid
+     */
+    public String getInlongStreamid() {
+        return inlongStreamid;
+    }
+
+    /**
+     * Sets the inlong stream id and refreshes the uid.
+     * 
+     * @param inlongStreamid the inlongStreamid to set
+     */
+    public void setInlongStreamid(String inlongStreamid) {
+        this.inlongStreamid = inlongStreamid;
+        uid = generateUid(this.inlongGroupId, inlongStreamid);
+    }
+
+    /**
+     * Returns the topic name.
+     * 
+     * @return the topicName
+     */
+    public String getTopicName() {
+        return topicName;
+    }
+
+    /**
+     * Sets the topic name.
+     * 
+     * @param topicName the topicName to set
+     */
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    /**
+     * Returns the data type; DataType.TEXT unless changed.
+     * 
+     * @return the dataType
+     */
+    public DataType getDataType() {
+        return dataType;
+    }
+
+    /**
+     * Sets the data type.
+     * 
+     * @param dataType the dataType to set
+     */
+    public void setDataType(DataType dataType) {
+        this.dataType = dataType;
+    }
+
+    /**
+     * Returns the field delimiter; "|" unless changed.
+     * 
+     * @return the fieldDelimiter
+     */
+    public String getFieldDelimiter() {
+        return fieldDelimiter;
+    }
+
+    /**
+     * Sets the field delimiter.
+     * 
+     * @param fieldDelimiter the fieldDelimiter to set
+     */
+    public void setFieldDelimiter(String fieldDelimiter) {
+        this.fieldDelimiter = fieldDelimiter;
+    }
+
+    /**
+     * Returns the file delimiter; a line feed unless changed.
+     * 
+     * @return the fileDelimiter
+     */
+    public String getFileDelimiter() {
+        return fileDelimiter;
+    }
+
+    /**
+     * Sets the file delimiter.
+     * 
+     * @param fileDelimiter the fileDelimiter to set
+     */
+    public void setFileDelimiter(String fileDelimiter) {
+        this.fileDelimiter = fileDelimiter;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
new file mode 100644
index 0000000..328078e
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.flume.Event;
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.dataproxy.utils.Constants;
+
+/**
+ * DataProxyMetricItem
+ * Metric item of the "DataProxy" domain: dimension fields plus counters kept as AtomicLong values.
+ */
+@MetricDomain(name = "DataProxy")
+public class DataProxyMetricItem extends MetricItem {
+
+    public static final String KEY_CLUSTER_ID = "clusterId";
+    public static final String KEY_SOURCE_ID = "sourceId";
+    public static final String KEY_SOURCE_DATA_ID = "sourceDataId";
+    public static final String KEY_INLONG_GROUP_ID = "inlongGroupId";
+    public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
+    public static final String KEY_SINK_ID = "sinkId";
+    public static final String KEY_SINK_DATA_ID = "sinkDataId";
+    // names of the count and size metrics
+    public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
+    public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
+    public static final String M_READ_FAIL_COUNT = "readFailCount";
+    public static final String M_READ_FAIL_SIZE = "readFailSize";
+    public static final String M_SEND_COUNT = "sendCount";
+    public static final String M_SEND_SIZE = "sendSize";
+    public static final String M_SEND_SUCCESS_COUNT = "sendSuccessCount";
+    public static final String M_SEND_SUCCESS_SIZE = "sendSuccessSize";
+    public static final String M_SEND_FAIL_COUNT = "sendFailCount";
+    public static final String M_SEND_FAIL_SIZE = "sendFailSize";
+    // names of the duration metrics
+    public static final String M_SINK_DURATION = "sinkDuration";
+    public static final String M_NODE_DURATION = "nodeDuration";
+    public static final String M_WHOLE_DURATION = "wholeDuration";
+
+    @Dimension
+    public String clusterId;
+    @Dimension
+    public String sourceId;
+    @Dimension
+    public String sourceDataId;
+    @Dimension
+    public String inlongGroupId;
+    @Dimension
+    public String inlongStreamId;
+    @Dimension
+    public String sinkId;
+    @Dimension
+    public String sinkDataId;
+    @CountMetric
+    public AtomicLong readSuccessCount = new AtomicLong();
+    @CountMetric
+    public AtomicLong readSuccessSize = new AtomicLong();
+    @CountMetric
+    public AtomicLong readFailCount = new AtomicLong();
+    @CountMetric
+    public AtomicLong readFailSize = new AtomicLong();
+    @CountMetric
+    public AtomicLong sendCount = new AtomicLong();
+    @CountMetric
+    public AtomicLong sendSize = new AtomicLong();
+    @CountMetric
+    public AtomicLong sendSuccessCount = new AtomicLong();
+    @CountMetric
+    public AtomicLong sendSuccessSize = new AtomicLong();
+    @CountMetric
+    public AtomicLong sendFailCount = new AtomicLong();
+    @CountMetric
+    public AtomicLong sendFailSize = new AtomicLong();
+    // sinkCallbackTime - sinkBeginTime(milliseconds)
+    @CountMetric
+    public AtomicLong sinkDuration = new AtomicLong();
+    // sinkCallbackTime - sourceReceiveTime(milliseconds)
+    @CountMetric
+    public AtomicLong nodeDuration = new AtomicLong();
+    // sinkCallbackTime - eventCreateTime(milliseconds)
+    @CountMetric
+    public AtomicLong wholeDuration = new AtomicLong();
+
+    /**
+     * Copies the inlong group id and stream id headers of an event into a dimension map.
+     * A header that is absent is stored as an empty string.
+     * @param event      the event whose headers are read
+     * @param dimensions the map receiving the KEY_INLONG_GROUP_ID and KEY_INLONG_STREAM_ID entries
+     */
+    public static void fillInlongId(Event event, Map<String, String> dimensions) {
+        Map<String, String> eventHeaders = event.getHeaders();
+        String groupId = eventHeaders.getOrDefault(Constants.INLONG_GROUP_ID, "");
+        String streamId = eventHeaders.getOrDefault(Constants.INLONG_STREAM_ID, "");
+        dimensions.put(KEY_INLONG_GROUP_ID, groupId);
+        dimensions.put(KEY_INLONG_STREAM_ID, streamId);
+    }
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
similarity index 68%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
copy to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index c50e5cd..bf8d042 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -15,39 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.commons.config.metrics.set;
+package org.apache.inlong.dataproxy.metrics;
 
+import org.apache.inlong.commons.config.metrics.MetricDomain;
 import org.apache.inlong.commons.config.metrics.MetricItemSet;
 
 /**
  * 
- * DataProxyMetricItemSet
+ * DataProxyMetricItemSet
  */
+@MetricDomain(name = "DataProxy")
 public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
 
-    private static DataProxyMetricItemSet instance;
-
     /**
      * Constructor
-     */
-    private DataProxyMetricItemSet() {
-    }
-
-    /**
-     * getInstance
      * 
-     * @return
+     * @param name
      */
-    public static DataProxyMetricItemSet getInstance() {
-        if (instance != null) {
-            return instance;
-        }
-        synchronized (DataProxyMetricItemSet.class) {
-            if (instance == null) {
-                instance = new DataProxyMetricItemSet();
-            }
-        }
-        return instance;
+    public DataProxyMetricItemSet(String name) {
+        super(name);
     }
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricItemValue.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricItemValue.java
new file mode 100644
index 0000000..6c5f2b7
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricItemValue.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.Map;
+
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.apache.pulsar.shade.com.google.gson.Gson;
+
+/**
+ * Value holder for one metric item: the dimensions key, the dimension map
+ * and the metric values handed to the constructor.
+ */
+public class MetricItemValue {
+
+    // One shared serializer: Gson is documented as thread-safe and skips static fields.
+    private static final Gson GSON = new Gson();
+    private final String key;
+    private final Map<String, String> dimensions;
+    private final Map<String, MetricValue> metrics;
+
+    /**
+     * Constructor. The maps are stored by reference, not copied.
+     * 
+     * @param key        dimensions key of the metric item
+     * @param dimensions dimension names mapped to their values
+     * @param metrics    metric names mapped to their values
+     */
+    public MetricItemValue(String key, Map<String, String> dimensions, Map<String, MetricValue> metrics) {
+        this.key = key;
+        this.dimensions = dimensions;
+        this.metrics = metrics;
+    }
+
+    /**
+     * get key
+     * 
+     * @return the key
+     */
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * get dimensions
+     * 
+     * @return the dimensions
+     */
+    public Map<String, String> getDimensions() {
+        return dimensions;
+    }
+
+    /**
+     * get metrics
+     * 
+     * @return the metrics
+     */
+    public Map<String, MetricValue> getMetrics() {
+        return metrics;
+    }
+
+    /**
+     * Serializes the key, dimensions and metrics of this value to a JSON string.
+     */
+    @Override
+    public String toString() {
+        return GSON.toJson(this);
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListener.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListener.java
new file mode 100644
index 0000000..7ae89f3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListener.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.List;
+
+/**
+ * Callback that receives the metric item values collected for a domain.
+ * The constants are the configuration keys read by MetricObserver.
+ */
+public interface MetricListener {
+
+    String KEY_METRIC_DOMAINS = "metricDomains";
+    String KEY_DOMAIN_LISTENERS = "domainListeners";
+    String KEY_SNAPSHOT_INTERVAL = "snapshotInterval";
+
+    /**
+     * Handles one snapshot of a metric domain.
+     * 
+     * @param domain     name of the metric domain the values belong to
+     * @param itemValues values collected for that domain
+     */
+    void snapshot(String domain, List<MetricItemValue> itemValues);
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java
new file mode 100644
index 0000000..fcab604
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduled task that snapshots the MBeans of one metric domain and passes
+ * the resulting item values to the listeners configured for that domain.
+ */
+public class MetricListenerRunnable implements Runnable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(MetricListenerRunnable.class);
+
+    private final String domain;
+    private final List<MetricListener> listenerList;
+
+    /**
+     * Constructor
+     * 
+     * @param domain       JMX domain whose MBeans are queried
+     * @param listenerList listeners notified with every snapshot
+     */
+    public MetricListenerRunnable(String domain, List<MetricListener> listenerList) {
+        this.domain = domain;
+        this.listenerList = listenerList;
+    }
+
+    /**
+     * Snapshots the domain and notifies every listener; a failing listener does not stop the others.
+     */
+    @Override
+    public void run() {
+        LOG.info("begin to snapshot metric:{}", domain);
+        try {
+            List<MetricItemValue> itemValues = this.getItemValues();
+            LOG.info("snapshot metric:{},size:{}", domain, itemValues.size());
+            for (MetricListener listener : this.listenerList) {
+                try {
+                    listener.snapshot(domain, itemValues);
+                } catch (Throwable t) {
+                    LOG.error(t.getMessage(), t);
+                }
+            }
+        } catch (Throwable t) {
+            LOG.error(t.getMessage(), t);
+        }
+        LOG.info("end to snapshot metric:{}", domain);
+    }
+
+    /**
+     * Queries the platform MBean server for the domain and builds one MetricItemValue per metric item.
+     * 
+     * @return                              MetricItemValue List
+     * @throws InstanceNotFoundException    if a queried MBean is no longer registered
+     * @throws AttributeNotFoundException   if an MBean lacks the key or dimensions attribute
+     * @throws ReflectionException          if the MBean server fails to invoke a getter or operation
+     * @throws MBeanException               if an MBean getter or operation throws
+     * @throws MalformedObjectNameException if the domain does not form a valid object name pattern
+     * @throws ClassNotFoundException       if the class of an MBean cannot be loaded
+     */
+    @SuppressWarnings("unchecked")
+    private List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException,
+            ReflectionException, MBeanException, MalformedObjectNameException, ClassNotFoundException {
+        ObjectName objName = new ObjectName(domain + MetricItemMBean.DOMAIN_SEPARATOR + "*");
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
+        LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans);
+        List<MetricItemValue> itemValues = new ArrayList<>();
+        for (ObjectInstance mbean : mbeans) {
+            String className = mbean.getClassName();
+            Class<?> clazz = ClassUtils.getClass(className);
+            ObjectName metricObjectName = mbean.getObjectName();
+            if (ClassUtils.isAssignable(clazz, MetricItemMBean.class)) {
+                String dimensionsKey = (String) mbs.getAttribute(metricObjectName, MetricItemMBean.ATTRIBUTE_KEY);
+                Map<String, String> dimensions = (Map<String, String>) mbs
+                        .getAttribute(metricObjectName, MetricItemMBean.ATTRIBUTE_DIMENSIONS);
+                Map<String, MetricValue> metrics = (Map<String, MetricValue>) mbs
+                        .invoke(metricObjectName, MetricItemMBean.METHOD_SNAPSHOT, null, null);
+                MetricItemValue itemValue = new MetricItemValue(dimensionsKey, dimensions, metrics);
+                LOG.info("MetricItemMBean get itemValue:{}", itemValue);
+                itemValues.add(itemValue);
+            } else if (ClassUtils.isAssignable(clazz, MetricItemSetMBean.class)) {
+                List<MetricItem> items = (List<MetricItem>) mbs.invoke(metricObjectName,
+                        MetricItemMBean.METHOD_SNAPSHOT, null, null);
+                for (MetricItem item : items) {
+                    MetricItemValue itemValue = new MetricItemValue(item.getDimensionsKey(),
+                            item.getDimensions(), item.snapshot());
+                    LOG.info("MetricItemSetMBean get itemValue:{}", itemValue);
+                    itemValues.add(itemValue);
+                }
+            }
+        }
+        return itemValues;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricObserver.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricObserver.java
new file mode 100644
index 0000000..50d9a82
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricObserver.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts the periodic metric snapshots: reads the configured metric domains
+ * and schedules a MetricListenerRunnable for each domain that has listeners.
+ */
+public class MetricObserver {
+
+    public static final Logger LOG = LoggerFactory.getLogger(MetricObserver.class);
+    private static final AtomicBoolean isInited = new AtomicBoolean(false);
+    // shared pool that runs the snapshot task of every domain
+    private static final ScheduledExecutorService statExecutor = Executors.newScheduledThreadPool(5);
+
+    /**
+     * Schedules one snapshot task per configured metric domain; only the first call has an effect.
+     * 
+     * @param commonProperties properties holding "metricDomains" and the per-domain settings
+     */
+    public static void init(Map<String, String> commonProperties) {
+        if (!isInited.compareAndSet(false, true)) {
+            return;
+        }
+        // init
+        Context context = new Context(commonProperties);
+        // get domain name list
+        String metricDomains = context.getString(MetricListener.KEY_METRIC_DOMAINS);
+        if (StringUtils.isBlank(metricDomains)) {
+            return;
+        }
+        // split domain name; trim first so leading whitespace yields no empty name
+        String[] domains = metricDomains.trim().split("\\s+");
+        for (String domain : domains) {
+            // get domain parameters
+            Context domainContext = new Context(
+                    context.getSubProperties(MetricListener.KEY_METRIC_DOMAINS + "." + domain + "."));
+            List<MetricListener> listenerList = parseDomain(domain, domainContext);
+            // no listener
+            if (listenerList.isEmpty()) {
+                continue;
+            }
+            // get snapshot interval
+            long snapshotInterval = domainContext.getLong(MetricListener.KEY_SNAPSHOT_INTERVAL, 60000L);
+            LOG.info("begin to register domain:{},MetricListeners:{},snapshotInterval:{}", domain, listenerList,
+                    snapshotInterval);
+            statExecutor.scheduleWithFixedDelay(new MetricListenerRunnable(domain, listenerList), snapshotInterval,
+                    snapshotInterval, TimeUnit.MILLISECONDS);
+        }
+
+    }
+
+    /**
+     * Instantiates the listeners named by the domain's "domainListeners" property.
+     * 
+     * @param  domain        metric domain name
+     * @param  domainContext properties of that domain
+     * @return               the listeners that could be created; empty when none is configured
+     */
+    private static List<MetricListener> parseDomain(String domain, Context domainContext) {
+        List<MetricListener> listenerList = new ArrayList<>();
+        String listeners = domainContext.getString(MetricListener.KEY_DOMAIN_LISTENERS);
+        if (StringUtils.isBlank(listeners)) {
+            return listenerList;
+        }
+        // whitespace-separated class names; trim so leading whitespace yields no empty name
+        String[] listenerTypes = listeners.trim().split("\\s+");
+        for (String listenerType : listenerTypes) {
+            // new listener object
+            try {
+                Class<?> listenerClass = ClassUtils.getClass(listenerType);
+                Object listenerObject = listenerClass.getDeclaredConstructor().newInstance();
+                if (!(listenerObject instanceof MetricListener)) {
+                    LOG.error("{} is not instance of MetricListener.", listenerType);
+                    continue;
+                }
+                listenerList.add((MetricListener) listenerObject);
+            } catch (Throwable t) {
+                // the throwable is passed as last argument so the stack trace is logged too
+                LOG.error("Fail to init MetricListener:{},error:{}", listenerType, t.getMessage(), t);
+            }
+        }
+        return listenerList;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
new file mode 100644
index 0000000..7c35f96
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics.prometheus;
+
+import java.util.List;
+
+import org.apache.inlong.dataproxy.metrics.MetricItemValue;
+import org.apache.inlong.dataproxy.metrics.MetricListener;
+
+/**
+ * 
+ * PrometheusMetricListener: MetricListener whose snapshot is not implemented yet.
+ */
+public class PrometheusMetricListener implements MetricListener {
+
+    /**
+     * Receives the snapshot of a metric domain; currently a no-op.
+     * 
+     * @param domain     metric domain name
+     * @param itemValues metric item values of the domain
+     */
+    @Override
+    public void snapshot(String domain, List<MetricItemValue> itemValues) {
+        // TODO not implemented yet: the snapshot is ignored
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
index b7307cf..a009d76 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
@@ -54,6 +54,7 @@ import org.apache.flume.util.SSLUtil;
 import org.apache.inlong.commons.config.IDataProxyConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.metrics.MetricObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -385,6 +386,10 @@ public class Application {
                     application.handleConfigurationEvent(configurationProvider.getConfiguration());
                 }
             }
+            //metrics
+            MetricObserver.init(ConfigManager.getInstance().getCommonProperties());
+            
+            //start application
             application.start();
 
             final Application appReference = application;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
index 034b2b4..159f860 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
@@ -17,11 +17,6 @@
 
 package org.apache.inlong.dataproxy.sink;
 
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,10 +39,14 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.flume.source.shaded.guava.RateLimiter;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.utils.Constants;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -58,14 +57,21 @@ import org.apache.inlong.tubemq.client.producer.MessageSentResult;
 import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
 import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 public class MetaSink extends AbstractSink implements Configurable {
 
     private static final Logger logger = LoggerFactory.getLogger(MetaSink.class);
     private static int MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
     private static final String TUBE_REQUEST_TIMEOUT = "tube-request-timeout";
+    private static final String KEY_DISK_IO_RATE_PER_SEC = "disk-io-rate-per-sec";
 
     private static int BAD_EVENT_QUEUE_SIZE = 10000;
 
@@ -135,6 +141,9 @@ public class MetaSink extends AbstractSink implements Configurable {
     private long sessionMaxAllowedDelayedMsgCount;
     private long nettyWriteBufferHighWaterMark;
     private int recoverthreadcount;
+    //
+    private Map<String, String> dimensions;
+    private DataProxyMetricItemSet metricItemSet;
 
     private static final LoadingCache<String, Long> agentIdCache = CacheBuilder
             .newBuilder().concurrencyLevel(4 * 8).initialCapacity(5000000).expireAfterAccess(30, TimeUnit.SECONDS)
@@ -321,6 +330,14 @@ public class MetaSink extends AbstractSink implements Configurable {
 
     @Override
     public void start() {
+        this.dimensions = new HashMap<>();
+        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
+        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+        //register metrics
+        this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+        MetricRegister.register(metricItemSet);
+        
+        //create tube connection
         try {
             createConnection();
         } catch (FlumeException e) {
@@ -521,16 +538,20 @@ public class MetaSink extends AbstractSink implements Configurable {
 
     public class MyCallback implements MessageSentCallback {
         private EventStat myEventStat;
+        private long sendTime;
 
         public MyCallback(EventStat eventStat) {
             this.myEventStat = eventStat;
+            this.sendTime = System.currentTimeMillis();
         }
 
         @Override
         public void onMessageSent(final MessageSentResult result) {
             if (result.isSuccess()) {
                 // TODO: add stats
+                this.addMetric(myEventStat.getEvent(), true, sendTime);
             } else {
+                this.addMetric(myEventStat.getEvent(), false, 0);
                 if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
                     logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
                             result.getErrMsg(), resendQueue.size(),
@@ -547,6 +568,44 @@ public class MetaSink extends AbstractSink implements Configurable {
             }
         }
 
+        /**
+         * Records the outcome of one send attempt in the metric item of the
+         * event's topic: count and size always, durations only on success.
+         * 
+         * @param currentRecord event that was sent
+         * @param result        true if the send succeeded
+         * @param sendTime      send start time in milliseconds; durations are recorded only when positive
+         */
+        private void addMetric(Event currentRecord, boolean result, long sendTime) {
+            Map<String, String> headers = currentRecord.getHeaders();
+            Map<String, String> dimensions = new HashMap<>();
+            // NOTE(review): start() uses the literal "DataProxy" as cluster id while the sink name
+            // is used here; confirm which value the cluster id dimension is meant to carry
+            dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, MetaSink.this.getName());
+            dimensions.put(DataProxyMetricItem.KEY_SINK_ID, MetaSink.this.getName());
+            // events without a topic header are recorded under the empty data id
+            dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, headers.getOrDefault(TOPIC, ""));
+            DataProxyMetricItem metricItem = MetaSink.this.metricItemSet.findMetricItem(dimensions);
+            if (result) {
+                metricItem.sendSuccessCount.incrementAndGet();
+                metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
+                // durations need a valid send start time
+                if (sendTime > 0) {
+                    long currentTime = System.currentTimeMillis();
+                    long msgTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), sendTime);
+                    // parse the header value, not the key constant; a missing or non-numeric
+                    // source time falls back to the message time
+                    long sourceTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_SOURCE_TIME), msgTime);
+                    metricItem.sinkDuration.addAndGet(currentTime - sendTime);
+                    metricItem.nodeDuration.addAndGet(currentTime - sourceTime);
+                    metricItem.wholeDuration.addAndGet(currentTime - msgTime);
+                }
+            } else {
+                metricItem.sendFailCount.incrementAndGet();
+                metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
+            }
+        }
+
         @Override
         public void onException(final Throwable e) {
             Throwable t = e;
@@ -613,6 +672,15 @@ public class MetaSink extends AbstractSink implements Configurable {
                     tx.rollback();
                 } else {
                     tx.commit();
+                    // metric
+                    if (event.getHeaders().containsKey(TOPIC)) {
+                        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().get(TOPIC));
+                    } else {
+                        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
+                    }
+                    DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
+                    metricItem.readFailCount.incrementAndGet();
+                    metricItem.readFailSize.addAndGet(event.getBody().length);
                 }
             } else {
 
@@ -712,7 +780,7 @@ public class MetaSink extends AbstractSink implements Configurable {
         sinkThreadPool = new Thread[threadNum];
         eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
 
-        diskIORatePerSec = context.getLong("disk-io-rate-per-sec", 0L);
+        diskIORatePerSec = context.getLong(KEY_DISK_IO_RATE_PER_SEC, 0L);
         if (diskIORatePerSec != 0) {
             diskRateLimiter = RateLimiter.create(diskIORatePerSec);
         }
@@ -728,4 +796,13 @@ public class MetaSink extends AbstractSink implements Configurable {
         recoverthreadcount = context.getInteger(ConfigConstants.RECOVER_THREAD_COUNT,
                 Runtime.getRuntime().availableProcessors() + 1);
     }
+
+    /**
+     * Returns the metric item set of this sink; it is created in start(), so it is null before that.
+     * @return the metricItemSet
+     */
+    public DataProxyMetricItemSet getMetricItemSet() {
+        return metricItemSet;
+    }
+    
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
new file mode 100644
index 0000000..b0bb2a4
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarFederationSink: Flume sink that takes events from its channel and
+ * offers them to the buffer queue held by PulsarFederationSinkContext.
+ */
+public class PulsarFederationSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationSink.class);
+
+    private PulsarFederationSinkContext context;
+    private List<PulsarFederationWorker> workers = new ArrayList<>();
+    private Map<String, String> dimensions;
+
+    /**
+     * Starts context.getMaxThreads() workers, then the sink itself.
+     */
+    @Override
+    public void start() {
+        String sinkName = this.getName();
+        // create worker
+        for (int i = 0; i < context.getMaxThreads(); i++) {
+            PulsarFederationWorker worker = new PulsarFederationWorker(sinkName, i, context);
+            worker.start();
+            this.workers.add(worker);
+        }
+        super.start();
+    }
+
+    /**
+     * Closes every worker (a failing close is only logged), then the context and the sink.
+     */
+    @Override
+    public void stop() {
+        for (PulsarFederationWorker worker : workers) {
+            try {
+                worker.close();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        this.context.close();
+        super.stop();
+    }
+
+    /**
+     * Builds the sink context and the cluster id / sink id metric dimensions.
+     * 
+     * @param context Flume configuration of this sink
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.context = new PulsarFederationSinkContext(this.getName(), context);
+        this.dimensions = new HashMap<>();
+        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
+        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+    }
+
+    /**
+     * Moves at most one event from the channel into the buffer queue.
+     * 
+     * @return                        READY if an event was queued, otherwise BACKOFF
+     * @throws EventDeliveryException not thrown explicitly here; failures in the transaction return BACKOFF
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            // presumably reserves eventSize in the buffer queue - confirm against BufferQueue
+            int eventSize = event.getBody().length;
+            if (!this.context.getBufferQueue().tryAcquire(eventSize)) {
+                // record the failure of queue full for monitor
+                // metric
+                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+                metricItem.readFailCount.incrementAndGet();
+                metricItem.readFailSize.addAndGet(eventSize);
+                // roll back the take so the transaction does not consume the event
+                tx.rollback();
+                return Status.BACKOFF;
+            }
+            this.context.getBufferQueue().offer(event);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+                // metric: only the count, the event is not in scope here
+                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+                metricItem.readFailCount.incrementAndGet();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java
new file mode 100644
index 0000000..a62abeb
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.utils.BufferQueue;
+
+/**
+ * Configuration and shared runtime objects of a Pulsar federation sink.
+ *
+ * <p>The constructor reads the sink parameters from the Flume {@link Context}, creates and starts the
+ * id-topic and cache-cluster configuration holders, allocates the buffer queue shared between the sink
+ * and its workers, and registers a {@link DataProxyMetricItemSet} through {@link MetricRegister}.
+ * All fields are assigned once in the constructor.
+ */
+public class PulsarFederationSinkContext {
+
+    public static final String KEY_MAX_THREADS = "max-threads";
+    public static final String KEY_MAXTRANSACTION = "maxTransaction";
+    public static final String KEY_PROCESSINTERVAL = "processInterval";
+    public static final String KEY_RELOADINTERVAL = "reloadInterval";
+    public static final String KEY_MAXBUFFERQUEUESIZE = "maxBufferQueueSize";
+    public static final String PREFIX_PRODUCER = "producer.";
+
+    private final String proxyClusterId;
+    private final Context sinkContext;
+    private final Context producerContext;
+    // configuration holders, started in the constructor and closed in close()
+    private final IdTopicConfigHolder idTopicHolder;
+    private final CacheClusterConfigHolder cacheHolder;
+    private final BufferQueue<Event> bufferQueue;
+    // tuning parameters read from the sink context
+    private final int maxThreads;
+    private final int maxTransaction;
+    private final long processInterval;
+    private final long reloadInterval;
+    // metric items of this sink
+    private final DataProxyMetricItemSet metricItemSet;
+
+    /**
+     * Constructor
+     *
+     * @param sinkName name of the sink, used as the name of the metric item set
+     * @param context  Flume context of the sink; properties prefixed with {@value #PREFIX_PRODUCER}
+     *                 are extracted into the producer context
+     */
+    public PulsarFederationSinkContext(String sinkName, Context context) {
+        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+        this.sinkContext = context;
+        this.maxThreads = context.getInteger(KEY_MAX_THREADS, 10);
+        this.maxTransaction = context.getInteger(KEY_MAXTRANSACTION, 1);
+        this.processInterval = context.getInteger(KEY_PROCESSINTERVAL, 100);
+        this.reloadInterval = context.getLong(KEY_RELOADINTERVAL, 60000L);
+        // holder of the id-to-topic mapping
+        this.idTopicHolder = new IdTopicConfigHolder();
+        this.idTopicHolder.configure(context);
+        this.idTopicHolder.start();
+        // holder of the cache cluster list
+        this.cacheHolder = new CacheClusterConfigHolder();
+        this.cacheHolder.configure(context);
+        this.cacheHolder.start();
+        // buffer between the sink and its workers; callers acquire capacity by event body length
+        int maxBufferQueueSize = context.getInteger(KEY_MAXBUFFERQUEUESIZE, 128 * 1024);
+        this.bufferQueue = new BufferQueue<>(maxBufferQueueSize);
+        // producer parameters are the sink properties below the "producer." prefix
+        Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
+        this.producerContext = new Context(producerParams);
+        // metric
+        this.metricItemSet = new DataProxyMetricItemSet(sinkName);
+        MetricRegister.register(this.metricItemSet);
+    }
+
+    /**
+     * Closes both configuration holders.
+     *
+     * <p>The cache-cluster holder is closed even when closing the id-topic holder throws.
+     */
+    public void close() {
+        try {
+            this.idTopicHolder.close();
+        } finally {
+            this.cacheHolder.close();
+        }
+    }
+
+    /**
+     * get proxyClusterId
+     *
+     * @return the proxyClusterId
+     */
+    public String getProxyClusterId() {
+        return proxyClusterId;
+    }
+
+    /**
+     * get sinkContext
+     *
+     * @return the Flume context the sink was configured with
+     */
+    public Context getSinkContext() {
+        return sinkContext;
+    }
+
+    /**
+     * get producerContext
+     *
+     * @return the context built from the sink properties below the "producer." prefix
+     */
+    public Context getProducerContext() {
+        return producerContext;
+    }
+
+    /**
+     * get idTopicHolder
+     *
+     * @return the idTopicHolder
+     */
+    public IdTopicConfigHolder getIdTopicHolder() {
+        return idTopicHolder;
+    }
+
+    /**
+     * get cacheHolder
+     *
+     * @return the cacheHolder
+     */
+    public CacheClusterConfigHolder getCacheHolder() {
+        return cacheHolder;
+    }
+
+    /**
+     * get bufferQueue
+     *
+     * @return the bufferQueue
+     */
+    public BufferQueue<Event> getBufferQueue() {
+        return bufferQueue;
+    }
+
+    /**
+     * get maxThreads
+     *
+     * @return the maxThreads, 10 when not configured
+     */
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    /**
+     * get maxTransaction
+     *
+     * @return the maxTransaction, 1 when not configured
+     */
+    public int getMaxTransaction() {
+        return maxTransaction;
+    }
+
+    /**
+     * get processInterval
+     *
+     * @return the processInterval, 100 when not configured
+     */
+    public long getProcessInterval() {
+        return processInterval;
+    }
+
+    /**
+     * get reloadInterval
+     *
+     * @return the reloadInterval, 60000 when not configured
+     */
+    public long getReloadInterval() {
+        return reloadInterval;
+    }
+
+    /**
+     * get metricItemSet
+     *
+     * @return the metricItemSet
+     */
+    public DataProxyMetricItemSet getMetricItemSet() {
+        return metricItemSet;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
new file mode 100644
index 0000000..914cff3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * PulsarSetWorker
+ */
+public class PulsarFederationWorker extends Thread {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationWorker.class);
+
+    private final String workerName;
+    private final PulsarFederationSinkContext context;
+
+    private PulsarProducerFederation producerFederation;
+    private LifecycleState status;
+    private Map<String, String> dimensions;
+
+    /**
+     * Constructor
+     * 
+     * @param sinkName
+     * @param workerIndex
+     * @param context
+     */
+    public PulsarFederationWorker(String sinkName, int workerIndex, PulsarFederationSinkContext context) {
+        super();
+        this.workerName = sinkName + "-worker-" + workerIndex;
+        this.context = context;
+        this.producerFederation = new PulsarProducerFederation(workerName, this.context);
+        this.status = LifecycleState.IDLE;
+        this.dimensions = new HashMap<>();
+        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
+        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, sinkName);
+    }
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        this.producerFederation.start();
+        this.status = LifecycleState.START;
+        super.start();
+    }
+
+    /**
+     * 
+     * close
+     */
+    public void close() {
+        // close all producers
+        this.producerFederation.close();
+        this.status = LifecycleState.STOP;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        LOG.info(String.format("start PulsarSetWorker:%s", this.workerName));
+        while (status != LifecycleState.STOP) {
+            try {
+                Event currentRecord = context.getBufferQueue().pollRecord();
+                if (currentRecord == null) {
+                    Thread.sleep(context.getProcessInterval());
+                    continue;
+                }
+                // fill topic
+                this.fillTopic(currentRecord);
+                // metric
+                DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
+                this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+                        currentRecord.getHeaders().get(Constants.TOPIC));
+                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+                metricItem.sendCount.incrementAndGet();
+                metricItem.sendSize.addAndGet(currentRecord.getBody().length);
+                // send
+                this.producerFederation.send(currentRecord);
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+                this.sleepOneInterval();
+            }
+        }
+    }
+
+    /**
+     * fillTopic
+     * 
+     * @param currentRecord
+     */
+    private void fillTopic(Event currentRecord) {
+        Map<String, String> headers = currentRecord.getHeaders();
+        String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
+        String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
+        String uid = IdTopicConfig.generateUid(inlongGroupId, inlongStreamId);
+        String topic = this.context.getIdTopicHolder().getTopic(uid);
+        if (!StringUtils.isBlank(topic)) {
+            headers.put(Constants.TOPIC, topic);
+        }
+    }
+
+    /**
+     * sleepOneInterval
+     */
+    private void sleepOneInterval() {
+        try {
+            Thread.sleep(context.getProcessInterval());
+        } catch (InterruptedException e1) {
+            LOG.error(e1.getMessage(), e1);
+        }
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
new file mode 100644
index 0000000..69a8ee1
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * PulsarProducerCluster
+ */
+public class PulsarProducerCluster implements LifecycleAware {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarProducerCluster.class);
+
+    public static final String KEY_SERVICE_URL = "serviceUrl";
+    public static final String KEY_AUTHENTICATION = "authentication";
+
+    public static final String KEY_ENABLEBATCHING = "enableBatching";
+    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
+    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
+    public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
+    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
+    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
+    public static final String KEY_SENDTIMEOUT = "sendTimeout";
+    public static final String KEY_COMPRESSIONTYPE = "compressionType";
+    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
+    public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
+            + "BatchingPartitionSwitchFrequency";
+
+    private final String workerName;
+    private final CacheClusterConfig config;
+    private final PulsarFederationSinkContext sinkContext;
+    private final Context context;
+    private final String cacheClusterName;
+    private LifecycleState state;
+
+    /**
+     * pulsar client
+     */
+    private PulsarClient client;
+    private ProducerBuilder<byte[]> baseBuilder;
+
+    private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor
+     * 
+     * @param workerName
+     * @param config
+     * @param context
+     */
+    public PulsarProducerCluster(String workerName, CacheClusterConfig config, PulsarFederationSinkContext context) {
+        this.workerName = workerName;
+        this.config = config;
+        this.sinkContext = context;
+        this.context = context.getProducerContext();
+        this.state = LifecycleState.IDLE;
+        this.cacheClusterName = config.getClusterName();
+    }
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        this.state = LifecycleState.START;
+        // create pulsar client
+        try {
+            String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
+            String authentication = config.getParams().get(KEY_AUTHENTICATION);
+            this.client = PulsarClient.builder()
+                    .serviceUrl(serviceUrl)
+                    .authentication(AuthenticationFactory.token(authentication))
+                    .build();
+            this.baseBuilder = client.newProducer();
+//            Map<String, Object> builderConf = new HashMap<>();
+//            builderConf.putAll(context.getParameters());
+            this.baseBuilder
+                    .hashingScheme(HashingScheme.Murmur3_32Hash)
+                    .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
+                    .batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 5242880))
+                    .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 3000))
+                    .batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 1),
+                            TimeUnit.MILLISECONDS);
+            this.baseBuilder.maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 1000))
+                    .maxPendingMessagesAcrossPartitions(
+                            context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 50000))
+                    .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
+                    .compressionType(this.getPulsarCompressionType())
+                    .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true))
+                    .roundRobinRouterBatchingPartitionSwitchFrequency(
+                            context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 10))
+                    .batcherBuilder(BatcherBuilder.DEFAULT);
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * getPulsarCompressionType
+     * 
+     * @return CompressionType
+     */
+    private CompressionType getPulsarCompressionType() {
+        String type = this.context.getString(KEY_COMPRESSIONTYPE);
+        switch (type) {
+            case "LZ4" :
+                return CompressionType.LZ4;
+            case "NONE" :
+                return CompressionType.NONE;
+            case "ZLIB" :
+                return CompressionType.ZLIB;
+            case "ZSTD" :
+                return CompressionType.ZSTD;
+            case "SNAPPY" :
+                return CompressionType.SNAPPY;
+            default :
+                return CompressionType.NONE;
+        }
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        this.state = LifecycleState.STOP;
+        //
+        for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (PulsarClientException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        try {
+            this.client.close();
+        } catch (PulsarClientException e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * getLifecycleState
+     * 
+     * @return
+     */
+    @Override
+    public LifecycleState getLifecycleState() {
+        return state;
+    }
+
+    /**
+     * send
+     * 
+     * @param event
+     */
+    public boolean send(Event event) {
+        // send
+        Map<String, String> headers = event.getHeaders();
+        String topic = headers.get(Constants.TOPIC);
+        // get producer
+        Producer<byte[]> producer = this.producerMap.get(topic);
+        if (producer == null) {
+            try {
+                LOG.info("try to new a object for topic " + topic);
+                producer = baseBuilder.clone().topic(topic)
+                        .producerName(workerName + "-" + cacheClusterName + "-" + topic)
+                        .create();
+                LOG.info("create new producer success:{}", producer.getProducerName());
+                Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(topic, producer);
+                if (oldProducer != null) {
+                    producer.close();
+                    LOG.info("close producer success:{}", producer.getProducerName());
+                    producer = oldProducer;
+                }
+            } catch (Throwable ex) {
+                LOG.error("create new producer failed", ex);
+            }
+        }
+        // create producer failed
+        if (producer == null) {
+            sinkContext.getBufferQueue().release(event.getBody().length);
+            this.addMetric(event, topic, false, 0);
+            return false;
+        }
+        String messageKey = headers.get(Constants.MESSAGE_KEY);
+        if (messageKey == null) {
+            messageKey = headers.get(Constants.HEADER_KEY_SOURCE_IP);
+        }
+        // sendAsync
+        long sendTime = System.currentTimeMillis();
+        CompletableFuture<MessageId> future = producer.newMessage().key(messageKey).properties(headers)
+                .value(event.getBody()).sendAsync();
+        // callback
+        future.whenCompleteAsync((msgId, ex) -> {
+            if (ex != null) {
+                LOG.error("Send fail:{}", ex.getMessage());
+                LOG.error(ex.getMessage(), ex);
+                sinkContext.getBufferQueue().offer(event);
+                this.addMetric(event, topic, false, 0);
+            } else {
+                sinkContext.getBufferQueue().release(event.getBody().length);
+                this.addMetric(event, topic, true, sendTime);
+            }
+        });
+        return true;
+    }
+
+    /**
+     * addMetric
+     * 
+     * @param currentRecord
+     * @param topic
+     * @param result
+     * @param size
+     */
+    private void addMetric(Event currentRecord, String topic, boolean result, long sendTime) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.sinkContext.getProxyClusterId());
+        // metric
+        DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
+        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
+        DataProxyMetricItem metricItem = this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
+        if (result) {
+            metricItem.sendSuccessCount.incrementAndGet();
+            metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
+            if (sendTime > 0) {
+                long currentTime = System.currentTimeMillis();
+                long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
+                        sendTime);
+                long sinkDuration = currentTime - sendTime;
+                long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
+                long wholeDuration = currentTime - msgTime;
+                metricItem.sinkDuration.addAndGet(sinkDuration);
+                metricItem.nodeDuration.addAndGet(nodeDuration);
+                metricItem.wholeDuration.addAndGet(wholeDuration);
+            }
+        } else {
+            metricItem.sendFailCount.incrementAndGet();
+            metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
+        }
+    }
+
+    /**
+     * get cacheClusterName
+     * 
+     * @return the cacheClusterName
+     */
+    public String getCacheClusterName() {
+        return cacheClusterName;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java
new file mode 100644
index 0000000..bc387ef
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Set of {@link PulsarProducerCluster} instances owned by one worker.
+ *
+ * <p>The cluster list is rebuilt periodically from the cache cluster configuration, and events are
+ * distributed over the current clusters in round-robin order.
+ */
+public class PulsarProducerFederation {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarProducerFederation.class);
+
+    private final String workerName;
+    private final PulsarFederationSinkContext context;
+    private Timer reloadTimer;
+
+    // replaced as a whole by reload() on the timer thread and read by send() on the worker thread,
+    // so the reference has to be volatile for the swap to become visible
+    private volatile List<PulsarProducerCluster> clusterList = new ArrayList<>();
+    // clusters removed from the configuration; they are stopped at the start of the next reload
+    private List<PulsarProducerCluster> deletingClusterList = new ArrayList<>();
+
+    private AtomicInteger clusterIndex = new AtomicInteger(0);
+
+    /**
+     * Constructor
+     *
+     * @param workerName name of the owning worker, passed on to every cluster
+     * @param context    shared sink context
+     */
+    public PulsarProducerFederation(String workerName, PulsarFederationSinkContext context) {
+        this.workerName = workerName;
+        this.context = context;
+    }
+
+    /**
+     * Loads the cluster list once and schedules the periodic reload. Failures are logged.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the reload timer and stops every cluster of the current list.
+     */
+    public void close() {
+        // the timer is null when start() was never called or failed before scheduling it
+        if (this.reloadTimer != null) {
+            try {
+                this.reloadTimer.cancel();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        // NOTE(review): clusters still waiting in deletingClusterList are not stopped here, and no
+        // further reload will stop them once the timer is cancelled - confirm whether that is intended
+        for (PulsarProducerCluster cluster : this.clusterList) {
+            cluster.stop();
+        }
+    }
+
+    /**
+     * Schedules reload() on a daemon timer, first after one reload interval and then at that period.
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + context.getReloadInterval()),
+                context.getReloadInterval());
+    }
+
+    /**
+     * Synchronizes the cluster list with the current cache cluster configuration.
+     *
+     * <p>Clusters removed by the previous reload are stopped first. Clusters that are new in the
+     * configuration are created and started, clusters that are still configured are kept, and
+     * clusters that are no longer configured are queued for stopping at the next reload.
+     * Failures are logged and leave the current cluster list unchanged.
+     */
+    public void reload() {
+        try {
+            // stop deleted cluster
+            deletingClusterList.forEach(item -> {
+                item.stop();
+            });
+            deletingClusterList.clear();
+            // update cluster list
+            List<PulsarProducerCluster> oldClusterList = this.clusterList;
+            List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
+            List<PulsarProducerCluster> newClusterList = new ArrayList<>(configList.size());
+            // prepare
+            Set<String> newClusterNames = new HashSet<>();
+            configList.forEach(item -> {
+                newClusterNames.add(item.getClusterName());
+            });
+            Set<String> oldClusterNames = new HashSet<>();
+            oldClusterList.forEach(item -> {
+                oldClusterNames.add(item.getCacheClusterName());
+            });
+            // add
+            for (CacheClusterConfig config : configList) {
+                if (!oldClusterNames.contains(config.getClusterName())) {
+                    PulsarProducerCluster cluster = new PulsarProducerCluster(workerName, config, context);
+                    cluster.start();
+                    newClusterList.add(cluster);
+                }
+            }
+            // remove
+            for (PulsarProducerCluster cluster : oldClusterList) {
+                if (newClusterNames.contains(cluster.getCacheClusterName())) {
+                    newClusterList.add(cluster);
+                } else {
+                    deletingClusterList.add(cluster);
+                }
+            }
+            this.clusterList = newClusterList;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Sends the event through the next cluster in round-robin order.
+     *
+     * @param  event                 event to send
+     * @return                       the result of the selected cluster's send
+     * @throws IllegalStateException when no cluster is currently configured; the event is put back
+     *                               into the buffer queue before the exception is thrown
+     */
+    public boolean send(Event event) {
+        List<PulsarProducerCluster> currentClusterList = this.clusterList;
+        int currentSize = currentClusterList.size();
+        if (currentSize == 0) {
+            // without this guard the modulo below fails with ArithmeticException and the polled
+            // event is lost together with its buffer capacity; putting the event back keeps it for
+            // a later attempt, as the send failure callback of PulsarProducerCluster does
+            this.context.getBufferQueue().offer(event);
+            throw new IllegalStateException("No cache cluster is available for worker:" + workerName);
+        }
+        int currentIndex = clusterIndex.getAndIncrement();
+        // restart the counter long before it can overflow into negative values
+        if (currentIndex > Integer.MAX_VALUE / 2) {
+            clusterIndex.set(0);
+        }
+        int realIndex = currentIndex % currentSize;
+        PulsarProducerCluster clusterProducer = currentClusterList.get(realIndex);
+        return clusterProducer.send(event);
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index da1d627..eb58398 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Constructor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -40,6 +41,7 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(ServerMessageFactory.class);
     private static final int DEFAULT_READ_IDLE_TIME = 70 * 60 * 1000;
+    private AbstractSource source;
     private ChannelProcessor processor;
     private ChannelGroup allChannels;
     private ExecutionHandler executionHandler;
@@ -71,12 +73,13 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
      * @param isCompressed
      * @param name
      */
-    public ServerMessageFactory(ChannelProcessor processor,
+    public ServerMessageFactory(AbstractSource source,
                                 ChannelGroup allChannels, String protocol, ServiceDecoder serProcessor,
                                 String messageHandlerName, Integer maxMsgLength,
                                 String topic, String attr, Boolean filterEmptyMsg, Integer maxCons,
                                 Boolean isCompressed, String name) {
-        this.processor = processor;
+        this.source = source;
+        this.processor = source.getChannelProcessor();
         this.allChannels = allChannels;
         this.topic = topic;
         this.attr = attr;
@@ -124,12 +127,12 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
                         .forName(messageHandlerName);
 
                 Constructor<?> ctor = clazz.getConstructor(
-                        ChannelProcessor.class, ServiceDecoder.class, ChannelGroup.class,
+                        AbstractSource.class, ServiceDecoder.class, ChannelGroup.class,
                         String.class, String.class, Boolean.class, Integer.class,
                         Integer.class, Boolean.class, String.class);
 
                 SimpleChannelHandler messageHandler = (SimpleChannelHandler) ctor
-                        .newInstance(processor, serviceProcessor, allChannels, topic, attr,
+                        .newInstance(source, serviceProcessor, allChannels, topic, attr,
                                 filterEmptyMsg, maxMsgLength, maxConnections, isCompressed, protocolType
                         );
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index eadf2bc..e6eee0e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -18,13 +18,10 @@
 package org.apache.inlong.dataproxy.source;
 
 import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
 import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -34,13 +31,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.commons.msg.TDMsg1;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
@@ -48,6 +46,8 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -60,6 +60,9 @@ import org.jboss.netty.channel.group.ChannelGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+
 /**
  * Server message handler
  *
@@ -91,7 +94,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                     return new SimpleDateFormat("yyyyMMddHHmmss");
                 }
             };
-
+    private AbstractSource source;
     private final ChannelGroup allChannels;
     private int maxConnections = Integer.MAX_VALUE;
     private boolean filterEmptyMsg = false;
@@ -102,14 +105,16 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     private String defaultMXAttr = "m=3";
     private final ChannelBuffer heartbeatBuffer;
     private final String protocolType;
+    // metric item set that addMetric() updates
+    private final DataProxyMetricItemSet metricItemSet;
 
-    public ServerMessageHandler(ChannelProcessor processor, ServiceDecoder serProcessor,
+    public ServerMessageHandler(AbstractSource source, ServiceDecoder serProcessor,
                                 ChannelGroup allChannels,
                                 String topic, String attr, Boolean filterEmptyMsg, Integer maxMsgLength,
                                 Integer maxCons,
                                 Boolean isCompressed, String protocolType) {
-
-        this.processor = processor;
+        this.source = source;
+        this.processor = source.getChannelProcessor();
         this.serviceProcessor = serProcessor;
         this.allChannels = allChannels;
         this.defaultTopic = topic;
@@ -122,6 +127,11 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         this.heartbeatBuffer = ChannelBuffers.wrappedBuffer(new byte[]{0, 0, 0, 1, 1});
         this.maxConnections = maxCons;
         this.protocolType = protocolType;
+        if (source instanceof SimpleTcpSource) {
+            this.metricItemSet = ((SimpleTcpSource) source).getMetricItemSet();
+        } else {
+            this.metricItemSet = new DataProxyMetricItemSet(this.toString());
+        }
     }
 
     private String getRemoteIp(Channel channel) {
@@ -420,9 +430,10 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 dtten = dtten * 1000 * 60 * 10;
                 try {
                     processor.processEvent(event);
+                    this.addMetric(true, data.length);
                 } catch (Throwable ex) {
                     logger.error("Error writting to channel,data will discard.", ex);
-
+                    this.addMetric(false, data.length);
                     throw new ChannelException("ProcessEvent error can't write event to channel.");
                 }
             }
@@ -524,6 +535,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         logger.info("message received");
         if (e == null) {
             logger.error("get null messageevent, just skip");
+            this.addMetric(false, 0);
             return;
         }
         ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
@@ -533,6 +545,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         if (len == 0 && this.filterEmptyMsg) {
             logger.warn("skip empty msg.");
             cb.clear();
+            this.addMetric(false, 0);
             return;
         }
 
@@ -541,23 +554,27 @@ public class ServerMessageHandler extends SimpleChannelHandler {
         try {
             resultMap = serviceProcessor.extractData(cb, remoteChannel);
         } catch (MessageIDException ex) {
+            this.addMetric(false, 0);
             throw new IOException(ex.getCause());
         }
 
         if (resultMap == null) {
             logger.info("result is null");
+            this.addMetric(false, 0);
             return;
         }
 
         MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
         if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
             remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
+            this.addMetric(false, 0);
             return;
         }
 
         if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
 //            ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
 //            remoteChannel.write(binBuffer, remoteSocketAddress);
+            this.addMetric(false, 0);
             return;
         }
 
@@ -591,9 +608,10 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 Event event = EventBuilder.withBody(body, headers);
                 try {
                     processor.processEvent(event);
+                    this.addMetric(true, body.length);
                 } catch (Throwable ex) {
                     logger.error("Error writing to controller,data will discard.", ex);
-
+                    this.addMetric(false, body.length);
                     throw new ChannelException(
                             "Process Controller Event error can't write event to channel.");
                 }
@@ -611,9 +629,10 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 Event event = EventBuilder.withBody(body, headers);
                 try {
                     processor.processEvent(event);
+                    this.addMetric(true, body.length);
                 } catch (Throwable ex) {
                     logger.error("Error writing to controller,data will discard.", ex);
-
+                    this.addMetric(false, body.length);
                     throw new ChannelException(
                             "Process Controller Event error can't write event to channel.");
                 }
@@ -631,4 +650,29 @@ public class ServerMessageHandler extends SimpleChannelHandler {
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         logger.error("channel closed {}", ctx.getChannel());
     }
+
+    /**
+     * Records one read attempt on the metric item selected by this source's dimensions.
+     * Cluster id is "DataProxy", source id and source data id are both the source name,
+     * and the group id and stream id dimensions are left empty.
+     *
+     * @param result true adds to the read-success counters, false to the read-fail counters
+     * @param size   amount added to the matching size counter (callers pass a byte length or 0)
+     */
+    private void addMetric(boolean result, long size) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
+        dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, source.getName());
+        dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, source.getName());
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, "");
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, "");
+        DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
+        if (result) {
+            metricItem.readSuccessCount.incrementAndGet();
+            metricItem.readSuccessSize.addAndGet(size);
+        } else {
+            metricItem.readFailCount.incrementAndGet();
+            metricItem.readFailSize.addAndGet(size);
+        }
+    }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 59df0a6..af7c8da 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.dataproxy.source;
 
-import com.google.common.base.Preconditions;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -37,12 +35,13 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.FlumeException;
-import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.dataproxy.base.NamedThreadFactory;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -55,6 +54,8 @@ import org.jboss.netty.util.ThreadRenamingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Simple tcp source
  *
@@ -92,6 +93,8 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
     protected boolean filterEmptyMsg;
 
     private Channel nettyChannel = null;
+    // metric item set of this source, created and registered in start()
+    private DataProxyMetricItemSet metricItemSet;
 
     public SimpleTcpSource() {
         super();
@@ -190,6 +193,8 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
     @Override
     public synchronized void start() {
         logger.info("start " + this.getName());
+        this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+        MetricRegister.register(metricItemSet);
         checkBlackListThread = new CheckBlackListThread();
         checkBlackListThread.start();
         super.start();
@@ -222,13 +227,13 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
                     (Class<? extends ChannelPipelineFactory>) Class.forName(msgFactoryName);
 
             Constructor ctor =
-                    clazz.getConstructor(ChannelProcessor.class, ChannelGroup.class,
+                    clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
                             String.class, ServiceDecoder.class, String.class,
                             Integer.class, String.class, String.class, Boolean.class,
                             Integer.class, Boolean.class, String.class);
 
-            logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
-            fac = (ChannelPipelineFactory) ctor.newInstance(getChannelProcessor(), allChannels,
+            logger.info("Using channel processor:{}", this.getClass().getName());
+            fac = (ChannelPipelineFactory) ctor.newInstance(this, allChannels,
                     "tcp", serviceDecoder, messageHandlerName,
                     maxMsgLength, topic, attr, filterEmptyMsg, maxConnections, isCompressed, this.getName());
 
@@ -367,4 +372,12 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
                 "maxMsgLength must be >= 4 and <= " + ConfigConstants.MSG_MAX_LENGTH_BYTES);
         isCompressed = context.getBoolean(ConfigConstants.MSG_COMPRESSED, true);
     }
+
+    /**
+     * Returns the metric item set of this source; the field is assigned in start().
+     * @return the metricItemSet; NOTE(review): looks null until start() has run - confirm
+     */
+    public DataProxyMetricItemSet getMetricItemSet() {
+        return metricItemSet;
+    }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
new file mode 100644
index 0000000..34bb46d
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Unbounded record queue plus size tokens; offer/poll never touch the tokens, callers acquire/release them.
+ */
+public class BufferQueue<A> {
+
+    private final LinkedBlockingQueue<A> queue;
+    private final SizeSemaphore currentTokens;
+    private SizeSemaphore globalTokens;
+    private final AtomicLong offerCount = new AtomicLong(0);
+    private final AtomicLong pollCount = new AtomicLong(0);
+
+    /**
+     * Creates a queue with only a local token budget.
+     *
+     * @param maxSizeKb number of local permits, each worth SizeSemaphore.ONEKB bytes
+     */
+    public BufferQueue(int maxSizeKb) {
+        this.queue = new LinkedBlockingQueue<>();
+        this.currentTokens = new SizeSemaphore(maxSizeKb, SizeSemaphore.ONEKB);
+    }
+
+    /**
+     * Creates a queue whose acquisitions are also charged to a shared budget.
+     *
+     * @param maxSizeKb    number of local permits, each worth SizeSemaphore.ONEKB bytes
+     * @param globalTokens shared semaphore charged after the local one; null disables it
+     */
+    public BufferQueue(int maxSizeKb, SizeSemaphore globalTokens) {
+        this(maxSizeKb);
+        this.globalTokens = globalTokens;
+    }
+
+    /** Removes and returns the head record, or null when empty; only successful polls are counted. */
+    public A pollRecord() {
+        A record = queue.poll();
+        if (record != null) {
+            this.pollCount.getAndIncrement();
+        }
+        return record;
+    }
+
+    /**
+     * Appends a record and counts it; a null record is ignored and not counted.
+     */
+    public void offer(A record) {
+        if (record == null) {
+            return;
+        }
+        queue.offer(record);
+        this.offerCount.incrementAndGet();
+    }
+
+    /**
+     * Returns the number of records currently queued.
+     */
+    public int size() {
+        return queue.size();
+    }
+
+    /**
+     * Returns the local semaphore's leftover remainder (SizeSemaphore counts it in bytes, despite the name).
+     */
+    public int leftKb() {
+        return currentTokens.leftSemaphore();
+    }
+
+    /**
+     * Returns the number of local permits that are currently free.
+     */
+    public int availablePermits() {
+        return currentTokens.availablePermits();
+    }
+
+    /**
+     * Returns the local permit count this queue was created with.
+     */
+    public int maxSizeKb() {
+        return currentTokens.maxSize();
+    }
+
+    /**
+     * Returns the free share of the local budget as a percentage (free permits * 100 / max permits).
+     */
+    public double getIdleRate() {
+        double remaining = currentTokens.availablePermits();
+        return remaining * 100.0 / currentTokens.maxSize();
+    }
+
+    /**
+     * Non-blocking: takes local then shared tokens; if the shared take fails the local take is returned.
+     */
+    public boolean tryAcquire(long sizeInByte) {
+        boolean cidResult = currentTokens.tryAcquire(sizeInByte);
+        if (!cidResult) {
+            return false;
+        }
+        if (this.globalTokens == null) {
+            return true;
+        }
+        boolean globalResult = this.globalTokens.tryAcquire(sizeInByte);
+        if (globalResult) {
+            return true;
+        }
+        currentTokens.release(sizeInByte);
+        return false;
+    }
+
+    /**
+     * Takes sizeInByte from the local budget and then, when configured, from the shared budget.
+     */
+    public void acquire(long sizeInByte) {
+        currentTokens.acquire(sizeInByte);
+        if (this.globalTokens != null) {
+            globalTokens.acquire(sizeInByte);
+        }
+    }
+
+    /**
+     * Returns sizeInByte to the shared budget (when configured) and then to the local budget.
+     */
+    public void release(long sizeInByte) {
+        if (this.globalTokens != null) {
+            this.globalTokens.release(sizeInByte);
+        }
+        this.currentTokens.release(sizeInByte);
+    }
+
+    /**
+     * Returns the offers counted since the previous call and resets the counter to zero.
+     *
+     * @return the offerCount accumulated since the last reset
+     */
+    public long getOfferCount() {
+        return offerCount.getAndSet(0);
+    }
+
+    /**
+     * Returns the polls counted since the previous call and resets the counter to zero.
+     *
+     * @return the pollCount accumulated since the last reset
+     */
+    public long getPollCount() {
+        return pollCount.getAndSet(0);
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/Constants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/Constants.java
new file mode 100644
index 0000000..18a8677
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/Constants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+/**
+ * Shared string constants of the dataproxy utils package.
+ * NOTE(review): presumably event-header and config key names - confirm against the callers.
+ */
+public interface Constants {
+
+    String INLONG_GROUP_ID = "inlongGroupId";
+    String INLONG_STREAM_ID = "inlongStreamId";
+    String TOPIC = "topic";
+    String HEADER_KEY_MSG_TIME = "msgTime";
+    String HEADER_KEY_SOURCE_IP = "sourceIp";
+    String HEADER_KEY_SOURCE_TIME = "sourceTime";
+    String MESSAGE_KEY = "messageKey";
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
new file mode 100644
index 0000000..31803a6
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Semaphore counted in units of leftSize bytes; sub-unit remainders are tracked in leftSemaphore.
+ */
+public class SizeSemaphore {
+
+    public static final int ONEKB = 1024;
+
+    private int maxSize = 0;
+    private int leftSize = 0;
+    private Semaphore sizeSemaphore = null;
+    private AtomicInteger leftSemaphore = new AtomicInteger(0);
+
+    /**
+     * Creates a fair semaphore holding maxSize permits.
+     *
+     * @param maxSize  number of permits
+     * @param leftSize bytes represented by one permit; used as divisor, so it must not be 0
+     */
+    public SizeSemaphore(int maxSize, int leftSize) {
+        this.maxSize = maxSize;
+        this.leftSize = leftSize;
+        this.sizeSemaphore = new Semaphore(maxSize, true);
+    }
+
+    /**
+     * Returns the remainder (in bytes) still usable from a permit that was already acquired.
+     */
+    public int leftSemaphore() {
+        return leftSemaphore.get();
+    }
+
+    /**
+     * Returns the number of permits that are currently free.
+     */
+    public int availablePermits() {
+        return sizeSemaphore.availablePermits();
+    }
+
+    /**
+     * Returns the permit count passed to the constructor.
+     */
+    public int maxSize() {
+        return maxSize;
+    }
+
+    /**
+     * Returns the free permits as a percentage of maxSize.
+     */
+    public double getIdleRate() {
+        double remaining = sizeSemaphore.availablePermits();
+        return remaining * 100.0 / maxSize;
+    }
+
+    /**
+     * Non-blocking acquire of sizeInByte; NOTE(review): remainder check-then-update is not one atomic step.
+     */
+    public boolean tryAcquire(long sizeInByte) {
+        int sizeInKb = (int) (sizeInByte / leftSize);
+        int sizeChange = (int) (sizeInByte % leftSize);
+        if (leftSemaphore.get() - sizeChange < 0) {
+            boolean result = sizeSemaphore.tryAcquire(sizeInKb + 1);
+            if (result) {
+                leftSemaphore.addAndGet(-sizeChange + leftSize);
+            }
+            return result;
+        } else {
+            boolean result = sizeSemaphore.tryAcquire(sizeInKb);
+            if (result) {
+                leftSemaphore.addAndGet(-sizeChange);
+            }
+            return result;
+        }
+    }
+
+    /**
+     * Uninterruptible acquire of sizeInByte; takes one extra permit when the remainder cannot cover it.
+     */
+    public void acquire(long sizeInByte) {
+        int sizeInKb = (int) (sizeInByte / leftSize);
+        int sizeChange = (int) (sizeInByte % leftSize);
+        if (leftSemaphore.get() - sizeChange < 0) {
+            sizeSemaphore.acquireUninterruptibly(sizeInKb + 1);
+            leftSemaphore.addAndGet(-sizeChange + leftSize);
+        } else {
+            sizeSemaphore.acquireUninterruptibly(sizeInKb);
+            leftSemaphore.addAndGet(sizeChange * -1);
+        }
+    }
+
+    /**
+     * Returns sizeInByte; a remainder that fills a whole unit (>=) hands that permit back as well.
+     */
+    public void release(long sizeInByte) {
+        int sizeInKb = (int) (sizeInByte / leftSize);
+        int sizeChange = (int) (sizeInByte % leftSize);
+        if (leftSemaphore.get() + sizeChange >= leftSize) {
+            sizeSemaphore.release(sizeInKb + 1);
+            leftSemaphore.addAndGet(sizeChange - leftSize);
+        } else {
+            sizeSemaphore.release(sizeInKb);
+            leftSemaphore.addAndGet(sizeChange);
+        }
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
new file mode 100644
index 0000000..63656ae
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.metrics.MetricListener;
+import org.junit.Test;
+
+/**
+ * Verifies two entries of the map returned by ClassResourceCommonPropertiesLoader:
+ * the proxy cluster name and the metric domains.
+ */
+public class TestClassResourceCommonPropertiesLoader {
+
+    /**
+     * Loads the properties and asserts both expected values.
+     *
+     * @throws Exception declared so any failure raised while loading fails the test
+     */
+    @Test
+    public void testResult() throws Exception {
+        // load the properties through the loader under test
+        ClassResourceCommonPropertiesLoader loader = new ClassResourceCommonPropertiesLoader();
+        Map<String, String> props = loader.load();
+        assertEquals("proxy_inlong5th_sz", props.get(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME));
+        assertEquals("DataProxy", props.get(MetricListener.KEY_METRIC_DOMAINS));
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
new file mode 100644
index 0000000..44b38cf
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that ContextCacheClusterConfigLoader builds the expected cache cluster
+ * configuration from the sink sub-properties of the flume.conf test resource.
+ */
+public class TestContextCacheClusterConfigLoader {
+
+    public static final Logger LOG = LoggerFactory.getLogger(TestContextCacheClusterConfigLoader.class);
+    private static Context context;
+    private static Context sinkContext;
+
+    /**
+     * Loads flume.conf via the class loader into both contexts; failures are only logged, leaving them null.
+     */
+    @BeforeClass
+    public static void setup() {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        try (InputStream inStream = TestContextCacheClusterConfigLoader.class.getClassLoader().getResource("flume.conf")
+                .openStream()) {
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+            context = new Context(result);
+            sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+        } catch (Exception e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+        }
+    }
+
+    /**
+     * Configures the loader with the sink context and checks the single loaded cluster.
+     *
+     * @throws Exception declared so any failure raised by the loader fails the test
+     */
+    @Test
+    public void testResult() throws Exception {
+        ContextCacheClusterConfigLoader loader = new ContextCacheClusterConfigLoader();
+        loader.configure(sinkContext);
+        List<CacheClusterConfig> configList = loader.load();
+        assertEquals(1, configList.size());
+        CacheClusterConfig config = configList.get(0);
+        assertEquals("cache_inlong5th_sz1", config.getClusterName());
+        assertEquals("http://tdmq.ap-gz.tencentyun.com:8080", config.getParams().get("serviceUrl"));
+        assertEquals("xxxx", config.getParams().get("authentication"));
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
new file mode 100644
index 0000000..a1eae40
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TestContextIdTopicConfigLoader: checks the id-to-topic entries that
+ * ContextIdTopicConfigLoader reads from the flume.conf test resource.
+ */
+public class TestContextIdTopicConfigLoader {
+
+    public static final Logger LOG = LoggerFactory.getLogger(TestContextIdTopicConfigLoader.class);
+    private static Context context;
+    private static Context sinkContext;
+
+    /**
+     * Loads flume.conf into the shared contexts; a load failure is only logged and leaves them null.
+     */
+    @BeforeClass
+    public static void setup() {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        try (InputStream inStream = TestContextIdTopicConfigLoader.class.getClassLoader().getResource("flume.conf")
+                .openStream()) {
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+            context = new Context(result);
+            sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+        } catch (Exception e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+        }
+    }
+
+    /**
+     * Expects two id-topic entries and checks the topic name of group 03a00000026 when it is present.
+     * 
+     * @throws Exception if the loader raises one while being configured or loaded
+     */
+    @Test
+    public void testResult() throws Exception {
+        ContextIdTopicConfigLoader loader = new ContextIdTopicConfigLoader();
+        loader.configure(sinkContext);
+        List<IdTopicConfig> configList = loader.load();
+        assertEquals(2, configList.size());
+
+        for (IdTopicConfig config : configList) {
+            if ("03a00000026".equals(config.getInlongGroupId())) {
+                assertEquals("pulsar-9xn9wp35pbxb/test/topic1", config.getTopicName());
+            }
+        }
+    }
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
similarity index 92%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
copy to inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
index 4e70d63..2219eea 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.commons.config.metrics.set;
+package org.apache.inlong.dataproxy.metrics;
 
 import static org.junit.Assert.assertEquals;
 
@@ -39,9 +39,9 @@ import org.junit.Test;
  * 
  * TestMetricItemSetMBean
  */
-public class TestMetricItemSetMBean {
+public class TestDataProxyMetricItemSet {
 
-    public static final String SET_ID = "inlong5th_sz";
+    public static final String CLUSTER_ID = "inlong5th_sz";
     public static final String CONTAINER_NAME = "2222.inlong.DataProxy.sz100001";
     public static final String CONTAINER_IP = "127.0.0.1";
     private static final String SOURCE_ID = "agent-source";
@@ -60,13 +60,11 @@ public class TestMetricItemSetMBean {
      */
     @BeforeClass
     public static void setup() {
-        itemSet = DataProxyMetricItemSet.getInstance();
+        itemSet = new DataProxyMetricItemSet(CLUSTER_ID);
         MetricRegister.register(itemSet);
         // prepare
         DataProxyMetricItem itemSource = new DataProxyMetricItem();
-        itemSource.setId = SET_ID;
-        itemSource.containerName = CONTAINER_NAME;
-        itemSource.containerIp = CONTAINER_IP;
+        itemSource.clusterId = CLUSTER_ID;
         itemSource.sourceId = SOURCE_ID;
         itemSource.sourceDataId = SOURCE_DATA_ID;
         itemSource.inlongGroupId = INLONG_GROUP_ID1;
@@ -74,9 +72,7 @@ public class TestMetricItemSetMBean {
         dimSource = itemSource.getDimensions();
         //
         DataProxyMetricItem itemSink = new DataProxyMetricItem();
-        itemSink.setId = SET_ID;
-        itemSink.containerName = CONTAINER_NAME;
-        itemSink.containerIp = CONTAINER_IP;
+        itemSink.clusterId = CLUSTER_ID;
         itemSink.sinkId = SINK_ID;
         itemSink.sinkDataId = SINK_DATA_ID;
         itemSink.inlongGroupId = INLONG_GROUP_ID1;
@@ -123,8 +119,8 @@ public class TestMetricItemSetMBean {
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         StringBuilder beanName = new StringBuilder();
         beanName.append(MetricUtils.getDomain(DataProxyMetricItemSet.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
-                .append("type=")
-                .append(DataProxyMetricItemSet.class.toString());
+                .append("name=")
+                .append(itemSet.getName());
         String strBeanName = beanName.toString();
         ObjectName objName = new ObjectName(strBeanName);
         {
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
similarity index 53%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
copy to inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
index 4e70d63..0576200 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
@@ -15,20 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.commons.config.metrics.set;
+package org.apache.inlong.dataproxy.metrics;
 
 import static org.junit.Assert.assertEquals;
 
-import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.inlong.commons.config.metrics.MetricItem;
-import org.apache.inlong.commons.config.metrics.MetricItemMBean;
-import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
 import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.commons.config.metrics.MetricUtils;
 import org.apache.inlong.commons.config.metrics.MetricValue;
@@ -39,9 +33,9 @@ import org.junit.Test;
  * 
  * TestMetricItemSetMBean
  */
-public class TestMetricItemSetMBean {
+public class TestMetricListenerRunnable {
 
-    public static final String SET_ID = "inlong5th_sz";
+    public static final String CLUSTER_ID = "inlong5th_sz";
     public static final String CONTAINER_NAME = "2222.inlong.DataProxy.sz100001";
     public static final String CONTAINER_IP = "127.0.0.1";
     private static final String SOURCE_ID = "agent-source";
@@ -54,19 +48,21 @@ public class TestMetricItemSetMBean {
     private static DataProxyMetricItemSet itemSet;
     private static Map<String, String> dimSource;
     private static Map<String, String> dimSink;
+    private static String keySource1;
+    private static String keySource2;
+    private static String keySink1;
+    private static String keySink2;
 
     /**
      * setup
      */
     @BeforeClass
     public static void setup() {
-        itemSet = DataProxyMetricItemSet.getInstance();
+        itemSet = new DataProxyMetricItemSet(CLUSTER_ID);
         MetricRegister.register(itemSet);
         // prepare
         DataProxyMetricItem itemSource = new DataProxyMetricItem();
-        itemSource.setId = SET_ID;
-        itemSource.containerName = CONTAINER_NAME;
-        itemSource.containerIp = CONTAINER_IP;
+        itemSource.clusterId = CLUSTER_ID;
         itemSource.sourceId = SOURCE_ID;
         itemSource.sourceDataId = SOURCE_DATA_ID;
         itemSource.inlongGroupId = INLONG_GROUP_ID1;
@@ -74,9 +70,7 @@ public class TestMetricItemSetMBean {
         dimSource = itemSource.getDimensions();
         //
         DataProxyMetricItem itemSink = new DataProxyMetricItem();
-        itemSink.setId = SET_ID;
-        itemSink.containerName = CONTAINER_NAME;
-        itemSink.containerIp = CONTAINER_IP;
+        itemSink.clusterId = CLUSTER_ID;
         itemSink.sinkId = SINK_ID;
         itemSink.sinkDataId = SINK_DATA_ID;
         itemSink.inlongGroupId = INLONG_GROUP_ID1;
@@ -89,7 +83,6 @@ public class TestMetricItemSetMBean {
      * 
      * @throws Exception
      */
-    @SuppressWarnings("unchecked")
     @Test
     public void testResult() throws Exception {
         // increase source
@@ -97,20 +90,20 @@ public class TestMetricItemSetMBean {
         item = itemSet.findMetricItem(dimSource);
         item.readSuccessCount.incrementAndGet();
         item.readSuccessSize.addAndGet(100);
-        String keySource1 = MetricUtils.getDimensionsKey(dimSource);
+        keySource1 = MetricUtils.getDimensionsKey(dimSource);
         //
         dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
         item = itemSet.findMetricItem(dimSource);
         item.readFailCount.addAndGet(20);
         item.readFailSize.addAndGet(2000);
-        String keySource2 = MetricUtils.getDimensionsKey(dimSource);
+        keySource2 = MetricUtils.getDimensionsKey(dimSource);
         // increase sink
         item = itemSet.findMetricItem(dimSink);
         item.sendCount.incrementAndGet();
         item.sendSize.addAndGet(100);
         item.sendSuccessCount.incrementAndGet();
         item.sendSuccessSize.addAndGet(100);
-        String keySink1 = MetricUtils.getDimensionsKey(dimSink);
+        keySink1 = MetricUtils.getDimensionsKey(dimSink);
         //
         dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
         item = itemSet.findMetricItem(dimSink);
@@ -118,43 +111,41 @@ public class TestMetricItemSetMBean {
         item.sendSize.addAndGet(2000);
         item.sendFailCount.addAndGet(20);
         item.sendFailSize.addAndGet(2000);
-        String keySink2 = MetricUtils.getDimensionsKey(dimSink);
+        keySink2 = MetricUtils.getDimensionsKey(dimSink);
         // report
-        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        StringBuilder beanName = new StringBuilder();
-        beanName.append(MetricUtils.getDomain(DataProxyMetricItemSet.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
-                .append("type=")
-                .append(DataProxyMetricItemSet.class.toString());
-        String strBeanName = beanName.toString();
-        ObjectName objName = new ObjectName(strBeanName);
-        {
-            List<MetricItem> items = (List<MetricItem>) mbs.invoke(objName, MetricItemSetMBean.METHOD_SNAPSHOT, null,
-                    null);
-            for (MetricItem itemObj : items) {
-                if (keySource1.equals(itemObj.getDimensionsKey())) {
-                    Map<String, MetricValue> metricMap = itemObj.snapshot();
-                    assertEquals(1, metricMap.get("readSuccessCount").value);
-                    assertEquals(100, metricMap.get("readSuccessSize").value);
-                } else if (keySource2.equals(itemObj.getDimensionsKey())) {
-                    Map<String, MetricValue> metricMap = itemObj.snapshot();
-                    assertEquals(20, metricMap.get("readFailCount").value);
-                    assertEquals(2000, metricMap.get("readFailSize").value);
-                } else if (keySink1.equals(itemObj.getDimensionsKey())) {
-                    Map<String, MetricValue> metricMap = itemObj.snapshot();
-                    assertEquals(1, metricMap.get("sendCount").value);
-                    assertEquals(100, metricMap.get("sendSize").value);
-                    assertEquals(1, metricMap.get("sendSuccessCount").value);
-                    assertEquals(100, metricMap.get("sendSuccessSize").value);
-                } else if (keySink2.equals(itemObj.getDimensionsKey())) {
-                    Map<String, MetricValue> metricMap = itemObj.snapshot();
-                    assertEquals(20, metricMap.get("sendCount").value);
-                    assertEquals(2000, metricMap.get("sendSize").value);
-                    assertEquals(20, metricMap.get("sendFailCount").value);
-                    assertEquals(2000, metricMap.get("sendFailSize").value);
-                } else {
-                    System.out.println("bad MetricItem:" + itemObj.getDimensionsKey());
+        MetricListener listener = new MetricListener() {
+
+            @Override
+            public void snapshot(String domain, List<MetricItemValue> itemValues) {
+                assertEquals("DataProxy", domain);
+                for (MetricItemValue itemValue : itemValues) {
+                    String key = itemValue.getKey();
+                    Map<String, MetricValue> metricMap = itemValue.getMetrics();
+                    if (keySource1.equals(itemValue.getKey())) {
+                        assertEquals(1, metricMap.get("readSuccessCount").value);
+                        assertEquals(100, metricMap.get("readSuccessSize").value);
+                    } else if (keySource2.equals(key)) {
+                        assertEquals(20, metricMap.get("readFailCount").value);
+                        assertEquals(2000, metricMap.get("readFailSize").value);
+                    } else if (keySink1.equals(key)) {
+                        assertEquals(1, metricMap.get("sendCount").value);
+                        assertEquals(100, metricMap.get("sendSize").value);
+                        assertEquals(1, metricMap.get("sendSuccessCount").value);
+                        assertEquals(100, metricMap.get("sendSuccessSize").value);
+                    } else if (keySink2.equals(key)) {
+                        assertEquals(20, metricMap.get("sendCount").value);
+                        assertEquals(2000, metricMap.get("sendSize").value);
+                        assertEquals(20, metricMap.get("sendFailCount").value);
+                        assertEquals(2000, metricMap.get("sendFailSize").value);
+                    } else {
+                        System.out.println("bad MetricItem:" + key);
+                    }
                 }
             }
-        }
+        };
+        List<MetricListener> listeners = new ArrayList<>();
+        listeners.add(listener);
+        MetricListenerRunnable runnable = new MetricListenerRunnable("DataProxy", listeners);
+        runnable.run();
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
new file mode 100644
index 0000000..847f218
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.loader.TestContextIdTopicConfigLoader;
+import org.apache.inlong.dataproxy.utils.MockUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TestPulsarFederationSink: configures a PulsarFederationSink from the
+ * flume.conf test resource and drives it through a mocked channel.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class TestPulsarFederationSink {
+
+    public static final Logger LOG = LoggerFactory.getLogger(TestPulsarFederationSink.class);
+    public static Context context;
+    public static Context sinkContext;
+    public static PulsarFederationSink sinkObj;
+
+    /**
+     * Loads flume.conf (a load failure is only logged) and configures the sink under test.
+     */
+    @BeforeClass
+    public static void setUp() {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        try (InputStream inStream = TestPulsarFederationSink.class.getClassLoader().getResource("flume.conf")
+                .openStream()) {
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+            context = new Context(result);
+            sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+        } catch (Exception e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+        }
+        // sinkContext is still null here if the load above failed
+        sinkObj = new PulsarFederationSink();
+        sinkObj.configure(sinkContext);
+    }
+
+    /**
+     * Runs one process() cycle against a mocked channel; there are no assertions, only "does not throw".
+     * 
+     * @throws Exception if mocking the channel or processing raises one
+     */
+    @Test
+    public void testResult() throws Exception {
+        // mock
+        Channel channel = MockUtils.mockChannel();
+        sinkObj.setChannel(channel);
+        // take the mocked event from the channel and hand it to the sink
+        sinkObj.process();
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
new file mode 100644
index 0000000..06f3776
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.loader.TestContextIdTopicConfigLoader;
+import org.apache.inlong.dataproxy.utils.MockUtils;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TestPulsarProducerFederation: sends one mocked event through a
+ * PulsarProducerFederation backed by a mocked Pulsar client.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({PulsarClient.class, ClientBuilder.class, MessageId.class,
+    Producer.class, ProducerBuilder.class, TypedMessageBuilder.class})
+public class TestPulsarProducerFederation {
+
+    public static final Logger LOG = LoggerFactory.getLogger(TestPulsarProducerFederation.class);
+    public static Context context;
+    public static Context sinkContext;
+
+    /**
+     * Loads flume.conf and installs the mocked Pulsar client; a failure is only logged.
+     */
+    @BeforeClass
+    public static void setUp() {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        try (InputStream inStream = TestPulsarProducerFederation.class.getClassLoader().getResource("flume.conf")
+                .openStream()) {
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+            context = new Context(result);
+            sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
+            MockUtils.mockPulsarClient();
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+        } catch (Exception e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+        }
+    }
+
+    /**
+     * Starts a federation on the mocked client and expects send() to report success for one event.
+     * 
+     * @throws Exception if building the federation or the mocked event raises one
+     */
+    @Test
+    public void testResult() throws Exception {
+        String workerName = "workerName";
+        PulsarFederationSinkContext pulsarContext = new PulsarFederationSinkContext(MockUtils.SINK_ID, sinkContext);
+        PulsarProducerFederation federation = new PulsarProducerFederation(workerName, pulsarContext);
+        federation.start();
+        Event event = MockUtils.mockEvent();
+        boolean result = federation.send(event);
+        assertEquals(true, result);
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
new file mode 100644
index 0000000..beaf1e0
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.event.EventBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * MockUtils: shared test constants plus PowerMockito-based factories for
+ * Flume channels/events and Pulsar clients/producers.
+ */
+public class MockUtils {
+
+    public static final String CLUSTER_ID = "proxy_inlong5th_sz";
+    public static final String CONTAINER_NAME = "2222.inlong.DataProxy.sz100001";
+    public static final String CONTAINER_IP = "127.0.0.1";
+    public static final String SOURCE_ID = "agent-source";
+    public static final String SOURCE_DATA_ID = "12069";
+    public static final String INLONG_GROUP_ID1 = "03a00000026";
+    public static final String INLONG_GROUP_ID2 = "03a00000126";
+    public static final String INLONG_STREAM_ID = "";
+    public static final String SINK_ID = "inlong5th-pulsar-sz";
+    public static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
+
+    /**
+     * Creates a mocked channel with a no-op transaction whose take() returns one mocked event.
+     * 
+     * @return the mocked channel
+     * @throws Exception if PowerMockito stubbing or event creation fails
+     */
+    public static Channel mockChannel() throws Exception {
+        Transaction transaction = PowerMockito.mock(Transaction.class);
+        Channel channel = PowerMockito.mock(Channel.class);
+        PowerMockito.when(channel.getTransaction()).thenReturn(transaction);
+        PowerMockito.doNothing().when(transaction, "begin");
+        PowerMockito.doNothing().when(transaction, "commit");
+        PowerMockito.doNothing().when(transaction, "rollback");
+        PowerMockito.doNothing().when(transaction, "close");
+        Event event = mockEvent();
+        PowerMockito.when(channel.take()).thenReturn(event);
+        return channel;
+    }
+
+    /**
+     * Builds a real Flume event with the body "testContent" (UTF-8) and time/ip/group/topic headers.
+     * 
+     * @return the event; message time and source time both carry the current time in milliseconds
+     * @throws Exception declared for symmetry with the other factories
+     */
+    public static Event mockEvent() throws Exception {
+        Map<String, String> headers = new HashMap<>();
+        String sourceTime = String.valueOf(System.currentTimeMillis());
+        headers.put(Constants.HEADER_KEY_MSG_TIME, sourceTime);
+        headers.put(Constants.HEADER_KEY_SOURCE_IP, CONTAINER_IP);
+        headers.put(Constants.HEADER_KEY_SOURCE_TIME, sourceTime);
+        headers.put(Constants.INLONG_GROUP_ID, INLONG_GROUP_ID1);
+        headers.put(Constants.TOPIC, SINK_DATA_ID);
+        byte[] body = "testContent".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        Event event = EventBuilder.withBody(body, headers);
+        return event;
+    }
+
+    /**
+     * Statically mocks PulsarClient.builder() so that built clients and producers are mocks.
+     * 
+     * @return the mocked client that the mocked builder hands out
+     * @throws Exception if PowerMockito stubbing fails
+     */
+    @SuppressWarnings("unchecked")
+    public static PulsarClient mockPulsarClient() throws Exception {
+        ClientBuilder clientBuilder = PowerMockito.mock(ClientBuilder.class);
+        PowerMockito.mockStatic(PulsarClient.class);
+        // client builder: every stubbed fluent call returns the builder itself
+        PowerMockito.when(PulsarClient.builder()).thenReturn(clientBuilder);
+        PowerMockito.when(clientBuilder.serviceUrl(anyString())).thenReturn(clientBuilder);
+        PowerMockito.when(clientBuilder.authentication(any())).thenReturn(clientBuilder);
+        PulsarClient client = PowerMockito.mock(PulsarClient.class);
+        PowerMockito.when(clientBuilder.build()).thenReturn(client);
+        // producer builder: same fluent self-return for each stubbed option
+        ProducerBuilder<byte[]> producerBuilder = PowerMockito.mock(ProducerBuilder.class);
+        PowerMockito.when(client.newProducer()).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.hashingScheme(any())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.batchingMaxBytes(anyInt())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.batchingMaxMessages(anyInt())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.batchingMaxPublishDelay(anyInt(), any())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.maxPendingMessagesAcrossPartitions(anyInt())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.compressionType(any())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.blockIfQueueFull(anyBoolean())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.roundRobinRouterBatchingPartitionSwitchFrequency(anyInt()))
+                .thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.batcherBuilder(any())).thenReturn(producerBuilder);
+        // clone()/topic()/producerName() keep returning the builder; create() yields the mocked producer
+        Producer<byte[]> producer = mockProducer();
+        PowerMockito.when(producerBuilder.clone()).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.topic(anyString())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.create()).thenReturn(producer);
+        return client;
+    }
+
+    /**
+     * Creates a mocked producer whose message builder returns a mocked future from sendAsync().
+     * 
+     * @return the mocked producer
+     * @throws Exception if PowerMockito stubbing fails
+     */
+    @SuppressWarnings({"unchecked"})
+    public static Producer<byte[]> mockProducer() throws Exception {
+        Producer<byte[]> producer = PowerMockito.mock(Producer.class);
+        PowerMockito.doNothing().when(producer, "close");
+        TypedMessageBuilder<byte[]> msgBuilder = PowerMockito.mock(TypedMessageBuilder.class);
+        PowerMockito.when(producer.newMessage()).thenReturn(msgBuilder);
+        PowerMockito.when(msgBuilder.key(anyString())).thenReturn(msgBuilder);
+        PowerMockito.when(msgBuilder.properties(any())).thenReturn(msgBuilder);
+        PowerMockito.when(msgBuilder.value(any())).thenReturn(msgBuilder);
+        CompletableFuture<MessageId> future = PowerMockito.mock(CompletableFuture.class);
+        PowerMockito.when(future.whenCompleteAsync(any())).thenReturn(future);
+        PowerMockito.when(msgBuilder.sendAsync()).thenReturn(future);
+        return producer;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
new file mode 100644
index 0000000..4de61c1
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+proxy_cluster_name=proxy_inlong5th_sz
+metricDomains=DataProxy
+metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
+metricDomains.DataProxy.snapshotInterval=60000
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/flume.conf b/inlong-dataproxy/dataproxy-source/src/test/resources/flume.conf
new file mode 100644
index 0000000..d472f51
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/flume.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+proxy_inlong5th_sz.channels=ch-msg1
+proxy_inlong5th_sz.sinks=pulsar-sink-more1
+proxy_inlong5th_sz.sources=agent-source sdk-source
+
+proxy_inlong5th_sz.channels.ch-msg1.capacity=200000
+proxy_inlong5th_sz.channels.ch-msg1.keep-alive=0
+proxy_inlong5th_sz.channels.ch-msg1.transactionCapacity=2000
+proxy_inlong5th_sz.channels.ch-msg1.type=memory
+
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.channel=ch-msg1
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.type=org.apache.inlong.dataproxy.sink.pulsar.federation.PulsarFederationSink
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.maxBufferQueueSize=131072
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.reloadInterval=60000
+# idTopicConfig: loader type and id-to-topic entries are defined inline in this context
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.idTopicConfig.03a00000026=pulsar-9xn9wp35pbxb/test/topic1
+# cacheClusterConfig: loader type and cluster entries are defined inline in this context
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.cacheClusterConfig.type=org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.cacheClusterConfig=cache_inlong5th_sz1
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.cacheClusterConfig.cache_inlong5th_sz1.serviceUrl=http://tdmq.ap-gz.tencentyun.com:8080
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.cacheClusterConfig.cache_inlong5th_sz1.authentication=xxxx
+
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.enableBatching=true
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.batchingMaxBytes=5242880
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.batchingMaxMessages=3000
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.batchingMaxPublishDelay=1
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.maxPendingMessages=1000
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.maxPendingMessagesAcrossPartitions=50000
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.sendTimeout=0
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.compressionType=SNAPPY
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.blockIfQueueFull=true
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.roundRobinRouterBatchingPartitionSwitchFrequency=10
+
+proxy_inlong5th_sz.sources.agent-source.type=com.tencent.pcg.atta.dataproxy.source.agent.SimpleTcpSource
+proxy_inlong5th_sz.sources.agent-source.selector.type=org.apache.flume.channel.ReplicatingChannelSelector
+proxy_inlong5th_sz.sources.agent-source.channels=ch-msg1
+proxy_inlong5th_sz.sources.agent-source.tcpNoDelay=true
+proxy_inlong5th_sz.sources.agent-source.keepAlive=true
+proxy_inlong5th_sz.sources.agent-source.highWaterMark=65536
+proxy_inlong5th_sz.sources.agent-source.receiveBufferSize=65536
+proxy_inlong5th_sz.sources.agent-source.sendBufferSize=65536
+proxy_inlong5th_sz.sources.agent-source.trafficClass=0
+proxy_inlong5th_sz.sources.agent-source.max-threads=32
+proxy_inlong5th_sz.sources.agent-source.connections=5000
+proxy_inlong5th_sz.sources.agent-source.msg-factory-name=com.tencent.pcg.atta.dataproxy.source.agent.ServerMessageFactory
+proxy_inlong5th_sz.sources.agent-source.message-handler-name=com.tencent.pcg.atta.dataproxy.source.agent.ServerMessageHandler
+proxy_inlong5th_sz.sources.agent-source.max-msg-length=10485760
+proxy_inlong5th_sz.sources.agent-source.reloadInterval=60000
+proxy_inlong5th_sz.sources.agent-source.idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader
+proxy_inlong5th_sz.sources.agent-source.idTopicConfig.03a00000026=pulsar-9xn9wp35pbxb/test/topic1
+
+proxy_inlong5th_sz.sources.sdk-source.type=com.tencent.pcg.atta.dataproxy.source.sdk.SimpleTcpSource
+proxy_inlong5th_sz.sources.sdk-source.selector.type=org.apache.flume.channel.ReplicatingChannelSelector
+proxy_inlong5th_sz.sources.sdk-source.channels=ch-msg1
+proxy_inlong5th_sz.sources.sdk-source.tcpNoDelay=true
+proxy_inlong5th_sz.sources.sdk-source.keepAlive=true
+proxy_inlong5th_sz.sources.sdk-source.highWaterMark=65536
+proxy_inlong5th_sz.sources.sdk-source.receiveBufferSize=65536
+proxy_inlong5th_sz.sources.sdk-source.sendBufferSize=65536
+proxy_inlong5th_sz.sources.sdk-source.trafficClass=0
+proxy_inlong5th_sz.sources.sdk-source.max-threads=32
+proxy_inlong5th_sz.sources.sdk-source.connections=5000
+proxy_inlong5th_sz.sources.sdk-source.msg-factory-name=com.tencent.pcg.atta.dataproxy.source.sdk.ServerMessageFactory
+proxy_inlong5th_sz.sources.sdk-source.message-handler-name=com.tencent.pcg.atta.dataproxy.source.sdk.ServerMessageHandler
+proxy_inlong5th_sz.sources.sdk-source.max-msg-length=10485760
+proxy_inlong5th_sz.sources.sdk-source.reloadInterval=60000
+proxy_inlong5th_sz.sources.sdk-source.idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader
+proxy_inlong5th_sz.sources.sdk-source.idTopicConfig.03a00000026=pulsar-9xn9wp35pbxb/test/topic1