You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/12 11:58:48 UTC

[incubator-inlong] branch master updated: [INLONG-1786] Inlong-common provide monitor indicator #1786 (#1788)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6353d91  [INLONG-1786] Inlong-common provide monitor indicator #1786 (#1788)
6353d91 is described below

commit 6353d9175944765ff71ca437cd18303ec4609508
Author: 卢春亮 <94...@qq.com>
AuthorDate: Fri Nov 12 19:58:40 2021 +0800

    [INLONG-1786] Inlong-common provide monitor indicator #1786 (#1788)
---
 .../inlong/commons/config/metrics/CountMetric.java |  41 +++++
 .../inlong/commons/config/metrics/Dimension.java   |  40 +++++
 .../inlong/commons/config/metrics/GaugeMetric.java |  41 +++++
 .../commons/config/metrics/MetricDomain.java       |  41 +++++
 .../inlong/commons/config/metrics/MetricItem.java  | 189 +++++++++++++++++++++
 .../commons/config/metrics/MetricItemMBean.java    |  58 +++++++
 .../commons/config/metrics/MetricItemSet.java      |  72 ++++++++
 .../commons/config/metrics/MetricItemSetMBean.java |  41 +++++
 .../commons/config/metrics/MetricRegister.java     |  75 ++++++++
 .../inlong/commons/config/metrics/MetricUtils.java |  91 ++++++++++
 .../inlong/commons/config/metrics/MetricValue.java |  48 ++++++
 .../config/metrics/item/AgentMetricItem.java       |  62 +++++++
 .../config/metrics/item/TestMetricItemMBean.java   | 108 ++++++++++++
 .../config/metrics/set/DataProxyMetricItem.java    |  80 +++++++++
 .../config/metrics/set/DataProxyMetricItemSet.java |  63 +++++++
 .../config/metrics/set/TestMetricItemSetMBean.java | 160 +++++++++++++++++
 16 files changed, 1210 insertions(+)

diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/CountMetric.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/CountMetric.java
new file mode 100644
index 0000000..490d40d
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/CountMetric.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target(FIELD)
+
+/**
+ * 
+ * CountMetric
+ */
+public @interface CountMetric {
+
+    /**
+     * metric name of count type, default value is field name.
+     *
+     * @return metric name
+     */
+    String name() default "";
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/Dimension.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/Dimension.java
new file mode 100644
index 0000000..31e9022
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/Dimension.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target(FIELD)
+/**
+ * 
+ * Dimension
+ */
+public @interface Dimension {
+
+    /**
+     * dimension name, default value is field name.
+     *
+     * @return dimension name
+     */
+    String name() default "";
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/GaugeMetric.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/GaugeMetric.java
new file mode 100644
index 0000000..86ce90a
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/GaugeMetric.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target(FIELD)
+
+/**
+ * 
+ * GaugeMetric
+ */
+public @interface GaugeMetric {
+
+    /**
+     * metric name of gauge type, default value is field name.
+     *
+     * @return metric name
+     */
+    String name() default "";
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricDomain.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricDomain.java
new file mode 100644
index 0000000..48317b6
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricDomain.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target(TYPE)
+
+/**
+ * 
+ * MetricDomain
+ */
+public @interface MetricDomain {
+
+    /**
+     * MBean domain name, default value is class name.
+     *
+     * @return metric name
+     */
+    String name() default "";
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItem.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItem.java
new file mode 100644
index 0000000..e5b81a2
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItem.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * MetricItem
+ */
+public abstract class MetricItem implements MetricItemMBean {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(MetricItem.class);
+
+    private String key;
+    private Map<String, String> dimensions;
+    private Map<String, AtomicLong> countMetrics;
+    private Map<String, AtomicLong> gaugeMetrics;
+
+    /**
+     * getDimensionsKey
+     * 
+     * @return
+     */
+    @Override
+    public String getDimensionsKey() {
+        if (key != null) {
+            return key;
+        }
+        //
+        Map<String, String> dimensionMap = this.getDimensions();
+        this.key = MetricUtils.getDimensionsKey(dimensionMap);
+        return key;
+    }
+
+    /**
+     * getDimensions
+     * 
+     * @return
+     */
+    @Override
+    public Map<String, String> getDimensions() {
+        if (dimensions != null) {
+            return dimensions;
+        }
+        dimensions = new HashMap<>();
+
+        // get all fields
+        List<Field> fields = getDeclaredFieldsIncludingInherited(this.getClass());
+
+        // filter dimension fields
+        for (Field field : fields) {
+            field.setAccessible(true);
+            for (Annotation fieldAnnotation : field.getAnnotations()) {
+                if (fieldAnnotation instanceof Dimension) {
+                    Dimension dimension = (Dimension) fieldAnnotation;
+                    String name = dimension.name();
+                    name = (name != null && name.length() > 0) ? name : field.getName();
+                    try {
+                        Object fieldValue = field.get(this);
+                        String value = (fieldValue == null) ? "" : fieldValue.toString();
+                        dimensions.put(name, value);
+                    } catch (Throwable t) {
+                        LOGGER.error(t.getMessage(), t);
+                    }
+                    break;
+                }
+            }
+        }
+        return dimensions;
+    }
+
+    /**
+     * set dimensions
+     * 
+     * @param dimensions the dimensions to set
+     */
+    public void setDimensions(Map<String, String> dimensions) {
+        this.dimensions = new HashMap<String, String>();
+        this.dimensions.putAll(dimensions);
+    }
+
+    /**
+     * snapshot
+     * 
+     * @return
+     */
+    @Override
+    public Map<String, MetricValue> snapshot() {
+        if (this.countMetrics == null || this.gaugeMetrics == null) {
+            this.initMetricField();
+        }
+        //
+        Map<String, MetricValue> metrics = new HashMap<>();
+        this.countMetrics.forEach((key, value) -> {
+            metrics.put(key, MetricValue.of(key, value.getAndSet(0)));
+        });
+        this.gaugeMetrics.forEach((key, value) -> {
+            metrics.put(key, MetricValue.of(key, value.get()));
+        });
+        return metrics;
+    }
+
+    /**
+     * initMetricField
+     */
+    protected void initMetricField() {
+        this.countMetrics = new HashMap<>();
+        this.gaugeMetrics = new HashMap<>();
+
+        // get all fields
+        List<Field> fields = getDeclaredFieldsIncludingInherited(this.getClass());
+
+        // filter metric fields
+        for (Field field : fields) {
+            field.setAccessible(true);
+            for (Annotation fieldAnnotation : field.getAnnotations()) {
+                if (fieldAnnotation instanceof CountMetric) {
+                    CountMetric countMetric = (CountMetric) fieldAnnotation;
+                    String name = countMetric.name();
+                    name = (name != null && name.length() > 0) ? name : field.getName();
+                    try {
+                        Object fieldValue = field.get(this);
+                        if (fieldValue instanceof AtomicLong) {
+                            this.countMetrics.put(name, (AtomicLong) fieldValue);
+                        }
+                    } catch (Throwable t) {
+                        LOGGER.error(t.getMessage(), t);
+                    }
+                    break;
+                } else if (fieldAnnotation instanceof GaugeMetric) {
+                    GaugeMetric gaugeMetric = (GaugeMetric) fieldAnnotation;
+                    String name = gaugeMetric.name();
+                    name = (name != null && name.length() > 0) ? name : field.getName();
+                    try {
+                        Object fieldValue = field.get(this);
+                        if (fieldValue instanceof AtomicLong) {
+                            this.gaugeMetrics.put(name, (AtomicLong) fieldValue);
+                        }
+                    } catch (Throwable t) {
+                        LOGGER.error(t.getMessage(), t);
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Get declare fields.
+     * 
+     * @param  clazz
+     * @return
+     */
+    public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) {
+        List<Field> fields = new ArrayList<Field>();
+        // check whether parent exists
+        while (clazz != null) {
+            fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
+            clazz = clazz.getSuperclass();
+        }
+        return fields;
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemMBean.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemMBean.java
new file mode 100644
index 0000000..ef6cce9
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemMBean.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.util.Map;
+
+/**
+ * MetricItemMBean<br>
+ * Provide access interface of a metric item with JMX.<br>
+ * Decouple between metric item and monitor system, in particular scene, <br>
+ * inlong can depend on user-defined monitor system.
+ */
+public interface MetricItemMBean {
+
+    String ATTRIBUTE_KEY = "DimensionsKey";
+    String ATTRIBUTE_DIMENSIONS = "Dimensions";
+    String METHOD_SNAPSHOT = "snapshot";
+    char DOMAIN_SEPARATOR = ':';
+    char PROPERTY_SEPARATOR = ',';
+    char PROPERTY_EQUAL = '=';
+
+    /**
+     * getDimensionsKey
+     * 
+     * @return key string composed of key/value pair of dimensions.
+     */
+    String getDimensionsKey();
+
+    /**
+     * getDimensions
+     * 
+     * @return key/value pair of all dimensions.
+     */
+    Map<String, String> getDimensions();
+
+    /**
+     * snapshot
+     * 
+     * @return get snapshot all metric of item, CountMetric will get metric value and set 0 to value, <br>
+     *         GaugeMetric will only get metric value.
+     */
+    Map<String, MetricValue> snapshot();
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
new file mode 100644
index 0000000..20fea7f
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 
+ * MetricItemSet
+ */
+public abstract class MetricItemSet<T extends MetricItem> implements MetricItemSetMBean {
+
+    protected Map<String, T> itemMap = new ConcurrentHashMap<>();
+
+    /**
+     * createItem
+     * 
+     * @return
+     */
+    protected abstract T createItem();
+
+    /**
+     * findMetricItem
+     * 
+     * @param item
+     */
+    public T findMetricItem(Map<String, String> dimensions) {
+        String key = MetricUtils.getDimensionsKey(dimensions);
+        T currentItem = this.itemMap.get(key);
+        if (currentItem != null) {
+            return currentItem;
+        }
+        currentItem = createItem();
+        currentItem.setDimensions(dimensions);
+        T oldItem = this.itemMap.putIfAbsent(key, currentItem);
+        T returnItem = (oldItem == null) ? currentItem : oldItem;
+        return returnItem;
+    }
+
+    /**
+     * snapshot
+     * 
+     * @return
+     */
+    @Override
+    public List<MetricItem> snapshot() {
+        Map<String, T> oldItemMap = itemMap;
+        this.itemMap = new ConcurrentHashMap<>();
+        MetricUtils.sleepOneInterval();
+        List<MetricItem> result = new ArrayList<>(oldItemMap.size());
+        result.addAll(oldItemMap.values());
+        return result;
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSetMBean.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSetMBean.java
new file mode 100644
index 0000000..ffd0031
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSetMBean.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.util.List;
+
+/**
+ * 
+ * MetricItemSetMBean<br>
+ * Provide access interface of metric items with JMX.<br>
+ * Decouple between metric item and monitor system, in particular scene, <br>
+ * inlong can depend on user-defined monitor system.
+ */
+public interface MetricItemSetMBean {
+
+    String METHOD_SNAPSHOT = "snapshot";
+
+    /**
+     * snapshot
+     * 
+     * @return get snapshot all metric of item, CountMetric will get metric value and set 0 to value, <br>
+     *         GaugeMetric will only get metric value.
+     */
+    List<MetricItem> snapshot();
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
new file mode 100644
index 0000000..d4a5fc5
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * MetricRegister
+ */
+public class MetricRegister {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(MetricRegister.class);
+
+    /**
+     * register MetricItem
+     * 
+     * @param obj
+     */
+    public static void register(MetricItem obj) {
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        StringBuilder beanName = new StringBuilder();
+        beanName.append(MetricUtils.getDomain(obj.getClass())).append(MetricItemMBean.DOMAIN_SEPARATOR)
+                .append(obj.getDimensionsKey());
+        String strBeanName = beanName.toString();
+        try {
+            ObjectName objName = new ObjectName(strBeanName);
+            mbs.registerMBean(obj, objName);
+        } catch (Exception ex) {
+            LOGGER.error("exception while register mbean:{},error:{}", strBeanName, ex.getMessage());
+            LOGGER.error(ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * register MetricItemSet
+     * 
+     * @param obj
+     */
+    public static void register(MetricItemSet<? extends MetricItem> obj) {
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        StringBuilder beanName = new StringBuilder();
+        beanName.append(MetricUtils.getDomain(obj.getClass())).append(MetricItemMBean.DOMAIN_SEPARATOR).append("type=")
+                .append(obj.getClass().toString());
+        String strBeanName = beanName.toString();
+        try {
+            ObjectName objName = new ObjectName(strBeanName);
+            mbs.registerMBean(obj, objName);
+        } catch (Exception ex) {
+            LOGGER.error("exception while register mbean:{},error:{}", strBeanName, ex.getMessage());
+            LOGGER.error(ex.getMessage(), ex);
+        }
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricUtils.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricUtils.java
new file mode 100644
index 0000000..df17fdf
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.lang.annotation.Annotation;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * MetricUtils
+ */
+public class MetricUtils {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(MetricUtils.class);
+
+    /**
+     * getDomain
+     * 
+     * @param  cls
+     * @return
+     */
+    public static String getDomain(Class<?> cls) {
+        for (Annotation annotation : cls.getAnnotations()) {
+            if (annotation instanceof MetricDomain) {
+                MetricDomain domain = (MetricDomain) annotation;
+                String name = domain.name();
+                name = (name != null && name.length() > 0) ? name : cls.getName();
+                return name;
+            }
+        }
+        return cls.getName();
+    }
+
+    /**
+     * getDimensionsKey
+     * 
+     * @return
+     */
+    public static String getDimensionsKey(Map<String, String> dimensionMap) {
+        StringBuilder builder = new StringBuilder();
+        if (dimensionMap.size() <= 0) {
+            return "";
+        }
+        //
+        Set<String> fieldKeySet = dimensionMap.keySet();
+        List<String> fieldKeyList = new ArrayList<>(fieldKeySet.size());
+        fieldKeyList.addAll(fieldKeySet);
+        Collections.sort(fieldKeyList);
+        for (String fieldKey : fieldKeyList) {
+            String fieldValue = dimensionMap.get(fieldKey);
+            fieldValue = (fieldValue == null) ? "" : fieldValue;
+            builder.append(fieldKey).append(MetricItemMBean.PROPERTY_EQUAL).append(fieldValue)
+                    .append(MetricItemMBean.PROPERTY_SEPARATOR);
+        }
+        String key = builder.substring(0, builder.length() - 1);
+        return key;
+    }
+
+    /**
+     * sleepOneInterval
+     */
+    public static void sleepOneInterval() {
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            LOGGER.error(e.getMessage(), e);
+        }
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricValue.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricValue.java
new file mode 100644
index 0000000..b3e7751
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricValue.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+/**
+ * 
+ * MetricValue
+ */
+public class MetricValue {
+
+    public String name;
+    public long value;
+
+    /**
+     * Constructor
+     * 
+     * @param value
+     */
+    private MetricValue(String name, long value) {
+        this.name = name;
+        this.value = value;
+    }
+
+    /**
+     * of
+     * 
+     * @param  value
+     * @return
+     */
+    public static MetricValue of(String name, long value) {
+        return new MetricValue(name, value);
+    }
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/AgentMetricItem.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/AgentMetricItem.java
new file mode 100644
index 0000000..be0d0ec
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/AgentMetricItem.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.item;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.GaugeMetric;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+
+/**
+ * 
+ * AgentMetricItem, like PluginMetric
+ */
+@MetricDomain(name = "Agent")
+public class AgentMetricItem extends MetricItem {
+
+    @Dimension
+    public String module;
+    @Dimension
+    public String aspect;
+    @Dimension
+    public String tag;
+
+    @CountMetric
+    public AtomicLong readNum = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong sendNum = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong sendFailedNum = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong readFailedNum = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong readSuccessNum = new AtomicLong(0);
+
+    @CountMetric
+    public AtomicLong sendSuccessNum = new AtomicLong(0);
+    
+    @GaugeMetric
+    public AtomicLong runningTasks = new AtomicLong(0);
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/TestMetricItemMBean.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/TestMetricItemMBean.java
new file mode 100644
index 0000000..e7bf473
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/TestMetricItemMBean.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.item;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * TestMetricItem
+ */
+public class TestMetricItemMBean {
+
+    public static final String MODULE = "Plugin";
+    public static final String ASPECT = "PluginSummary";
+    public static final String TAG = "agent1";
+    private static AgentMetricItem item;
+
+    /**
+     * setup
+     */
+    @BeforeClass
+    public static void setup() {
+        item = new AgentMetricItem();
+        item.module = MODULE;
+        item.aspect = ASPECT;
+        item.tag = TAG;
+        MetricRegister.register(item);
+    }
+
+    /**
+     * testResult
+     * 
+     * @throws Exception
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testResult() throws Exception {
+        // increase
+        item.readNum.incrementAndGet();
+        item.sendNum.addAndGet(100);
+        item.runningTasks.addAndGet(2);
+        //
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        StringBuilder beanName = new StringBuilder();
+        beanName.append(MetricUtils.getDomain(AgentMetricItem.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
+                .append("module=").append(MODULE).append(MetricItemMBean.PROPERTY_SEPARATOR)
+                .append("aspect=").append(ASPECT).append(MetricItemMBean.PROPERTY_SEPARATOR)
+                .append("tag=").append(TAG);
+        String strBeanName = beanName.toString();
+        ObjectName objName = new ObjectName(strBeanName);
+        {
+            Map<String, String> dimensions = (Map<String, String>) mbs.getAttribute(objName,
+                    MetricItemMBean.ATTRIBUTE_DIMENSIONS);
+            assertEquals(MODULE, dimensions.get("module"));
+            assertEquals(ASPECT, dimensions.get("aspect"));
+            assertEquals(TAG, dimensions.get("tag"));
+            Map<String, MetricValue> metricMap = (Map<String, MetricValue>) mbs.invoke(objName,
+                    MetricItemMBean.METHOD_SNAPSHOT, null, null);
+            assertEquals(1, metricMap.get("readNum").value);
+            assertEquals(100, metricMap.get("sendNum").value);
+            assertEquals(2, metricMap.get("runningTasks").value);
+        }
+        // increase
+        item.readNum.incrementAndGet();
+        item.sendNum.addAndGet(100);
+        item.runningTasks.addAndGet(2);
+        {
+            Map<String, String> dimensions = (Map<String, String>) mbs.getAttribute(objName,
+                    MetricItemMBean.ATTRIBUTE_DIMENSIONS);
+            assertEquals(MODULE, dimensions.get("module"));
+            assertEquals(ASPECT, dimensions.get("aspect"));
+            assertEquals(TAG, dimensions.get("tag"));
+            Map<String, MetricValue> metricMap = (Map<String, MetricValue>) mbs.invoke(objName,
+                    MetricItemMBean.METHOD_SNAPSHOT, null, null);
+            assertEquals(1, metricMap.get("readNum").value);
+            assertEquals(100, metricMap.get("sendNum").value);
+            assertEquals(4, metricMap.get("runningTasks").value);
+        }
+    }
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItem.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItem.java
new file mode 100644
index 0000000..b68550c
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItem.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.set;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+
+/**
+ * 
+ * DataProxyMetricItem
+ */
+@MetricDomain(name = "DataProxy")
+public class DataProxyMetricItem extends MetricItem {
+    @Dimension
+    public String setId;
+    @Dimension
+    public String containerName;
+    @Dimension
+    public String containerIp;
+    @Dimension
+    public String sourceId;
+    @Dimension
+    public String sourceDataId;
+    @Dimension
+    public String inlongGroupId;
+    @Dimension
+    public String inlongStreamId;
+    @Dimension
+    public String sinkId;
+    @Dimension
+    public String sinkDataId;
+    @CountMetric
+    public AtomicLong readSuccessCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong readSuccessSize = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong readFailCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong readFailSize = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendSize = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendSuccessCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendSuccessSize = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendFailCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendFailSize = new AtomicLong(0);
+    @CountMetric
+    // sinkCallbackTime - sinkBeginTime(milliseconds)
+    public AtomicLong sinkDuration = new AtomicLong(0);
+    @CountMetric
+    // sinkCallbackTime - sourceReceiveTime(milliseconds)
+    public AtomicLong nodeDuration = new AtomicLong(0);
+    @CountMetric
+    // sinkCallbackTime - eventCreateTime(milliseconds)
+    public AtomicLong wholeDuration = new AtomicLong(0);
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
new file mode 100644
index 0000000..c50e5cd
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.set;
+
+import org.apache.inlong.commons.config.metrics.MetricItemSet;
+
+/**
+ * 
+ * DataProxyMetricItemSet
+ */
+public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+
+    private static DataProxyMetricItemSet instance;
+
+    /**
+     * Constructor
+     */
+    private DataProxyMetricItemSet() {
+    }
+
+    /**
+     * getInstance
+     * 
+     * @return
+     */
+    public static DataProxyMetricItemSet getInstance() {
+        if (instance != null) {
+            return instance;
+        }
+        synchronized (DataProxyMetricItemSet.class) {
+            if (instance == null) {
+                instance = new DataProxyMetricItemSet();
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * createItem
+     * 
+     * @return
+     */
+    @Override
+    protected DataProxyMetricItem createItem() {
+        return new DataProxyMetricItem();
+    }
+
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
new file mode 100644
index 0000000..4e70d63
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.set;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * TestMetricItemSetMBean
+ */
+public class TestMetricItemSetMBean {
+
+    public static final String SET_ID = "inlong5th_sz";
+    public static final String CONTAINER_NAME = "2222.inlong.DataProxy.sz100001";
+    public static final String CONTAINER_IP = "127.0.0.1";
+    private static final String SOURCE_ID = "agent-source";
+    private static final String SOURCE_DATA_ID = "12069";
+    private static final String INLONG_GROUP_ID1 = "03a00000026";
+    private static final String INLONG_GROUP_ID2 = "03a00000126";
+    private static final String INLONG_STREAM_ID = "";
+    private static final String SINK_ID = "inlong5th-pulsar-sz";
+    private static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
+    private static DataProxyMetricItemSet itemSet;
+    private static Map<String, String> dimSource;
+    private static Map<String, String> dimSink;
+
+    /**
+     * setup
+     */
+    @BeforeClass
+    public static void setup() {
+        itemSet = DataProxyMetricItemSet.getInstance();
+        MetricRegister.register(itemSet);
+        // prepare
+        DataProxyMetricItem itemSource = new DataProxyMetricItem();
+        itemSource.setId = SET_ID;
+        itemSource.containerName = CONTAINER_NAME;
+        itemSource.containerIp = CONTAINER_IP;
+        itemSource.sourceId = SOURCE_ID;
+        itemSource.sourceDataId = SOURCE_DATA_ID;
+        itemSource.inlongGroupId = INLONG_GROUP_ID1;
+        itemSource.inlongStreamId = INLONG_STREAM_ID;
+        dimSource = itemSource.getDimensions();
+        //
+        DataProxyMetricItem itemSink = new DataProxyMetricItem();
+        itemSink.setId = SET_ID;
+        itemSink.containerName = CONTAINER_NAME;
+        itemSink.containerIp = CONTAINER_IP;
+        itemSink.sinkId = SINK_ID;
+        itemSink.sinkDataId = SINK_DATA_ID;
+        itemSink.inlongGroupId = INLONG_GROUP_ID1;
+        itemSink.inlongStreamId = INLONG_STREAM_ID;
+        dimSink = itemSink.getDimensions();
+    }
+
+    /**
+     * testResult
+     * 
+     * @throws Exception
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testResult() throws Exception {
+        // increase source
+        DataProxyMetricItem item = null;
+        item = itemSet.findMetricItem(dimSource);
+        item.readSuccessCount.incrementAndGet();
+        item.readSuccessSize.addAndGet(100);
+        String keySource1 = MetricUtils.getDimensionsKey(dimSource);
+        //
+        dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
+        item = itemSet.findMetricItem(dimSource);
+        item.readFailCount.addAndGet(20);
+        item.readFailSize.addAndGet(2000);
+        String keySource2 = MetricUtils.getDimensionsKey(dimSource);
+        // increase sink
+        item = itemSet.findMetricItem(dimSink);
+        item.sendCount.incrementAndGet();
+        item.sendSize.addAndGet(100);
+        item.sendSuccessCount.incrementAndGet();
+        item.sendSuccessSize.addAndGet(100);
+        String keySink1 = MetricUtils.getDimensionsKey(dimSink);
+        //
+        dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
+        item = itemSet.findMetricItem(dimSink);
+        item.sendCount.addAndGet(20);
+        item.sendSize.addAndGet(2000);
+        item.sendFailCount.addAndGet(20);
+        item.sendFailSize.addAndGet(2000);
+        String keySink2 = MetricUtils.getDimensionsKey(dimSink);
+        // report
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        StringBuilder beanName = new StringBuilder();
+        beanName.append(MetricUtils.getDomain(DataProxyMetricItemSet.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
+                .append("type=")
+                .append(DataProxyMetricItemSet.class.toString());
+        String strBeanName = beanName.toString();
+        ObjectName objName = new ObjectName(strBeanName);
+        {
+            List<MetricItem> items = (List<MetricItem>) mbs.invoke(objName, MetricItemSetMBean.METHOD_SNAPSHOT, null,
+                    null);
+            for (MetricItem itemObj : items) {
+                if (keySource1.equals(itemObj.getDimensionsKey())) {
+                    Map<String, MetricValue> metricMap = itemObj.snapshot();
+                    assertEquals(1, metricMap.get("readSuccessCount").value);
+                    assertEquals(100, metricMap.get("readSuccessSize").value);
+                } else if (keySource2.equals(itemObj.getDimensionsKey())) {
+                    Map<String, MetricValue> metricMap = itemObj.snapshot();
+                    assertEquals(20, metricMap.get("readFailCount").value);
+                    assertEquals(2000, metricMap.get("readFailSize").value);
+                } else if (keySink1.equals(itemObj.getDimensionsKey())) {
+                    Map<String, MetricValue> metricMap = itemObj.snapshot();
+                    assertEquals(1, metricMap.get("sendCount").value);
+                    assertEquals(100, metricMap.get("sendSize").value);
+                    assertEquals(1, metricMap.get("sendSuccessCount").value);
+                    assertEquals(100, metricMap.get("sendSuccessSize").value);
+                } else if (keySink2.equals(itemObj.getDimensionsKey())) {
+                    Map<String, MetricValue> metricMap = itemObj.snapshot();
+                    assertEquals(20, metricMap.get("sendCount").value);
+                    assertEquals(2000, metricMap.get("sendSize").value);
+                    assertEquals(20, metricMap.get("sendFailCount").value);
+                    assertEquals(2000, metricMap.get("sendFailSize").value);
+                } else {
+                    System.out.println("bad MetricItem:" + itemObj.getDimensionsKey());
+                }
+            }
+        }
+    }
+}