You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/12 11:58:48 UTC
[incubator-inlong] branch master updated: [INLONG-1786]
Inlong-common provide monitor indicator #1786 (#1788)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6353d91 [INLONG-1786] Inlong-common provide monitor indicator #1786 (#1788)
6353d91 is described below
commit 6353d9175944765ff71ca437cd18303ec4609508
Author: 卢春亮 <94...@qq.com>
AuthorDate: Fri Nov 12 19:58:40 2021 +0800
[INLONG-1786] Inlong-common provide monitor indicator #1786 (#1788)
---
.../inlong/commons/config/metrics/CountMetric.java | 41 +++++
.../inlong/commons/config/metrics/Dimension.java | 40 +++++
.../inlong/commons/config/metrics/GaugeMetric.java | 41 +++++
.../commons/config/metrics/MetricDomain.java | 41 +++++
.../inlong/commons/config/metrics/MetricItem.java | 189 +++++++++++++++++++++
.../commons/config/metrics/MetricItemMBean.java | 58 +++++++
.../commons/config/metrics/MetricItemSet.java | 72 ++++++++
.../commons/config/metrics/MetricItemSetMBean.java | 41 +++++
.../commons/config/metrics/MetricRegister.java | 75 ++++++++
.../inlong/commons/config/metrics/MetricUtils.java | 91 ++++++++++
.../inlong/commons/config/metrics/MetricValue.java | 48 ++++++
.../config/metrics/item/AgentMetricItem.java | 62 +++++++
.../config/metrics/item/TestMetricItemMBean.java | 108 ++++++++++++
.../config/metrics/set/DataProxyMetricItem.java | 80 +++++++++
.../config/metrics/set/DataProxyMetricItemSet.java | 63 +++++++
.../config/metrics/set/TestMetricItemSetMBean.java | 160 +++++++++++++++++
16 files changed, 1210 insertions(+)
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/CountMetric.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/CountMetric.java
new file mode 100644
index 0000000..490d40d
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/CountMetric.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target(FIELD)
+
+/**
+ *
+ * CountMetric
+ */
+public @interface CountMetric {
+
+ /**
+ * metric name of count type, default value is field name.
+ *
+ * @return metric name
+ */
+ String name() default "";
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/Dimension.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/Dimension.java
new file mode 100644
index 0000000..31e9022
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/Dimension.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target(FIELD)
+/**
+ *
+ * Dimension
+ */
+public @interface Dimension {
+
+ /**
+ * dimension name, default value is field name.
+ *
+ * @return dimension name
+ */
+ String name() default "";
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/GaugeMetric.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/GaugeMetric.java
new file mode 100644
index 0000000..86ce90a
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/GaugeMetric.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target(FIELD)
+
+/**
+ *
+ * GaugeMetric
+ */
+public @interface GaugeMetric {
+
+ /**
+ * metric name of gauge type, default value is field name.
+ *
+ * @return metric name
+ */
+ String name() default "";
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricDomain.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricDomain.java
new file mode 100644
index 0000000..48317b6
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricDomain.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target(TYPE)
+
+/**
+ *
+ * MetricDomain
+ */
+public @interface MetricDomain {
+
+ /**
+ * MBean domain name, default value is class name.
+ *
+ * @return metric name
+ */
+ String name() default "";
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItem.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItem.java
new file mode 100644
index 0000000..e5b81a2
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItem.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * MetricItem
+ */
+public abstract class MetricItem implements MetricItemMBean {
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(MetricItem.class);
+
+ private String key;
+ private Map<String, String> dimensions;
+ private Map<String, AtomicLong> countMetrics;
+ private Map<String, AtomicLong> gaugeMetrics;
+
+ /**
+ * getDimensionsKey
+ *
+ * @return
+ */
+ @Override
+ public String getDimensionsKey() {
+ if (key != null) {
+ return key;
+ }
+ //
+ Map<String, String> dimensionMap = this.getDimensions();
+ this.key = MetricUtils.getDimensionsKey(dimensionMap);
+ return key;
+ }
+
+ /**
+ * getDimensions
+ *
+ * @return
+ */
+ @Override
+ public Map<String, String> getDimensions() {
+ if (dimensions != null) {
+ return dimensions;
+ }
+ dimensions = new HashMap<>();
+
+ // get all fields
+ List<Field> fields = getDeclaredFieldsIncludingInherited(this.getClass());
+
+ // filter dimension fields
+ for (Field field : fields) {
+ field.setAccessible(true);
+ for (Annotation fieldAnnotation : field.getAnnotations()) {
+ if (fieldAnnotation instanceof Dimension) {
+ Dimension dimension = (Dimension) fieldAnnotation;
+ String name = dimension.name();
+ name = (name != null && name.length() > 0) ? name : field.getName();
+ try {
+ Object fieldValue = field.get(this);
+ String value = (fieldValue == null) ? "" : fieldValue.toString();
+ dimensions.put(name, value);
+ } catch (Throwable t) {
+ LOGGER.error(t.getMessage(), t);
+ }
+ break;
+ }
+ }
+ }
+ return dimensions;
+ }
+
+ /**
+ * set dimensions
+ *
+ * @param dimensions the dimensions to set
+ */
+ public void setDimensions(Map<String, String> dimensions) {
+ this.dimensions = new HashMap<String, String>();
+ this.dimensions.putAll(dimensions);
+ }
+
+ /**
+ * snapshot
+ *
+ * @return
+ */
+ @Override
+ public Map<String, MetricValue> snapshot() {
+ if (this.countMetrics == null || this.gaugeMetrics == null) {
+ this.initMetricField();
+ }
+ //
+ Map<String, MetricValue> metrics = new HashMap<>();
+ this.countMetrics.forEach((key, value) -> {
+ metrics.put(key, MetricValue.of(key, value.getAndSet(0)));
+ });
+ this.gaugeMetrics.forEach((key, value) -> {
+ metrics.put(key, MetricValue.of(key, value.get()));
+ });
+ return metrics;
+ }
+
+ /**
+ * initMetricField
+ */
+ protected void initMetricField() {
+ this.countMetrics = new HashMap<>();
+ this.gaugeMetrics = new HashMap<>();
+
+ // get all fields
+ List<Field> fields = getDeclaredFieldsIncludingInherited(this.getClass());
+
+ // filter metric fields
+ for (Field field : fields) {
+ field.setAccessible(true);
+ for (Annotation fieldAnnotation : field.getAnnotations()) {
+ if (fieldAnnotation instanceof CountMetric) {
+ CountMetric countMetric = (CountMetric) fieldAnnotation;
+ String name = countMetric.name();
+ name = (name != null && name.length() > 0) ? name : field.getName();
+ try {
+ Object fieldValue = field.get(this);
+ if (fieldValue instanceof AtomicLong) {
+ this.countMetrics.put(name, (AtomicLong) fieldValue);
+ }
+ } catch (Throwable t) {
+ LOGGER.error(t.getMessage(), t);
+ }
+ break;
+ } else if (fieldAnnotation instanceof GaugeMetric) {
+ GaugeMetric gaugeMetric = (GaugeMetric) fieldAnnotation;
+ String name = gaugeMetric.name();
+ name = (name != null && name.length() > 0) ? name : field.getName();
+ try {
+ Object fieldValue = field.get(this);
+ if (fieldValue instanceof AtomicLong) {
+ this.gaugeMetrics.put(name, (AtomicLong) fieldValue);
+ }
+ } catch (Throwable t) {
+ LOGGER.error(t.getMessage(), t);
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Get declare fields.
+ *
+ * @param clazz
+ * @return
+ */
+ public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) {
+ List<Field> fields = new ArrayList<Field>();
+ // check whether parent exists
+ while (clazz != null) {
+ fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
+ clazz = clazz.getSuperclass();
+ }
+ return fields;
+ }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemMBean.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemMBean.java
new file mode 100644
index 0000000..ef6cce9
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemMBean.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.util.Map;
+
+/**
+ * MetricItemMBean<br>
+ * Provide access interface of a metric item with JMX.<br>
+ * Decouple between metric item and monitor system, in particular scene, <br>
+ * inlong can depend on user-defined monitor system.
+ */
+public interface MetricItemMBean {
+
+ String ATTRIBUTE_KEY = "DimensionsKey";
+ String ATTRIBUTE_DIMENSIONS = "Dimensions";
+ String METHOD_SNAPSHOT = "snapshot";
+ char DOMAIN_SEPARATOR = ':';
+ char PROPERTY_SEPARATOR = ',';
+ char PROPERTY_EQUAL = '=';
+
+ /**
+ * getDimensionsKey
+ *
+ * @return key string composed of key/value pair of dimensions.
+ */
+ String getDimensionsKey();
+
+ /**
+ * getDimensions
+ *
+ * @return key/value pair of all dimensions.
+ */
+ Map<String, String> getDimensions();
+
+ /**
+ * snapshot
+ *
+ * @return get snapshot all metric of item, CountMetric will get metric value and set 0 to value, <br>
+ * GaugeMetric will only get metric value.
+ */
+ Map<String, MetricValue> snapshot();
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
new file mode 100644
index 0000000..20fea7f
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * MetricItemSet
+ */
+public abstract class MetricItemSet<T extends MetricItem> implements MetricItemSetMBean {
+
+ protected Map<String, T> itemMap = new ConcurrentHashMap<>();
+
+ /**
+ * createItem
+ *
+ * @return
+ */
+ protected abstract T createItem();
+
+ /**
+ * findMetricItem
+ *
+ * @param item
+ */
+ public T findMetricItem(Map<String, String> dimensions) {
+ String key = MetricUtils.getDimensionsKey(dimensions);
+ T currentItem = this.itemMap.get(key);
+ if (currentItem != null) {
+ return currentItem;
+ }
+ currentItem = createItem();
+ currentItem.setDimensions(dimensions);
+ T oldItem = this.itemMap.putIfAbsent(key, currentItem);
+ T returnItem = (oldItem == null) ? currentItem : oldItem;
+ return returnItem;
+ }
+
+ /**
+ * snapshot
+ *
+ * @return
+ */
+ @Override
+ public List<MetricItem> snapshot() {
+ Map<String, T> oldItemMap = itemMap;
+ this.itemMap = new ConcurrentHashMap<>();
+ MetricUtils.sleepOneInterval();
+ List<MetricItem> result = new ArrayList<>(oldItemMap.size());
+ result.addAll(oldItemMap.values());
+ return result;
+ }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSetMBean.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSetMBean.java
new file mode 100644
index 0000000..ffd0031
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSetMBean.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.util.List;
+
+/**
+ *
+ * MetricItemSetMBean<br>
+ * Provide access interface of metric items with JMX.<br>
+ * Decouple between metric item and monitor system, in particular scene, <br>
+ * inlong can depend on user-defined monitor system.
+ */
+public interface MetricItemSetMBean {
+
+ String METHOD_SNAPSHOT = "snapshot";
+
+ /**
+ * snapshot
+ *
+ * @return get snapshot all metric of item, CountMetric will get metric value and set 0 to value, <br>
+ * GaugeMetric will only get metric value.
+ */
+ List<MetricItem> snapshot();
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
new file mode 100644
index 0000000..d4a5fc5
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * MetricRegister
+ */
+public class MetricRegister {
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(MetricRegister.class);
+
+ /**
+ * register MetricItem
+ *
+ * @param obj
+ */
+ public static void register(MetricItem obj) {
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ StringBuilder beanName = new StringBuilder();
+ beanName.append(MetricUtils.getDomain(obj.getClass())).append(MetricItemMBean.DOMAIN_SEPARATOR)
+ .append(obj.getDimensionsKey());
+ String strBeanName = beanName.toString();
+ try {
+ ObjectName objName = new ObjectName(strBeanName);
+ mbs.registerMBean(obj, objName);
+ } catch (Exception ex) {
+ LOGGER.error("exception while register mbean:{},error:{}", strBeanName, ex.getMessage());
+ LOGGER.error(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * register MetricItemSet
+ *
+ * @param obj
+ */
+ public static void register(MetricItemSet<? extends MetricItem> obj) {
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ StringBuilder beanName = new StringBuilder();
+ beanName.append(MetricUtils.getDomain(obj.getClass())).append(MetricItemMBean.DOMAIN_SEPARATOR).append("type=")
+ .append(obj.getClass().toString());
+ String strBeanName = beanName.toString();
+ try {
+ ObjectName objName = new ObjectName(strBeanName);
+ mbs.registerMBean(obj, objName);
+ } catch (Exception ex) {
+ LOGGER.error("exception while register mbean:{},error:{}", strBeanName, ex.getMessage());
+ LOGGER.error(ex.getMessage(), ex);
+ }
+ }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricUtils.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricUtils.java
new file mode 100644
index 0000000..df17fdf
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+import java.lang.annotation.Annotation;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * MetricUtils
+ */
+public class MetricUtils {
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(MetricUtils.class);
+
+ /**
+ * getDomain
+ *
+ * @param cls
+ * @return
+ */
+ public static String getDomain(Class<?> cls) {
+ for (Annotation annotation : cls.getAnnotations()) {
+ if (annotation instanceof MetricDomain) {
+ MetricDomain domain = (MetricDomain) annotation;
+ String name = domain.name();
+ name = (name != null && name.length() > 0) ? name : cls.getName();
+ return name;
+ }
+ }
+ return cls.getName();
+ }
+
+ /**
+ * getDimensionsKey
+ *
+ * @return
+ */
+ public static String getDimensionsKey(Map<String, String> dimensionMap) {
+ StringBuilder builder = new StringBuilder();
+ if (dimensionMap.size() <= 0) {
+ return "";
+ }
+ //
+ Set<String> fieldKeySet = dimensionMap.keySet();
+ List<String> fieldKeyList = new ArrayList<>(fieldKeySet.size());
+ fieldKeyList.addAll(fieldKeySet);
+ Collections.sort(fieldKeyList);
+ for (String fieldKey : fieldKeyList) {
+ String fieldValue = dimensionMap.get(fieldKey);
+ fieldValue = (fieldValue == null) ? "" : fieldValue;
+ builder.append(fieldKey).append(MetricItemMBean.PROPERTY_EQUAL).append(fieldValue)
+ .append(MetricItemMBean.PROPERTY_SEPARATOR);
+ }
+ String key = builder.substring(0, builder.length() - 1);
+ return key;
+ }
+
+ /**
+ * sleepOneInterval
+ */
+ public static void sleepOneInterval() {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricValue.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricValue.java
new file mode 100644
index 0000000..b3e7751
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricValue.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics;
+
+/**
+ *
+ * MetricValue
+ */
+public class MetricValue {
+
+ public String name;
+ public long value;
+
+ /**
+ * Constructor
+ *
+ * @param value
+ */
+ private MetricValue(String name, long value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ /**
+ * of
+ *
+ * @param value
+ * @return
+ */
+ public static MetricValue of(String name, long value) {
+ return new MetricValue(name, value);
+ }
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/AgentMetricItem.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/AgentMetricItem.java
new file mode 100644
index 0000000..be0d0ec
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/AgentMetricItem.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.item;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.GaugeMetric;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+
+/**
+ *
+ * AgentMetricItem, like PluginMetric
+ */
+@MetricDomain(name = "Agent")
+public class AgentMetricItem extends MetricItem {
+
+ @Dimension
+ public String module;
+ @Dimension
+ public String aspect;
+ @Dimension
+ public String tag;
+
+ @CountMetric
+ public AtomicLong readNum = new AtomicLong(0);
+
+ @CountMetric
+ public AtomicLong sendNum = new AtomicLong(0);
+
+ @CountMetric
+ public AtomicLong sendFailedNum = new AtomicLong(0);
+
+ @CountMetric
+ public AtomicLong readFailedNum = new AtomicLong(0);
+
+ @CountMetric
+ public AtomicLong readSuccessNum = new AtomicLong(0);
+
+ @CountMetric
+ public AtomicLong sendSuccessNum = new AtomicLong(0);
+
+ @GaugeMetric
+ public AtomicLong runningTasks = new AtomicLong(0);
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/TestMetricItemMBean.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/TestMetricItemMBean.java
new file mode 100644
index 0000000..e7bf473
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/item/TestMetricItemMBean.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.item;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * TestMetricItem
+ */
+public class TestMetricItemMBean {
+
+ public static final String MODULE = "Plugin";
+ public static final String ASPECT = "PluginSummary";
+ public static final String TAG = "agent1";
+ private static AgentMetricItem item;
+
+ /**
+ * setup
+ */
+ @BeforeClass
+ public static void setup() {
+ item = new AgentMetricItem();
+ item.module = MODULE;
+ item.aspect = ASPECT;
+ item.tag = TAG;
+ MetricRegister.register(item);
+ }
+
+ /**
+ * testResult
+ *
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResult() throws Exception {
+ // increase
+ item.readNum.incrementAndGet();
+ item.sendNum.addAndGet(100);
+ item.runningTasks.addAndGet(2);
+ //
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ StringBuilder beanName = new StringBuilder();
+ beanName.append(MetricUtils.getDomain(AgentMetricItem.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
+ .append("module=").append(MODULE).append(MetricItemMBean.PROPERTY_SEPARATOR)
+ .append("aspect=").append(ASPECT).append(MetricItemMBean.PROPERTY_SEPARATOR)
+ .append("tag=").append(TAG);
+ String strBeanName = beanName.toString();
+ ObjectName objName = new ObjectName(strBeanName);
+ {
+ Map<String, String> dimensions = (Map<String, String>) mbs.getAttribute(objName,
+ MetricItemMBean.ATTRIBUTE_DIMENSIONS);
+ assertEquals(MODULE, dimensions.get("module"));
+ assertEquals(ASPECT, dimensions.get("aspect"));
+ assertEquals(TAG, dimensions.get("tag"));
+ Map<String, MetricValue> metricMap = (Map<String, MetricValue>) mbs.invoke(objName,
+ MetricItemMBean.METHOD_SNAPSHOT, null, null);
+ assertEquals(1, metricMap.get("readNum").value);
+ assertEquals(100, metricMap.get("sendNum").value);
+ assertEquals(2, metricMap.get("runningTasks").value);
+ }
+ // increase
+ item.readNum.incrementAndGet();
+ item.sendNum.addAndGet(100);
+ item.runningTasks.addAndGet(2);
+ {
+ Map<String, String> dimensions = (Map<String, String>) mbs.getAttribute(objName,
+ MetricItemMBean.ATTRIBUTE_DIMENSIONS);
+ assertEquals(MODULE, dimensions.get("module"));
+ assertEquals(ASPECT, dimensions.get("aspect"));
+ assertEquals(TAG, dimensions.get("tag"));
+ Map<String, MetricValue> metricMap = (Map<String, MetricValue>) mbs.invoke(objName,
+ MetricItemMBean.METHOD_SNAPSHOT, null, null);
+ assertEquals(1, metricMap.get("readNum").value);
+ assertEquals(100, metricMap.get("sendNum").value);
+ assertEquals(4, metricMap.get("runningTasks").value);
+ }
+ }
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItem.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItem.java
new file mode 100644
index 0000000..b68550c
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItem.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.set;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+
+/**
+ *
+ * DataProxyMetricItem
+ */
+@MetricDomain(name = "DataProxy")
+public class DataProxyMetricItem extends MetricItem {
+ @Dimension
+ public String setId;
+ @Dimension
+ public String containerName;
+ @Dimension
+ public String containerIp;
+ @Dimension
+ public String sourceId;
+ @Dimension
+ public String sourceDataId;
+ @Dimension
+ public String inlongGroupId;
+ @Dimension
+ public String inlongStreamId;
+ @Dimension
+ public String sinkId;
+ @Dimension
+ public String sinkDataId;
+ @CountMetric
+ public AtomicLong readSuccessCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readSuccessSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readFailCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readFailSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSuccessCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSuccessSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendFailCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendFailSize = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - sinkBeginTime(milliseconds)
+ public AtomicLong sinkDuration = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - sourceReceiveTime(milliseconds)
+ public AtomicLong nodeDuration = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - eventCreateTime(milliseconds)
+ public AtomicLong wholeDuration = new AtomicLong(0);
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
new file mode 100644
index 0000000..c50e5cd
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.set;
+
+import org.apache.inlong.commons.config.metrics.MetricItemSet;
+
+/**
+ *
+ * DataProxyMetricItemSet
+ */
+public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+
+ private static DataProxyMetricItemSet instance;
+
+ /**
+ * Constructor
+ */
+ private DataProxyMetricItemSet() {
+ }
+
+ /**
+ * getInstance
+ *
+ * @return
+ */
+ public static DataProxyMetricItemSet getInstance() {
+ if (instance != null) {
+ return instance;
+ }
+ synchronized (DataProxyMetricItemSet.class) {
+ if (instance == null) {
+ instance = new DataProxyMetricItemSet();
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * createItem
+ *
+ * @return
+ */
+ @Override
+ protected DataProxyMetricItem createItem() {
+ return new DataProxyMetricItem();
+ }
+
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
new file mode 100644
index 0000000..4e70d63
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.commons.config.metrics.set;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * TestMetricItemSetMBean
+ */
+public class TestMetricItemSetMBean {
+
+ public static final String SET_ID = "inlong5th_sz";
+ public static final String CONTAINER_NAME = "2222.inlong.DataProxy.sz100001";
+ public static final String CONTAINER_IP = "127.0.0.1";
+ private static final String SOURCE_ID = "agent-source";
+ private static final String SOURCE_DATA_ID = "12069";
+ private static final String INLONG_GROUP_ID1 = "03a00000026";
+ private static final String INLONG_GROUP_ID2 = "03a00000126";
+ private static final String INLONG_STREAM_ID = "";
+ private static final String SINK_ID = "inlong5th-pulsar-sz";
+ private static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
+ private static DataProxyMetricItemSet itemSet;
+ private static Map<String, String> dimSource;
+ private static Map<String, String> dimSink;
+
+ /**
+ * setup
+ */
+ @BeforeClass
+ public static void setup() {
+ itemSet = DataProxyMetricItemSet.getInstance();
+ MetricRegister.register(itemSet);
+ // prepare
+ DataProxyMetricItem itemSource = new DataProxyMetricItem();
+ itemSource.setId = SET_ID;
+ itemSource.containerName = CONTAINER_NAME;
+ itemSource.containerIp = CONTAINER_IP;
+ itemSource.sourceId = SOURCE_ID;
+ itemSource.sourceDataId = SOURCE_DATA_ID;
+ itemSource.inlongGroupId = INLONG_GROUP_ID1;
+ itemSource.inlongStreamId = INLONG_STREAM_ID;
+ dimSource = itemSource.getDimensions();
+ //
+ DataProxyMetricItem itemSink = new DataProxyMetricItem();
+ itemSink.setId = SET_ID;
+ itemSink.containerName = CONTAINER_NAME;
+ itemSink.containerIp = CONTAINER_IP;
+ itemSink.sinkId = SINK_ID;
+ itemSink.sinkDataId = SINK_DATA_ID;
+ itemSink.inlongGroupId = INLONG_GROUP_ID1;
+ itemSink.inlongStreamId = INLONG_STREAM_ID;
+ dimSink = itemSink.getDimensions();
+ }
+
+ /**
+ * testResult
+ *
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResult() throws Exception {
+ // increase source
+ DataProxyMetricItem item = null;
+ item = itemSet.findMetricItem(dimSource);
+ item.readSuccessCount.incrementAndGet();
+ item.readSuccessSize.addAndGet(100);
+ String keySource1 = MetricUtils.getDimensionsKey(dimSource);
+ //
+ dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
+ item = itemSet.findMetricItem(dimSource);
+ item.readFailCount.addAndGet(20);
+ item.readFailSize.addAndGet(2000);
+ String keySource2 = MetricUtils.getDimensionsKey(dimSource);
+ // increase sink
+ item = itemSet.findMetricItem(dimSink);
+ item.sendCount.incrementAndGet();
+ item.sendSize.addAndGet(100);
+ item.sendSuccessCount.incrementAndGet();
+ item.sendSuccessSize.addAndGet(100);
+ String keySink1 = MetricUtils.getDimensionsKey(dimSink);
+ //
+ dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
+ item = itemSet.findMetricItem(dimSink);
+ item.sendCount.addAndGet(20);
+ item.sendSize.addAndGet(2000);
+ item.sendFailCount.addAndGet(20);
+ item.sendFailSize.addAndGet(2000);
+ String keySink2 = MetricUtils.getDimensionsKey(dimSink);
+ // report
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ StringBuilder beanName = new StringBuilder();
+ beanName.append(MetricUtils.getDomain(DataProxyMetricItemSet.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
+ .append("type=")
+ .append(DataProxyMetricItemSet.class.toString());
+ String strBeanName = beanName.toString();
+ ObjectName objName = new ObjectName(strBeanName);
+ {
+ List<MetricItem> items = (List<MetricItem>) mbs.invoke(objName, MetricItemSetMBean.METHOD_SNAPSHOT, null,
+ null);
+ for (MetricItem itemObj : items) {
+ if (keySource1.equals(itemObj.getDimensionsKey())) {
+ Map<String, MetricValue> metricMap = itemObj.snapshot();
+ assertEquals(1, metricMap.get("readSuccessCount").value);
+ assertEquals(100, metricMap.get("readSuccessSize").value);
+ } else if (keySource2.equals(itemObj.getDimensionsKey())) {
+ Map<String, MetricValue> metricMap = itemObj.snapshot();
+ assertEquals(20, metricMap.get("readFailCount").value);
+ assertEquals(2000, metricMap.get("readFailSize").value);
+ } else if (keySink1.equals(itemObj.getDimensionsKey())) {
+ Map<String, MetricValue> metricMap = itemObj.snapshot();
+ assertEquals(1, metricMap.get("sendCount").value);
+ assertEquals(100, metricMap.get("sendSize").value);
+ assertEquals(1, metricMap.get("sendSuccessCount").value);
+ assertEquals(100, metricMap.get("sendSuccessSize").value);
+ } else if (keySink2.equals(itemObj.getDimensionsKey())) {
+ Map<String, MetricValue> metricMap = itemObj.snapshot();
+ assertEquals(20, metricMap.get("sendCount").value);
+ assertEquals(2000, metricMap.get("sendSize").value);
+ assertEquals(20, metricMap.get("sendFailCount").value);
+ assertEquals(2000, metricMap.get("sendFailSize").value);
+ } else {
+ System.out.println("bad MetricItem:" + itemObj.getDimensionsKey());
+ }
+ }
+ }
+ }
+}