You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/10 11:22:11 UTC

[inlong] 01/03: [INLONG-7198][Sort] Support audit of Apache Hudi (#7181)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f58ea9f03e2e6c7f22e754d91d5545addcc5a8bf
Author: feat <fe...@outlook.com>
AuthorDate: Tue Jan 10 15:36:43 2023 +0800

    [INLONG-7198][Sort] Support audit of Apache Hudi (#7181)
---
 inlong-sort/sort-connectors/hudi/README.md         |  22 +++
 .../inlong/sort/hudi/metric/HudiAuditReporter.java | 173 +++++++++++++++++++++
 .../inlong/sort/hudi/metric/HudiMetricsConfig.java | 134 ++++++++++++++++
 .../inlong/sort/hudi/metric/HudiMetricsConst.java  |  31 ++++
 .../inlong/sort/hudi/metric/HudiMetricsUtil.java   |  32 ++++
 .../sort/hudi/metric/InLongHudiAuditReporter.java  |  90 +++++++++++
 .../sort/hudi/metric/HudiMetricsConfigTest.java    | 165 ++++++++++++++++++++
 7 files changed, 647 insertions(+)

diff --git a/inlong-sort/sort-connectors/hudi/README.md b/inlong-sort/sort-connectors/hudi/README.md
new file mode 100644
index 000000000..6180ea858
--- /dev/null
+++ b/inlong-sort/sort-connectors/hudi/README.md
@@ -0,0 +1,22 @@
+# Hudi Sort connector
+
+## Metric reporter settings
+
+```property
+hoodie.metrics.on=true
+hoodie.metrics.reporter.class=org.apache.inlong.sort.hudi.metric.InLongHudiAuditReporter
+hoodie.metrics.reporter.metricsname.prefix={custom metric name prefix}
+hoodie.metrics.inlonghudi.report.period.seconds=30
+inlong.metric.labels={inlong metric label}
+metrics.audit.proxy.hosts={inlong metric hosts}
+```
+
+| property                                        | option   | default value                                         | docs                                                                                                        |
+|-------------------------------------------------|----------|-------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| hoodie.metrics.on                               | required | false                                                 | must be 'true'                                                                                              |
+| hoodie.metrics.reporter.class                   | required | org.apache.inlong.sort.hudi.metric.InLongHudiAuditReporter | must be 'org.apache.inlong.sort.hudi.metric.InLongHudiAuditReporter'                                             |
+| hoodie.metrics.reporter.metricsname.prefix      | option   | -                                                     | The prefix given to the metrics names.                                                                      |
+| hoodie.metrics.inlonghudi.report.period.seconds | required | 30                                                    | InLongHudi reporting period in seconds. Default to 30.                                                      |
+| inlong.metric.labels                            | required | -                                                     | INLONG metric labels, format is 'key1=value1&key2=value2', default is 'groupId=xxx&streamId=xxx&nodeId=xxx' |
+| metrics.audit.proxy.hosts                       | required | -                                                     | Audit proxy host address for reporting audit metrics. e.g. 127.0.0.1:10081,0.0.0.1:10081                    | 
+
diff --git a/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiAuditReporter.java b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiAuditReporter.java
new file mode 100644
index 000000000..83f5b4d56
--- /dev/null
+++ b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiAuditReporter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.metric;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hudi.com.codahale.metrics.Counter;
+import org.apache.hudi.com.codahale.metrics.Gauge;
+import org.apache.hudi.com.codahale.metrics.Histogram;
+import org.apache.hudi.com.codahale.metrics.Meter;
+import org.apache.hudi.com.codahale.metrics.MetricFilter;
+import org.apache.hudi.com.codahale.metrics.MetricRegistry;
+import org.apache.hudi.com.codahale.metrics.ScheduledReporter;
+import org.apache.hudi.com.codahale.metrics.Timer;
+import org.apache.inlong.audit.AuditOperator;
+import org.apache.inlong.sort.base.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.GROUP_ID;
+import static org.apache.inlong.sort.base.Constants.STREAM_ID;
+import static org.apache.inlong.sort.hudi.metric.HudiMetricsConst.ACTION_TYPES;
+import static org.apache.inlong.sort.hudi.metric.HudiMetricsConst.METRIC_TOTAL_BYTES_WRITTEN;
+import static org.apache.inlong.sort.hudi.metric.HudiMetricsConst.METRIC_TOTAL_RECORDS_WRITTEN;
+import static org.apache.inlong.sort.hudi.metric.HudiMetricsUtil.getMetricsName;
+
+/**
+ * The schedule reporter for submit rowCount and rowSize of writing hudi
+ */
+public class HudiAuditReporter extends ScheduledReporter {
+
+    private static final String IP_OR_HOST_PORT = "^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
+            + "2}|[1-9]\\d{"
+            + "3}|[1-5]\\d{"
+            + "4}|6[0-4]\\d{"
+            + "3}|65[0-4]\\d{"
+            + "2}|655[0-2]\\d|6553[0-5])$";
+    private static final Logger LOG = LoggerFactory.getLogger(HudiAuditReporter.class);
+    private final LinkedHashMap<String, String> labels;
+    private Set<String> byteMetricNames;
+    private Set<String> recordMetricNames;
+    private AuditOperator auditOperator;
+
+    protected HudiAuditReporter(
+            String inLongLabels,
+            String inLongAudit,
+            String metricNamePrefix,
+            MetricRegistry registry,
+            String name,
+            MetricFilter filter,
+            TimeUnit rateUnit,
+            TimeUnit durationUnit) {
+        super(registry, name, filter, rateUnit, durationUnit);
+        LOG.info("Create HudiAuditReporter, inLongLabels: {}, inLongAudit: {}, reportName: {}.",
+                inLongLabels, inLongAudit, name);
+
+        labels = new LinkedHashMap<>();
+        String[] inLongLabelArray = inLongLabels.split(DELIMITER);
+        Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")),
+                "InLong metric label format must be xxx=xxx");
+        Stream.of(inLongLabelArray).forEach(label -> {
+            String key = label.substring(0, label.indexOf('='));
+            String value = label.substring(label.indexOf('=') + 1);
+            labels.put(key, value);
+        });
+        String ipPorts = inLongAudit;
+        HashSet<String> ipPortList = new HashSet<>();
+
+        if (ipPorts != null) {
+            Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID),
+                    "groupId and streamId must be set when enable inlong audit collect.");
+            String[] ipPortStrs = inLongAudit.split(DELIMITER);
+            for (String ipPort : ipPortStrs) {
+                Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, ipPort),
+                        "Error inLong audit format: " + inLongAudit);
+                ipPortList.add(ipPort);
+            }
+        }
+
+        if (!ipPortList.isEmpty()) {
+            AuditOperator.getInstance().setAuditProxy(ipPortList);
+            auditOperator = AuditOperator.getInstance();
+        }
+
+        recordMetricNames = Arrays.stream(ACTION_TYPES)
+                .map(action -> getMetricsName(metricNamePrefix, action, METRIC_TOTAL_RECORDS_WRITTEN))
+                .collect(Collectors.toSet());
+        byteMetricNames = Arrays.stream(ACTION_TYPES)
+                .map(action -> getMetricsName(metricNamePrefix, action, METRIC_TOTAL_BYTES_WRITTEN))
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public void report(
+            SortedMap<String, Gauge> gaugeMap,
+            SortedMap<String, Counter> countMap,
+            SortedMap<String, Histogram> histogramMap,
+            SortedMap<String, Meter> meterMap,
+            SortedMap<String, Timer> timerMap) {
+
+        if (auditOperator != null && !gaugeMap.isEmpty()) {
+            long rowCount = getGaugeValue(gaugeMap, recordMetricNames);
+            long rowSize = getGaugeValue(gaugeMap, byteMetricNames);
+            auditOperator.add(
+                    Constants.AUDIT_SORT_OUTPUT,
+                    getGroupId(),
+                    getStreamId(),
+                    System.currentTimeMillis(),
+                    rowCount,
+                    rowSize);
+        }
+    }
+
+    private Long getGaugeValue(
+            SortedMap<String, Gauge> gaugeMap,
+            Set<String> metricNames) {
+        return metricNames
+                .stream()
+                .mapToLong(metricName -> getGaugeValue(gaugeMap, metricName))
+                .sum();
+    }
+
+    private Long getGaugeValue(
+            SortedMap<String, Gauge> gaugeMap,
+            String metricName) {
+        return Optional.ofNullable(gaugeMap.get(metricName))
+                .map(Gauge::getValue)
+                .map(v -> (Long) v)
+                .orElse(0L);
+    }
+
+    public LinkedHashMap<String, String> getLabels() {
+        return labels;
+    }
+
+    private String getStreamId() {
+        return getLabels().get(STREAM_ID);
+    }
+
+    private String getGroupId() {
+        return getLabels().get(GROUP_ID);
+    }
+
+    public boolean isReady() {
+        return auditOperator != null;
+    }
+}
diff --git a/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfig.java b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfig.java
new file mode 100644
index 000000000..d5a3141e8
--- /dev/null
+++ b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfig.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.metric;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.Properties;
+
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX;
+
+/**
+ * Configs for InLongHudi reporter type.
+ */
+@Immutable
+@ConfigClassProperty(name = "Metrics Configurations for InLongHudi reporter", groupName = ConfigGroups.Names.METRICS, description = "Enables reporting on Hudi metrics using the InLongHudi reporter type. "
+        + "Hudi publishes metrics on every commit, clean, rollback etc.")
+public class HudiMetricsConfig extends HoodieConfig {
+
+    public static final Logger LOG = LoggerFactory.getLogger(HudiMetricsConfig.class);
+    public static final String METRIC_TYPE = "inlonghudi";
+
+    public static final String INLONG_HUDI_PREFIX = METRIC_PREFIX + "." + METRIC_TYPE;
+
+    public static final ConfigProperty<Integer> REPORT_PERIOD_IN_SECONDS = ConfigProperty
+            .key(INLONG_HUDI_PREFIX + ".report.period.seconds")
+            .defaultValue(30)
+            .sinceVersion("0.6.0")
+            .withDocumentation("InLongHudi reporting period in seconds. Default to 30.");
+
+    /**
+     * Get config from props
+     */
+    public static <T> T getConfig(
+            Properties props,
+            ConfigProperty<T> configProperty) {
+        return new GeneralConfig<T>(configProperty).getProperty(props);
+    }
+
+    /**
+     * Get config from props
+     */
+    public static <T> T getConfig(
+            Properties props,
+            ConfigOption<T> configProperty) {
+        return new GeneralConfig<T>(configProperty).getProperty(props);
+    }
+
+    /**
+     * The until for parse config.
+     */
+    public static class GeneralConfig<T> {
+
+        private ConfigOption<T> option;
+        private ConfigProperty<T> config;
+        private Type type;
+
+        public GeneralConfig(ConfigOption<T> configOption) {
+            this.option = configOption;
+        }
+
+        public GeneralConfig(ConfigProperty<T> configProperty) {
+            this.config = configProperty;
+        }
+
+        public String key() {
+            return option != null ? option.key() : config.key();
+        }
+
+        public T defaultValue() {
+            return option != null ? option.defaultValue() : config.defaultValue();
+        }
+
+        public T getProperty(Properties prop) {
+            String key = key();
+            T defaultValue = defaultValue();
+            Object o = prop.get(key);
+            if (o == null) {
+                return defaultValue;
+            }
+            Class<?> defaultValueClass = defaultValue.getClass();
+            if (o.getClass().isAssignableFrom(defaultValueClass)) {
+                return (T) o;
+            } else {
+                String property = String.valueOf(o);
+                try {
+                    return (T) parseValue(defaultValueClass, property);
+                } catch (InvocationTargetException | IllegalAccessException e) {
+                    throw new RuntimeException("Can not properly parse value:'" + property +
+                            "' to '" + defaultValueClass + "'", e);
+                }
+            }
+        }
+
+        private static <T> T parseValue(
+                Class<T> clazz,
+                String property) throws InvocationTargetException, IllegalAccessException {
+            Method valueOfMethod = null;
+            try {
+                valueOfMethod = clazz.getMethod("valueOf", String.class);
+            } catch (NoSuchMethodException e) {
+                LOG.error("Can not properly find 'valueOf' method of " + clazz, e);
+                throw new UnsupportedOperationException();
+            }
+            return (T) valueOfMethod.invoke(null, property);
+        }
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConst.java b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConst.java
new file mode 100644
index 000000000..bedc65978
--- /dev/null
+++ b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConst.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.metric;
+
+/**
+ * The constant values of hudi metric.
+ */
+public class HudiMetricsConst {
+
+    public static final String METRIC_TOTAL_RECORDS_WRITTEN = "totalRecordsWritten";
+
+    public static final String METRIC_TOTAL_BYTES_WRITTEN = "totalBytesWritten";
+
+    public static final String[] ACTION_TYPES = {"insert", "upsert"};
+
+}
diff --git a/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsUtil.java b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsUtil.java
new file mode 100644
index 000000000..43969a355
--- /dev/null
+++ b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsUtil.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.metric;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.metrics.HoodieMetrics;
+
+public class HudiMetricsUtil {
+
+    /**
+     * Get metric name of hudi inner metrics.
+     * {@link HoodieMetrics#updateCommitMetrics(long, long, HoodieCommitMetadata, String)}
+     */
+    public static String getMetricsName(String metricsNamePrefix, String action, String metric) {
+        return String.format("%s.%s.%s", metricsNamePrefix, action, metric);
+    }
+}
diff --git a/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/InLongHudiAuditReporter.java b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/InLongHudiAuditReporter.java
new file mode 100644
index 000000000..1ff8655a2
--- /dev/null
+++ b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/InLongHudiAuditReporter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.metric;
+
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_PREFIX;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.hudi.metric.HudiMetricsConfig.REPORT_PERIOD_IN_SECONDS;
+import static org.apache.inlong.sort.hudi.metric.HudiMetricsConfig.getConfig;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.util.StringUtils;
+import org.apache.hudi.com.codahale.metrics.MetricFilter;
+import org.apache.hudi.com.codahale.metrics.MetricRegistry;
+import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The main entry of hudi audit reporter.
+ */
+public class InLongHudiAuditReporter extends CustomizableMetricsReporter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InLongHudiAuditReporter.class);
+
+    private HudiAuditReporter hudiAuditReporter;
+    private Integer reportPeriodSeconds;
+
+    public InLongHudiAuditReporter(Properties props, MetricRegistry registry) {
+        super(props, registry);
+
+        String inLongLabels = getConfig(props, INLONG_METRIC);
+        String inLongAudit = getConfig(props, INLONG_AUDIT);
+        String metricNamePrefix = getConfig(props, METRICS_REPORTER_PREFIX);
+        this.reportPeriodSeconds = getConfig(props, REPORT_PERIOD_IN_SECONDS);
+
+        if (StringUtils.isNullOrWhitespaceOnly(inLongLabels)) {
+            LOG.error("Fatal error on create InLongHudiReporter, inLongLabels is empty!");
+            return;
+        }
+        if (StringUtils.isNullOrWhitespaceOnly(inLongAudit)) {
+            LOG.error("Fatal error on create InLongHudiReporter, inLongAudit is empty!");
+            return;
+        }
+
+        this.hudiAuditReporter = new HudiAuditReporter(
+                inLongLabels,
+                inLongAudit,
+                metricNamePrefix,
+                registry,
+                "inlong-hudi-audit-reporter",
+                MetricFilter.ALL,
+                TimeUnit.SECONDS,
+                TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void start() {
+        if (hudiAuditReporter != null && hudiAuditReporter.isReady()) {
+            this.hudiAuditReporter.start(reportPeriodSeconds, TimeUnit.SECONDS);
+        }
+    }
+
+    @Override
+    public void report() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}
diff --git a/inlong-sort/sort-connectors/hudi/src/test/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfigTest.java b/inlong-sort/sort-connectors/hudi/src/test/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfigTest.java
new file mode 100644
index 000000000..588fa4d13
--- /dev/null
+++ b/inlong-sort/sort-connectors/hudi/src/test/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfigTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.metric;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+/**
+ * The UnitTest for {@link HudiMetricsConfig#getConfig(Properties, ConfigOption)}
+ *
+ */
+public class HudiMetricsConfigTest {
+
+    @Test
+    public void testGetConfigByConfigProperty() {
+        Properties props = new Properties();
+
+        String stringPropKey = "string_prop";
+        String stringPropDefaultValue = "default_string_value";
+        String stringPropExpectedValue = "sample_value";
+
+        ConfigProperty<String> stringProp =
+                ConfigProperty.key(stringPropKey)
+                        .defaultValue(stringPropDefaultValue);
+
+        testGetValueByConfig(
+                props,
+                stringProp,
+                stringPropKey,
+                stringPropDefaultValue,
+                stringPropExpectedValue);
+
+        String intPropKey = "int_prop";
+        int intPropDefaultValue = 1;
+        int intPropExpectedValue = 3;
+        ConfigProperty<Integer> intProp =
+                ConfigProperty.key(intPropKey)
+                        .defaultValue(intPropDefaultValue);
+        testGetValueByConfig(props, intProp, intPropKey, intPropDefaultValue, intPropExpectedValue);
+
+        String longPropKey = "long_prop";
+        long longPropDefaultValue = 1L;
+        long longPropExpectedValue = 1000L;
+        ConfigProperty<Long> longProp =
+                ConfigProperty.key(longPropKey)
+                        .defaultValue(longPropDefaultValue);
+        testGetValueByConfig(props, longProp, longPropKey, longPropDefaultValue, longPropExpectedValue);
+
+        String floatPropKey = "float_prop";
+        float floatPropDefaultValue = 3.14f;
+        ConfigProperty<Float> floatProp =
+                ConfigProperty.key(floatPropKey)
+                        .defaultValue(floatPropDefaultValue);
+        float floatExpectedValue = 1000.0f;
+        testGetValueByConfig(props, floatProp, floatPropKey, floatPropDefaultValue, floatExpectedValue);
+
+        String doublePropKey = "double_prop";
+        double doublePropDefaultValue = 3.14159265358979;
+        double doublePropExpectedValue = 1000.1000;
+        ConfigProperty<Double> doubleProp =
+                ConfigProperty.key(doublePropKey)
+                        .defaultValue(doublePropDefaultValue);
+        testGetValueByConfig(props, doubleProp, doublePropKey, doublePropDefaultValue, doublePropExpectedValue);
+    }
+
+    private <T> void testGetValueByConfig(
+            Properties props,
+            ConfigProperty<T> property,
+            String key,
+            T defaultValue,
+            T expectedValue) {
+        T value = HudiMetricsConfig.getConfig(props, property);
+        Assert.assertEquals(value, defaultValue);
+
+        props.put(key, expectedValue);
+        value = HudiMetricsConfig.getConfig(props, property);
+        Assert.assertEquals(expectedValue, value);
+    }
+
+    private <T> void testGetValueByConfig(
+            Properties props,
+            ConfigOption<T> option,
+            String key,
+            T defaultValue,
+            T expectedValue) {
+        T value = HudiMetricsConfig.getConfig(props, option);
+        Assert.assertEquals(value, defaultValue);
+
+        props.put(key, expectedValue);
+        value = HudiMetricsConfig.getConfig(props, option);
+        Assert.assertEquals(expectedValue, value);
+    }
+
+    @Test
+    public void testGetConfigByConfigOption() {
+        Properties props = new Properties();
+
+        String stringPropKey = "string_prop";
+        String stringPropDefaultValue = "default_string_value";
+        String stringPropExpectedValue = "sample_value";
+
+        ConfigOption<String> stringProp =
+                ConfigOptions.key(stringPropKey)
+                        .defaultValue(stringPropDefaultValue);
+
+        testGetValueByConfig(
+                props,
+                stringProp,
+                stringPropKey,
+                stringPropDefaultValue,
+                stringPropExpectedValue);
+
+        String intPropKey = "int_prop";
+        int intPropDefaultValue = 1;
+        int intPropExpectedValue = 3;
+        ConfigOption<Integer> intProp =
+                ConfigOptions.key(intPropKey)
+                        .defaultValue(intPropDefaultValue);
+        testGetValueByConfig(props, intProp, intPropKey, intPropDefaultValue, intPropExpectedValue);
+
+        String longPropKey = "long_prop";
+        long longPropDefaultValue = 1L;
+        long longPropExpectedValue = 1000L;
+        ConfigOption<Long> longProp =
+                ConfigOptions.key(longPropKey)
+                        .defaultValue(longPropDefaultValue);
+        testGetValueByConfig(props, longProp, longPropKey, longPropDefaultValue, longPropExpectedValue);
+
+        String floatPropKey = "float_prop";
+        float floatPropDefaultValue = 3.14f;
+        ConfigOption<Float> floatProp =
+                ConfigOptions.key(floatPropKey)
+                        .defaultValue(floatPropDefaultValue);
+        float floatExpectedValue = 1000.0f;
+        testGetValueByConfig(props, floatProp, floatPropKey, floatPropDefaultValue, floatExpectedValue);
+
+        String doublePropKey = "double_prop";
+        double doublePropDefaultValue = 3.14159265358979;
+        double doublePropExpectedValue = 1000.1000;
+        ConfigOption<Double> doubleProp =
+                ConfigOptions.key(doublePropKey)
+                        .defaultValue(doublePropDefaultValue);
+        testGetValueByConfig(props, doubleProp, doublePropKey, doublePropDefaultValue, doublePropExpectedValue);
+    }
+}