You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/19 11:30:15 UTC
[inlong] 02/02: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive (#5906)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit eb00535bf0ac9fff42c58aebeb22b05b9b899829
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Mon Sep 19 19:25:03 2022 +0800
[INLONG-5903][Sort] Make InLong metric constructs factory more cohesive (#5906)
Co-authored-by: thesumery <15...@qq.com>
---
.../inlong/sort/configuration/Constants.java | 22 +++-
.../apache/inlong/sort/protocol/InlongMetric.java | 22 +---
.../org/apache/inlong/sort/base/Constants.java | 11 +-
.../apache/inlong/sort/base/metric/MetricData.java | 35 ++++-
.../inlong/sort/base/metric/MetricOption.java | 141 ++++++++++++++++-----
.../inlong/sort/base/metric/SinkMetricData.java | 128 +++++++++++--------
.../inlong/sort/base/metric/SourceMetricData.java | 105 +++++++--------
.../sort/base/util/ValidateMetricOptionUtils.java | 39 ------
.../sort/elasticsearch/ElasticsearchSinkBase.java | 26 ++--
.../table/RowElasticsearchSinkFunction.java | 55 ++------
.../sort/filesystem/FileSystemTableSink.java | 2 -
.../filesystem/stream/AbstractStreamingWriter.java | 25 ++--
.../sort/hbase/HBase2DynamicTableFactory.java | 2 -
.../inlong/sort/hbase/sink/HBaseSinkFunction.java | 46 +++----
.../hive/filesystem/AbstractStreamingWriter.java | 25 ++--
.../iceberg/flink/FlinkDynamicTableFactory.java | 11 +-
.../sort/iceberg/flink/IcebergTableSink.java | 2 +-
.../inlong/sort/iceberg/flink/sink/FlinkSink.java | 2 +-
.../iceberg/flink/sink/IcebergStreamWriter.java | 7 -
.../inlong/sort/iceberg/IcebergTableSink.java | 2 +-
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 2 +-
.../sort/iceberg/sink/IcebergStreamWriter.java | 25 ++--
.../jdbc/internal/JdbcBatchingOutputFormat.java | 76 +++--------
.../inlong/sort/kafka/FlinkKafkaConsumerBase.java | 52 +++-----
.../inlong/sort/kafka/FlinkKafkaProducer.java | 48 ++-----
.../table/DynamicKafkaDeserializationSchema.java | 8 +-
.../sort/cdc/mongodb/DebeziumSourceFunction.java | 32 ++---
.../mongodb/table/MongoDBTableSourceFactory.java | 2 -
.../sort/cdc/debezium/DebeziumSourceFunction.java | 34 ++---
.../inlong/sort/cdc/mysql/source/MySqlSource.java | 36 ++----
.../source/metrics/MySqlSourceReaderMetrics.java | 124 ++----------------
.../mysql/table/MySqlTableInlongSourceFactory.java | 2 -
.../sort/cdc/oracle/DebeziumSourceFunction.java | 32 ++---
.../cdc/oracle/table/OracleTableSourceFactory.java | 2 -
.../DebeziumSourceFunction.java | 43 ++-----
.../cdc/postgres/table/PostgreSQLTableFactory.java | 2 -
.../apache/inlong/sort/pulsar/table/Constants.java | 32 -----
.../table/DynamicPulsarDeserializationSchema.java | 41 ++----
.../pulsar/table/PulsarDynamicTableFactory.java | 2 +-
.../table/UpsertPulsarDynamicTableFactory.java | 2 +-
.../sqlserver/table/DebeziumSourceFunction.java | 56 ++------
.../inlong/sort/parser/impl/FlinkSqlParser.java | 25 ++--
42 files changed, 515 insertions(+), 871 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index b0979c62f..e4d6a3906 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -60,6 +60,12 @@ public class Constants {
public static final String HIVE_SINK_ORC_PREFIX = HIVE_SINK_PREFIX + "orc.";
+ public static final String GROUP_ID = "groupId";
+
+ public static final String STREAM_ID = "streamId";
+
+ public static final String NODE_ID = "nodeId";
+
// ------------------------------------------------------------------------
// Operator uid
// ------------------------------------------------------------------------
@@ -275,10 +281,18 @@ public class Constants {
.defaultValue(5)
.withDescription("minutes");
- public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS = key("metrics.audit.proxy.hosts")
- .noDefaultValue()
- .withDescription("Audit proxy host address for reporting audit metrics. "
- + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+ /** Option whose value carries the InLong metric labels as {@code key1=value1&key2=value2}. */
+ public static final ConfigOption<String> METRICS_LABELS =
+ ConfigOptions.key("inlong.metric.labels")
+ .noDefaultValue()
+ .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2', "
+ + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
+
+
+ public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS =
+ ConfigOptions.key("metrics.audit.proxy.hosts")
+ .noDefaultValue()
+ .withDescription("Audit proxy host address for reporting audit metrics. \n"
+ + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
// ------------------------------------------------------------------------
// Single tenant related
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
index 4afb46a89..b90a38ccb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
@@ -19,29 +19,11 @@ package org.apache.inlong.sort.protocol;
/**
* The class is the abstract of Inlong Metric.
- * We agree that the key of the inlong metric report is `inlong.metric`,
+ * We agree that the key of the inlong metric report is
+ * {@link org.apache.inlong.sort.configuration.Constants#METRICS_GROUP_STREAM_NODE},
* and its value is format by `groupId&streamId&nodeId`.
* If node implements this interface, we will inject the key and value into the corresponding Sort-Connectors
* during flink sql parser
*/
public interface InlongMetric {
-
- /**
- * The key of metric, it must be `inlong.metric` here.
- */
- String METRIC_KEY = "inlong.metric";
-
- /**
- * The value format, it must be `groupId&streamId&nodeId` here.
- * The groupId is the id of Inlong Group
- * The streamId is the id of Inlong Stream
- * The nodeId is the id of Inlong Source or Sink
- */
- String METRIC_VALUE_FORMAT = "%s&%s&%s";
-
- /**
- * The key of InLong audit, the value should be ip:port&ip:port
- */
- String AUDIT_KEY = "inlong.audit";
-
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 18ff408f2..679bcc6c3 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -86,17 +86,18 @@ public final class Constants {
public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
+ /** Connector option whose value carries the InLong metric labels as {@code key1=value1&key2=value2}. */
public static final ConfigOption<String> INLONG_METRIC =
- ConfigOptions.key("inlong.metric")
+ ConfigOptions.key("inlong.metric.labels")
.stringType()
.noDefaultValue()
- .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
+ .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2', "
+ + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
public static final ConfigOption<String> INLONG_AUDIT =
- ConfigOptions.key("inlong.audit")
+ ConfigOptions.key("metrics.audit.proxy.hosts")
.stringType()
.noDefaultValue()
- .withDescription("INLONG AUDIT HOST + '&' + PORT");
+ .withDescription("Audit proxy host address for reporting audit metrics. \n"
+ + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
ConfigOptions.key("sink.ignore.changelog")
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
index 681318ff7..381fa5ac4 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
@@ -23,6 +23,8 @@ import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
+import java.util.Map;
+
import static org.apache.inlong.sort.base.Constants.GROUP_ID;
import static org.apache.inlong.sort.base.Constants.NODE_ID;
import static org.apache.inlong.sort.base.Constants.STREAM_ID;
@@ -40,26 +42,39 @@ public interface MetricData {
*/
MetricGroup getMetricGroup();
+ /**
+ * Get labels
+ *
+ * @return The labels defined in inlong
+ */
+ Map<String, String> getLabels();
+
/**
* Get group id
*
* @return The group id defined in inlong
*/
- String getGroupId();
+ default String getGroupId() {
+ return getLabels().get(GROUP_ID);
+ }
/**
* Get stream id
*
* @return The stream id defined in inlong
*/
- String getStreamId();
+ default String getStreamId() {
+ return getLabels().get(STREAM_ID);
+ }
/**
* Get node id
*
* @return The node id defined in inlong
*/
- String getNodeId();
+ default String getNodeId() {
+ return getLabels().get(NODE_ID);
+ }
/**
* Register a counter metric
@@ -69,8 +84,11 @@ public interface MetricData {
* @return Counter of registered
*/
default Counter registerCounter(String metricName, Counter counter) {
- return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
- .addGroup(NODE_ID, getNodeId()).counter(metricName, counter);
+ MetricGroup inlongMetricGroup = getMetricGroup();
+ for (Map.Entry<String, String> label : getLabels().entrySet()) {
+ inlongMetricGroup = inlongMetricGroup.addGroup(label.getKey(), label.getValue());
+ }
+ return inlongMetricGroup.counter(metricName, counter);
}
/**
@@ -90,8 +108,11 @@ public interface MetricData {
* @return Meter of registered
*/
default Meter registerMeter(String metricName, Counter counter) {
- return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
- .addGroup(NODE_ID, getNodeId()).meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS));
+ MetricGroup inlongMetricGroup = getMetricGroup();
+ for (Map.Entry<String, String> label : getLabels().entrySet()) {
+ inlongMetricGroup = inlongMetricGroup.addGroup(label.getKey(), label.getValue());
+ }
+ return inlongMetricGroup.meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS));
}
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index d2179ae54..f4c679f9c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -19,13 +19,19 @@
package org.apache.inlong.sort.base.metric;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
+import org.apache.flink.util.StringUtils;
import javax.annotation.Nullable;
import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.GROUP_ID;
+import static org.apache.inlong.sort.base.Constants.STREAM_ID;
public class MetricOption {
private static final String IP_OR_HOST_PORT = "^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
@@ -35,55 +41,126 @@ public class MetricOption {
+ "3}|65[0-4]\\d{"
+ "2}|655[0-2]\\d|6553[0-5])$";
- private final String groupId;
- private final String streamId;
- private final String nodeId;
+ private Map<String, String> labels;
private final HashSet<String> ipPortList;
- private String ipPorts;
+ private Optional<String> ipPorts;
+ private RegisteredMetric registeredMetric;
+ private long initRecords;
+ private long initBytes;
- public MetricOption(String inLongMetric) {
- this(inLongMetric, null);
- }
+ private MetricOption(
+ String inlongLabels,
+ @Nullable String inlongAudit,
+ RegisteredMetric registeredMetric,
+ long initRecords,
+ long initBytes) {
+ Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+ "Inlong labels must be set for register metric.");
- public MetricOption(String inLongMetric, @Nullable String inLongAudit) {
- ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inLongMetric, inLongAudit);
- String[] inLongMetricArray = inLongMetric.split(DELIMITER);
- Preconditions.checkArgument(inLongMetricArray.length == 3,
- "Error inLong metric format: " + inLongMetric);
- this.groupId = inLongMetricArray[0];
- this.streamId = inLongMetricArray[1];
- this.nodeId = inLongMetricArray[2];
- this.ipPortList = new HashSet<>();
- this.ipPorts = null;
+ this.initRecords = initRecords;
+ this.initBytes = initBytes;
+ this.labels = new LinkedHashMap<>();
+ String[] inLongLabelArray = inlongLabels.split(DELIMITER);
+ Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")),
+ "InLong metric label format must be xxx=xxx");
+ Stream.of(inLongLabelArray).forEach(label -> {
+ String key = label.substring(0, label.indexOf('='));
+ String value = label.substring(label.indexOf('=') + 1);
+ labels.put(key, value);
+ });
- if (inLongAudit != null) {
- String[] ipPortStrs = inLongAudit.split(DELIMITER);
- this.ipPorts = inLongAudit;
+ this.ipPortList = new HashSet<>();
+ this.ipPorts = Optional.ofNullable(inlongAudit);
+ if (ipPorts.isPresent()) {
+ Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID),
+ "groupId and streamId must be set when enable inlong audit collect.");
+ String[] ipPortStrs = inlongAudit.split(DELIMITER);
for (String ipPort : ipPortStrs) {
Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, ipPort),
- "Error inLong audit format: " + inLongAudit);
+ "Error inLong audit format: " + inlongAudit);
this.ipPortList.add(ipPort);
}
}
- }
- public String getGroupId() {
- return groupId;
- }
-
- public String getStreamId() {
- return streamId;
+ if (registeredMetric != null) {
+ this.registeredMetric = registeredMetric;
+ }
}
- public String getNodeId() {
- return nodeId;
+ public Map<String, String> getLabels() {
+ return labels;
}
public HashSet<String> getIpPortList() {
return ipPortList;
}
- public String getIpPorts() {
+ public Optional<String> getIpPorts() {
return ipPorts;
}
+
+ public RegisteredMetric getRegisteredMetric() {
+ return registeredMetric;
+ }
+
+ public long getInitRecords() {
+ return initRecords;
+ }
+
+ public long getInitBytes() {
+ return initBytes;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public enum RegisteredMetric {
+ ALL,
+ NORMAL,
+ DIRTY
+ }
+
+ public static class Builder {
+ private String inlongLabels;
+ private String inlongAudit;
+ private RegisteredMetric registeredMetric = RegisteredMetric.ALL;
+ private long initRecords = 0L;
+ private long initBytes = 0L;
+
+ private Builder() {
+ }
+
+ public MetricOption.Builder withInlongLabels(String inlongLabels) {
+ this.inlongLabels = inlongLabels;
+ return this;
+ }
+
+ public MetricOption.Builder withInlongAudit(String inlongAudit) {
+ this.inlongAudit = inlongAudit;
+ return this;
+ }
+
+ public MetricOption.Builder withRegisterMetric(RegisteredMetric registeredMetric) {
+ this.registeredMetric = registeredMetric;
+ return this;
+ }
+
+ public MetricOption.Builder withInitRecords(long initRecords) {
+ this.initRecords = initRecords;
+ return this;
+ }
+
+ public MetricOption.Builder withInitBytes(long initBytes) {
+ this.initBytes = initBytes;
+ return this;
+ }
+
+ public MetricOption build() {
+ if (inlongLabels == null && inlongAudit == null) {
+ return null;
+ }
+ return new MetricOption(inlongLabels, inlongAudit, registeredMetric, initRecords, initBytes);
+ }
+ }
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 4073ddd44..d065496e4 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -25,12 +25,9 @@ import org.apache.flink.metrics.SimpleCounter;
import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.sort.base.Constants;
-import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Map;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
@@ -45,10 +42,8 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
*/
public class SinkMetricData implements MetricData {
- private final MetricGroup metricGroup;
- private final String groupId;
- private final String streamId;
- private final String nodeId;
+ private MetricGroup metricGroup;
+ private Map<String, String> labels;
private AuditImp auditImp;
private Counter numRecordsOut;
private Counter numBytesOut;
@@ -59,23 +54,46 @@ public class SinkMetricData implements MetricData {
private Meter numRecordsOutPerSecond;
private Meter numBytesOutPerSecond;
- public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
- this(groupId, streamId, nodeId, metricGroup, null);
- }
-
+ /**
+ * Builds the sink metrics described by {@code option}: keeps its labels, registers the
+ * counters/meters selected by {@code option.getRegisteredMetric()}, seeds the "for meter"
+ * counters with the option's initial record/byte values, and enables audit reporting when
+ * audit proxies are present.
+ *
+ * @param option metric labels, metric selection, initial values and optional audit proxies
+ * @param metricGroup metric group stored on this instance
+ */
public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
- this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
- }
-
- public SinkMetricData(
- String groupId, String streamId, String nodeId, MetricGroup metricGroup,
- @Nullable String auditHostAndPorts) {
this.metricGroup = metricGroup;
- this.groupId = groupId;
- this.streamId = streamId;
- this.nodeId = nodeId;
- if (auditHostAndPorts != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+ this.labels = option.getLabels();
+
+ ThreadSafeCounter recordsOutCounter = new ThreadSafeCounter();
+ ThreadSafeCounter bytesOutCounter = new ThreadSafeCounter();
+ switch (option.getRegisteredMetric()) {
+ case DIRTY:
+ registerMetricsForDirtyBytes(new ThreadSafeCounter());
+ registerMetricsForDirtyRecords(new ThreadSafeCounter());
+ break;
+ case NORMAL:
+ registerMetricsForNumBytesOut(new ThreadSafeCounter());
+ registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+ registerMetricsForNumBytesOutPerSecond();
+ registerMetricsForNumRecordsOutPerSecond();
+
+ recordsOutCounter.inc(option.getInitRecords());
+ bytesOutCounter.inc(option.getInitBytes());
+ registerMetricsForNumRecordsOutForMeter(recordsOutCounter);
+ // The byte counter must not go through the records registration.
+ // NOTE(review): assumes registerMetricsForNumBytesOutForMeter(Counter) exists in this class - confirm.
+ registerMetricsForNumBytesOutForMeter(bytesOutCounter);
+ break;
+ default:
+ registerMetricsForDirtyBytes(new ThreadSafeCounter());
+ registerMetricsForDirtyRecords(new ThreadSafeCounter());
+ registerMetricsForNumBytesOut(new ThreadSafeCounter());
+ registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+ registerMetricsForNumBytesOutPerSecond();
+ registerMetricsForNumRecordsOutPerSecond();
+
+ recordsOutCounter.inc(option.getInitRecords());
+ bytesOutCounter.inc(option.getInitBytes());
+ registerMetricsForNumRecordsOutForMeter(recordsOutCounter);
+ // The byte counter must not go through the records registration.
+ // NOTE(review): assumes registerMetricsForNumBytesOutForMeter(Counter) exists in this class - confirm.
+ registerMetricsForNumBytesOutForMeter(bytesOutCounter);
+ break;
+
+ }
+
+ if (option.getIpPorts().isPresent()) {
+ AuditImp.getInstance().setAuditProxy(option.getIpPortList());
this.auditImp = AuditImp.getInstance();
}
}
@@ -218,18 +236,8 @@ public class SinkMetricData implements MetricData {
}
@Override
- public String getGroupId() {
- return groupId;
- }
-
- @Override
- public String getStreamId() {
- return streamId;
- }
-
- @Override
- public String getNodeId() {
- return nodeId;
+ public Map<String, String> getLabels() {
+ return labels;
}
public Counter getNumRecordsOutForMeter() {
@@ -242,26 +250,26 @@ public class SinkMetricData implements MetricData {
public void invokeWithEstimate(Object o) {
long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
- this.numRecordsOut.inc();
- this.numBytesOut.inc(size);
- this.numRecordsOutForMeter.inc();
- this.numBytesOutForMeter.inc(size);
- if (auditImp != null) {
- auditImp.add(
- Constants.AUDIT_SORT_OUTPUT,
- getGroupId(),
- getStreamId(),
- System.currentTimeMillis(),
- 1,
- size);
- }
+ invoke(1, size);
}
+ /**
+ * Adds {@code rowCount} to the record counters and {@code rowSize} to the byte counters,
+ * skipping any counter that has not been registered.
+ *
+ * @param rowCount number of records written
+ * @param rowSize size in bytes of the records written
+ */
public void invoke(long rowCount, long rowSize) {
- this.numRecordsOut.inc(rowCount);
- this.numBytesOut.inc(rowSize);
- this.numRecordsOutForMeter.inc(rowCount);
- this.numBytesOutForMeter.inc(rowSize);
+ if (numRecordsOut != null) {
+ numRecordsOut.inc(rowCount);
+ }
+
+ if (numBytesOut != null) {
+ numBytesOut.inc(rowSize);
+ }
+
+ if (numRecordsOutForMeter != null) {
+ numRecordsOutForMeter.inc(rowCount);
+ }
+
+ if (numBytesOutForMeter != null) {
+ // Byte meter counter tracks size, not row count (as the replaced line did).
+ numBytesOutForMeter.inc(rowSize);
+ }
+
if (auditImp != null) {
auditImp.add(
Constants.AUDIT_SORT_OUTPUT,
@@ -273,12 +281,22 @@ public class SinkMetricData implements MetricData {
}
}
+ public void invokeDirty(long rowCount, long rowSize) {
+ if (dirtyRecords != null) {
+ dirtyRecords.inc(rowCount);
+ }
+
+ if (dirtyBytes != null) {
+ dirtyBytes.inc(rowSize);
+ }
+ }
+
@Override
public String toString() {
return "SinkMetricData{"
- + "groupId='" + groupId + '\''
- + ", streamId='" + streamId + '\''
- + ", nodeId='" + nodeId + '\''
+ + "metricGroup=" + metricGroup
+ + ", labels=" + labels
+ + ", auditImp=" + auditImp
+ ", numRecordsOut=" + numRecordsOut.getCount()
+ ", numBytesOut=" + numBytesOut.getCount()
+ ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 5c25fcc75..3cffcfe54 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -25,12 +25,9 @@ import org.apache.flink.metrics.SimpleCounter;
import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.sort.base.Constants;
-import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
-import java.util.Arrays;
-import java.util.HashSet;
-
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
@@ -43,46 +40,39 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
*/
public class SourceMetricData implements MetricData {
- private final MetricGroup metricGroup;
- private final String groupId;
- private final String streamId;
- private final String nodeId;
+ private MetricGroup metricGroup;
+ private Map<String, String> labels;
private Counter numRecordsIn;
private Counter numBytesIn;
private Counter numRecordsInForMeter;
private Counter numBytesInForMeter;
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
- private final AuditImp auditImp;
-
- public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
- this(groupId, streamId, nodeId, metricGroup, (AuditImp) null);
- }
+ private AuditImp auditImp;
+ /**
+ * Builds the source metrics described by {@code option}: keeps its labels, registers the
+ * input counters/meters, seeds the "for meter" counters with the option's initial
+ * record/byte values, and enables audit reporting when audit proxies are present.
+ *
+ * @param option metric labels, initial values and optional audit proxies
+ * @param metricGroup metric group stored on this instance
+ */
public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
- this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
- }
-
- public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
- AuditImp auditImp) {
- this.groupId = groupId;
- this.streamId = streamId;
- this.nodeId = nodeId;
this.metricGroup = metricGroup;
- this.auditImp = auditImp;
- }
+ this.labels = option.getLabels();
+
+ SimpleCounter recordsInCounter = new SimpleCounter();
+ SimpleCounter bytesInCounter = new SimpleCounter();
+ switch (option.getRegisteredMetric()) {
+ default:
+ registerMetricsForNumRecordsIn();
+ registerMetricsForNumBytesIn();
+ registerMetricsForNumBytesInPerSecond();
+ registerMetricsForNumRecordsInPerSecond();
+
+ recordsInCounter.inc(option.getInitRecords());
+ bytesInCounter.inc(option.getInitBytes());
+ // Each seeded counter goes to the registration matching its unit.
+ registerMetricsForNumRecordsInForMeter(recordsInCounter);
+ registerMetricsForNumBytesInForMeter(bytesInCounter);
+ break;
+ }
- public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
- @Nullable String auditHostAndPorts) {
- this.groupId = groupId;
- this.streamId = streamId;
- this.nodeId = nodeId;
- this.metricGroup = metricGroup;
- if (auditHostAndPorts != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+ // getIpPorts() returns an Optional, so test presence rather than nullness.
+ if (option.getIpPorts().isPresent()) {
+ AuditImp.getInstance().setAuditProxy(option.getIpPortList());
this.auditImp = AuditImp.getInstance();
- } else {
- this.auditImp = null;
}
}
@@ -196,26 +186,32 @@ public class SourceMetricData implements MetricData {
}
@Override
- public String getGroupId() {
- return groupId;
- }
-
- @Override
- public String getStreamId() {
- return streamId;
+ public Map<String, String> getLabels() {
+ return labels;
}
- @Override
- public String getNodeId() {
- return nodeId;
+ public void outputMetricsWithEstimate(Object o) {
+ long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
+ outputMetrics(1, size);
}
public void outputMetrics(long rowCountSize, long rowDataSize) {
- outputMetricForFlink(rowCountSize, rowDataSize);
- outputMetricForAudit(rowCountSize, rowDataSize);
- }
+ if (numRecordsIn != null) {
+ this.numRecordsIn.inc(rowCountSize);
+ }
+
+ if (numBytesIn != null) {
+ this.numBytesIn.inc(rowDataSize);
+ }
+
+ if (numRecordsInForMeter != null) {
+ this.numRecordsInForMeter.inc(rowCountSize);
+ }
+
+ if (numBytesInForMeter != null) {
+ this.numBytesInForMeter.inc(rowDataSize);
+ }
- public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
if (auditImp != null) {
auditImp.add(
Constants.AUDIT_SORT_INPUT,
@@ -227,25 +223,18 @@ public class SourceMetricData implements MetricData {
}
}
- public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
- this.numBytesIn.inc(rowDataSize);
- this.numRecordsIn.inc(rowCountSize);
- this.numBytesInForMeter.inc(rowDataSize);
- this.numRecordsInForMeter.inc(rowCountSize);
- }
-
@Override
public String toString() {
return "SourceMetricData{"
- + "groupId='" + groupId + '\''
- + ", streamId='" + streamId + '\''
- + ", nodeId='" + nodeId + '\''
+ + "metricGroup=" + metricGroup
+ + ", labels=" + labels
+ ", numRecordsIn=" + numRecordsIn.getCount()
+ ", numBytesIn=" + numBytesIn.getCount()
+ ", numRecordsInForMeter=" + numRecordsInForMeter.getCount()
+ ", numBytesInForMeter=" + numBytesInForMeter.getCount()
+ ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+ + ", auditImp=" + auditImp
+ '}';
}
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
deleted file mode 100644
index bd58e31ae..000000000
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.base.util;
-
-import org.apache.flink.table.api.ValidationException;
-
-/**
- * validate option tool
- */
-public class ValidateMetricOptionUtils {
-
- /**
- * validate inlong metric when set inlong audit
- * @param inlongMetric inlong.metric option value
- * @param inlongAudit inlong.audit option value
- */
- public static void validateInlongMetricIfSetInlongAudit(String inlongMetric, String inlongAudit) {
- if (inlongAudit != null && inlongMetric == null) {
- throw new ValidationException("inlong metric is necessary when set inlong audit");
- }
- }
-
-}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 000e1c23a..cdea07fa5 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DocWriteRequest;
@@ -50,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* Base class for all Flink Elasticsearch Sinks.
@@ -266,15 +267,14 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
@Override
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
- if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String groupId = inlongMetricArray[0];
- String streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
- sinkMetricData.registerMetricsForDirtyBytes();
- sinkMetricData.registerMetricsForDirtyRecords();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withRegisterMetric(RegisteredMetric.DIRTY)
+ .build();
+ if (metricOption != null) {
+ sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
+
callBridge.verifyClientConnection(client);
bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
requestIndexer =
@@ -482,8 +482,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
if (failure != null) {
restStatus = itemResponse.getFailure().getStatus();
actionRequest = request.requests().get(i);
- if (sinkMetricData.getDirtyRecords() != null) {
- sinkMetricData.getDirtyRecords().inc();
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, 0);
}
if (restStatus == null) {
if (actionRequest instanceof ActionRequest) {
@@ -526,8 +526,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
try {
for (DocWriteRequest writeRequest : request.requests()) {
- if (sinkMetricData.getDirtyRecords() != null) {
- sinkMetricData.getDirtyRecords().inc();
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, 0);
}
if (writeRequest instanceof ActionRequest) {
failureHandler.onFailure(
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 0ae93231d..e4f1419c5 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -20,8 +20,8 @@ package org.apache.inlong.sort.elasticsearch.table;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.table.api.TableException;
@@ -37,13 +37,9 @@ import org.elasticsearch.common.xcontent.XContentType;
import javax.annotation.Nullable;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.Objects;
import java.util.function.Function;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
/** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
@@ -63,11 +59,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
private transient RuntimeContext runtimeContext;
private SinkMetricData sinkMetricData;
- private Long dataSize = 0L;
- private Long rowSize = 0L;
- private String groupId;
- private String streamId;
- private transient AuditImp auditImp;
public RowElasticsearchSinkFunction(
IndexGenerator indexGenerator,
@@ -94,44 +85,20 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
public void open(RuntimeContext ctx) {
indexGenerator.open();
this.runtimeContext = ctx;
- if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- groupId = inlongMetricArray[0];
- streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
- sinkMetricData.registerMetricsForNumBytesOut();
- sinkMetricData.registerMetricsForNumRecordsOut();
- sinkMetricData.registerMetricsForNumBytesOutPerSecond();
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
- }
-
- if (auditHostAndPorts != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
- }
- }
-
- private void outputMetricForAudit(long size) {
- if (auditImp != null) {
- auditImp.add(
- Constants.AUDIT_SORT_OUTPUT,
- groupId,
- streamId,
- System.currentTimeMillis(),
- 1,
- size);
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withRegisterMetric(RegisteredMetric.NORMAL)
+ .build();
+ if (metricOption != null) {
+ sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
}
}
private void sendMetrics(byte[] document) {
- if (sinkMetricData.getNumBytesOut() != null) {
- sinkMetricData.getNumBytesOut().inc(document.length);
- }
- if (sinkMetricData.getNumRecordsOut() != null) {
- sinkMetricData.getNumRecordsOut().inc();
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(1, document.length);
}
- outputMetricForAudit(document.length);
}
@Override
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
index 60bcf9332..b18e7e32d 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
@@ -77,7 +77,6 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
import org.apache.inlong.sort.filesystem.stream.StreamingSink;
import javax.annotation.Nullable;
@@ -157,7 +156,6 @@ public class FileSystemTableSink extends AbstractFileSystemTable
this.configuredParallelism = tableOptions.get(FileSystemOptions.SINK_PARALLELISM);
this.inlongMetric = tableOptions.get(INLONG_METRIC);
this.inlongAudit = tableOptions.get(INLONG_AUDIT);
- ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
}
@Override
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 47516a270..95267c698 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -32,10 +32,9 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
-
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
@@ -102,19 +101,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
@Override
public void open() throws Exception {
super.open();
- if (inlongMetric != null) {
- String[] inLongMetricArray = inlongMetric.split(DELIMITER);
- String groupId = inLongMetricArray[0];
- String streamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
- metricData = new SinkMetricData(
- groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(), inlongAudit);
- metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
- metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOutPerSecond();
- metricData.registerMetricsForNumRecordsOutPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .withInlongAudit(inlongAudit)
+ .build();
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
index da14c947a..55d5d57d7 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.hadoop.conf.Configuration;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink;
import java.util.HashSet;
@@ -106,7 +105,6 @@ public class HBase2DynamicTableFactory
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
String inlongAudit = tableOptions.get(INLONG_AUDIT);
- ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
return new HBaseDynamicTableSink(
tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, inlongMetric, inlongAudit);
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 907b758ff..a1e9641d2 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -37,9 +37,9 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,19 +123,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
try {
this.runtimeContext = getRuntimeContext();
- if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
- String groupId = inlongMetricArray[0];
- String streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup(),
- inlongAudit);
- sinkMetricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
- sinkMetricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
- sinkMetricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
- sinkMetricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
- sinkMetricData.registerMetricsForNumBytesOutPerSecond();
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(inlongAudit)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
}
this.mutationConverter.open();
this.numPendingRequests = new AtomicLong(0);
@@ -163,14 +157,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
}
try {
flush();
- sinkMetricData.invoke(rowSize, dataSize);
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(rowSize, dataSize);
+ }
resetStateAfterFlush();
} catch (Exception e) {
- if (sinkMetricData.getDirtyRecords() != null) {
- sinkMetricData.getDirtyRecords().inc(rowSize);
- }
- if (sinkMetricData.getDirtyBytes() != null) {
- sinkMetricData.getDirtyBytes().inc(dataSize);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(rowSize, dataSize);
}
resetStateAfterFlush();
// fail the sink and skip the rest of the items
@@ -236,14 +229,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
&& numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
try {
flush();
- sinkMetricData.invoke(rowSize, dataSize);
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(rowSize, dataSize);
+ }
resetStateAfterFlush();
} catch (Exception e) {
- if (sinkMetricData.getDirtyRecords() != null) {
- sinkMetricData.getDirtyRecords().inc(rowSize);
- }
- if (sinkMetricData.getDirtyBytes() != null) {
- sinkMetricData.getDirtyBytes().inc(dataSize);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(rowSize, dataSize);
}
resetStateAfterFlush();
throw e;
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 5d0ca7bf0..ab2e845f2 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -32,12 +32,11 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
import javax.annotation.Nullable;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
/**
* Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
* file and bucket information to downstream.
@@ -111,19 +110,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
@Override
public void open() throws Exception {
super.open();
- if (inlongMetric != null) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String inlongGroupId = inlongMetricArray[0];
- String inlongStreamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- metricData = new SinkMetricData(
- inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
- metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
- metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOutPerSecond();
- metricData.registerMetricsForNumRecordsOutPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
index 35d3a6c76..7840e3ea7 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
@@ -135,12 +135,11 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
Map<String, String> tableProps = catalogTable.getOptions();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
SyncRewriteDataFilesActionOption compactOption = new SyncRewriteDataFilesActionOption(tableProps);
- MetricOption metricOption = null;
- if (tableProps.containsKey(INLONG_METRIC.key())) {
- metricOption = new MetricOption(
- tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
- tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()));
- }
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()))
+ .withInlongAudit(tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
+ .build();
+
boolean appendMode = tableProps.containsKey(ICEBERG_IGNORE_ALL_CHANGELOG.key())
? Boolean.parseBoolean(tableProps.get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
: ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue();
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
index 112b6a36b..df0ced054 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
@@ -43,7 +43,7 @@ import java.util.Map;
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
* Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
* Add a table property `write.compact.enable` to support small file compact.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collecting inlong metrics and audit
*/
public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private static final Logger LOG = LoggerFactory.getLogger(IcebergTableSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
index 0b48fb169..c24d5a8c6 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
@@ -74,7 +74,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
* Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
* Add a table property `write.compact.enable` to support small file compact.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit.
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collecting inlong metrics and audit.
*/
public class FlinkSink {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index 9f7cffbcf..bb00b7808 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -30,7 +30,6 @@ import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -76,12 +75,6 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
// Initialize metric
if (metricOption != null) {
metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
- metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
- metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOutPerSecond();
- metricData.registerMetricsForNumRecordsOutPerSecond();
}
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 088622278..37eb2670b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -46,7 +46,7 @@ import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_A
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
* Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collecting inlong metrics and audit
*/
public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 65929bc34..8662722d8 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -71,7 +71,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
* Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit.
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collecting inlong metrics and audit.
*/
public class FlinkSink {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 79df9049f..8318c7177 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -28,14 +28,13 @@ import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
@@ -79,19 +78,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
this.writer = taskWriterFactory.create();
// Initialize metric
- if (inlongMetric != null) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String inlongGroupId = inlongMetricArray[0];
- String inlongStreamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- metricData = new SinkMetricData(
- inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
- metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
- metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOutPerSecond();
- metricData.registerMetricsForNumRecordsOutPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 66af78c4d..e32d4094b 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -33,7 +33,8 @@ import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +44,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -53,8 +52,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* A JDBC outputFormat that supports batching records before writing records to database.
@@ -80,9 +77,6 @@ public class JdbcBatchingOutputFormat<
private transient RuntimeContext runtimeContext;
private SinkMetricData sinkMetricData;
- private String inlongGroupId;
- private String inlongStreamId;
- private transient AuditImp auditImp;
private Long dataSize = 0L;
private Long rowSize = 0L;
@@ -130,22 +124,13 @@ public class JdbcBatchingOutputFormat<
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
this.runtimeContext = getRuntimeContext();
- if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- inlongGroupId = inlongMetricArray[0];
- inlongStreamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- sinkMetricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, runtimeContext.getMetricGroup());
- sinkMetricData.registerMetricsForDirtyBytes();
- sinkMetricData.registerMetricsForDirtyRecords();
- sinkMetricData.registerMetricsForNumBytesOut();
- sinkMetricData.registerMetricsForNumRecordsOut();
- sinkMetricData.registerMetricsForNumBytesOutPerSecond();
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
- }
- if (auditHostAndPorts != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
}
jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -159,20 +144,13 @@ public class JdbcBatchingOutputFormat<
if (!closed) {
try {
flush();
- if (sinkMetricData.getNumRecordsOut() != null) {
- sinkMetricData.getNumRecordsOut().inc(rowSize);
- }
- if (sinkMetricData.getNumBytesOut() != null) {
- sinkMetricData.getNumBytesOut()
- .inc(dataSize);
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(rowSize, dataSize);
}
resetStateAfterFlush();
} catch (Exception e) {
- if (sinkMetricData.getDirtyRecords() != null) {
- sinkMetricData.getDirtyRecords().inc(rowSize);
- }
- if (sinkMetricData.getDirtyBytes() != null) {
- sinkMetricData.getDirtyBytes().inc(dataSize);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(rowSize, dataSize);
}
resetStateAfterFlush();
flushException = e;
@@ -203,46 +181,26 @@ public class JdbcBatchingOutputFormat<
}
}
- private void outputMetricForAudit(long length) {
- if (auditImp != null) {
- auditImp.add(
- AUDIT_SORT_INPUT,
- inlongGroupId,
- inlongStreamId,
- System.currentTimeMillis(),
- 1,
- length);
- }
- }
-
@Override
public final synchronized void writeRecord(In record) throws IOException {
checkFlushException();
rowSize++;
dataSize = dataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
- outputMetricForAudit(dataSize);
try {
addToBatch(record, jdbcRecordExtractor.apply(record));
batchCount++;
if (executionOptions.getBatchSize() > 0
&& batchCount >= executionOptions.getBatchSize()) {
flush();
- if (sinkMetricData.getNumRecordsOut() != null) {
- sinkMetricData.getNumRecordsOut().inc(rowSize);
- }
- if (sinkMetricData.getNumBytesOut() != null) {
- sinkMetricData.getNumBytesOut()
- .inc(dataSize);
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(rowSize, dataSize);
}
resetStateAfterFlush();
}
} catch (Exception e) {
- if (sinkMetricData.getDirtyRecords() != null) {
- sinkMetricData.getDirtyRecords().inc(rowSize);
- }
- if (sinkMetricData.getDirtyBytes() != null) {
- sinkMetricData.getDirtyBytes().inc(dataSize);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(rowSize, dataSize);
}
resetStateAfterFlush();
throw new IOException("Writing records to JDBC failed.", e);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
index 0d0ab4544..b655a10cd 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -19,7 +19,6 @@
package org.apache.inlong.sort.kafka;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
@@ -63,10 +62,10 @@ import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWat
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SourceMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -74,10 +73,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -90,7 +87,6 @@ import static org.apache.flink.streaming.connectors.kafka.internals.metrics.Kafk
import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
@@ -831,36 +827,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
-
- if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String groupId = inlongMetricArray[0];
- String streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- AuditImp auditImp = null;
- if (inlongAudit != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
- }
- sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(),
- auditImp);
- ThreadSafeCounter recordsInCounter = new ThreadSafeCounter();
- ThreadSafeCounter bytesInCounter = new ThreadSafeCounter();
- if (metricState != null) {
- recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
- bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
- }
- sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
- sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
- sourceMetricData.registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter());
- sourceMetricData.registerMetricsForNumBytesInForMeter(new ThreadSafeCounter());
- sourceMetricData.registerMetricsForNumBytesInPerSecond();
- sourceMetricData.registerMetricsForNumRecordsInPerSecond();
- if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
- DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
- (DynamicKafkaDeserializationSchema) deserializer;
- dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
- }
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(inlongAudit)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+ .build();
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ }
+ if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
+ DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
+ (DynamicKafkaDeserializationSchema) deserializer;
+ dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
}
if (subscribedPartitionsToStartOffsets == null) {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 3f0902c0c..d3ca629cb 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -57,10 +57,10 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
@@ -97,7 +97,6 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -243,18 +242,6 @@ public class FlinkKafkaProducer<IN>
*/
@Nullable
protected transient volatile Exception asyncException;
- /**
- * audit implement
- */
- private transient AuditImp auditImp;
- /**
- * inLong groupId
- */
- private String inlongGroupId;
- /**
- * inLong streamId
- */
- private String inlongStreamId;
/**
* sink metric data
*/
@@ -914,25 +901,15 @@ public class FlinkKafkaProducer<IN>
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
}
- if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- inlongGroupId = inlongMetricArray[0];
- inlongStreamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup(),
- auditHostAndPorts);
- metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
- metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
- metricData.registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOutPerSecond();
- metricData.registerMetricsForNumRecordsOutPerSecond();
- }
- if (metricState != null && metricData != null) {
- metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT));
- metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT));
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption, ctx.getMetricGroup());
}
super.open(configuration);
}
@@ -945,8 +922,7 @@ public class FlinkKafkaProducer<IN>
private void sendDirtyMetrics(Long rowSize, Long dataSize) {
if (metricData != null) {
- metricData.getDirtyRecords().inc(rowSize);
- metricData.getDirtyBytes().inc(dataSize);
+ metricData.invokeDirty(rowSize, dataSize);
}
}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index c6b5c11a9..13e3d2103 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -120,7 +120,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(record.value(), collector);
// output metrics
- outputMetrics(record);
+ if (metricData != null) {
+ metricData.outputMetrics(1, record.value().length);
+ }
return;
}
@@ -140,7 +142,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
} else {
valueDeserialization.deserialize(record.value(), outputCollector);
// output metrics
- outputMetrics(record);
+ if (metricData != null) {
+ metricData.outputMetrics(1, record.value().length);
+ }
}
keyCollector.buffer.clear();
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 42bb53732..932e249d5 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -58,7 +57,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -68,9 +68,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,7 +79,6 @@ import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -416,21 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
(Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
- if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String groupId = inlongMetricArray[0];
- String streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- AuditImp auditImp = null;
- if (inlongAudit != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
- }
- metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
- metricData.registerMetricsForNumRecordsIn();
- metricData.registerMetricsForNumBytesIn();
- metricData.registerMetricsForNumBytesInPerSecond();
- metricData.registerMetricsForNumRecordsInPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(inlongAudit)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -470,8 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
if (metricData != null) {
- metricData.outputMetrics(1L,
- record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ metricData.outputMetricsWithEstimate(record.value());
}
deserializer.deserialize(record, out);
}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 919c227e4..20f20b65f 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
import java.time.ZoneId;
import java.util.HashSet;
@@ -226,7 +225,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
: ZoneId.of(zoneId);
final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
final String inlongAudit = config.get(INLONG_AUDIT);
- ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index e7084d2fa..1ac48f97a 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -25,7 +25,6 @@ import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -47,7 +46,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
@@ -66,9 +66,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -77,7 +75,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
@@ -416,23 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
(Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
- if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String groupId = inlongMetricArray[0];
- String streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- AuditImp auditImp = null;
- if (inlongAudit != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
- }
- sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
- sourceMetricData.registerMetricsForNumRecordsIn();
- sourceMetricData.registerMetricsForNumBytesIn();
- sourceMetricData.registerMetricsForNumBytesInForMeter();
- sourceMetricData.registerMetricsForNumRecordsInForMeter();
- sourceMetricData.registerMetricsForNumBytesInPerSecond();
- sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(inlongAudit)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
properties.setProperty("name", "engine");
@@ -473,8 +460,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
if (sourceMetricData != null) {
- sourceMetricData.outputMetrics(1L,
- record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ sourceMetricData.outputMetricsWithEstimate(record.value());
}
deserializer.deserialize(record, out);
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index dcb510044..ea102f429 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.mysql.source;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -36,8 +35,8 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.mysql.MySqlValidator;
import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
@@ -62,12 +61,9 @@ import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
import org.apache.kafka.connect.source.SourceRecord;
import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.function.Supplier;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables;
import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
@@ -142,28 +138,18 @@ public class MySqlSource<T>
final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);
final MySqlSourceReaderMetrics sourceReaderMetrics =
new MySqlSourceReaderMetrics(metricGroup);
- sourceReaderMetrics.registerMetrics();
- MySqlSourceReaderContext mySqlSourceReaderContext =
- new MySqlSourceReaderContext(readerContext);
// create source config for the given subtask (e.g. unique server id)
MySqlSourceConfig sourceConfig =
configFactory.createConfig(readerContext.getIndexOfSubtask());
- String inlongMetric = sourceConfig.getInlongMetric();
- String inlongAudit = sourceConfig.getInlongAudit();
- if (StringUtils.isNotEmpty(inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- sourceReaderMetrics.setInlongGroupId(inlongMetricArray[0]);
- sourceReaderMetrics.setInlongSteamId(inlongMetricArray[1]);
- sourceReaderMetrics.setNodeId(inlongMetricArray[2]);
- if (inlongAudit != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
- sourceReaderMetrics.setAuditImp(AuditImp.getInstance());
- }
- sourceReaderMetrics.registerMetricsForNumBytesIn(Constants.NUM_BYTES_IN);
- sourceReaderMetrics.registerMetricsForNumRecordsIn(Constants.NUM_RECORDS_IN);
- sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(Constants.NUM_BYTES_IN_PER_SECOND);
- sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(Constants.NUM_RECORDS_IN_PER_SECOND);
- }
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(sourceConfig.getInlongMetric())
+ .withInlongAudit(sourceConfig.getInlongAudit())
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ sourceReaderMetrics.registerMetrics(metricOption);
+ MySqlSourceReaderContext mySqlSourceReaderContext =
+ new MySqlSourceReaderContext(readerContext);
+
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
new FutureCompletingBlockingQueue<>();
Supplier<MySqlSplitReader> splitReaderSupplier =
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index b301ed572..45c81e560 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -18,13 +18,10 @@
package org.apache.inlong.sort.cdc.mysql.source.metrics;
-import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
/**
@@ -53,56 +50,21 @@ public class MySqlSourceReaderMetrics {
*/
private volatile long emitDelay = 0L;
- private Counter numRecordsIn;
- private Counter numBytesIn;
- private Meter numRecordsInPerSecond;
- private Meter numBytesInPerSecond;
- private static Integer TIME_SPAN_IN_SECONDS = 60;
- private static String STREAM_ID = "streamId";
- private static String GROUP_ID = "groupId";
- private static String NODE_ID = "nodeId";
- private String inlongGroupId;
- private String inlongSteamId;
- private String nodeId;
- private AuditImp auditImp;
+ private SourceMetricData sourceMetricData;
public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
- public void registerMetrics() {
+ public void registerMetrics(MetricOption metricOption) {
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+ }
metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) this::getFetchDelay);
metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) this::getEmitDelay);
metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
}
- public void registerMetricsForNumRecordsIn(String metricName) {
- numRecordsIn =
- metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
- .addGroup(NODE_ID, this.nodeId)
- .counter(metricName);
- }
-
- public void registerMetricsForNumBytesIn(String metricName) {
- numBytesIn =
- metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
- .addGroup(NODE_ID, this.nodeId)
- .counter(metricName);
- }
-
- public void registerMetricsForNumRecordsInPerSecond(String metricName) {
- numRecordsInPerSecond =
- metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
- .addGroup(NODE_ID, nodeId)
- .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS));
- }
-
- public void registerMetricsForNumBytesInPerSecond(String metricName) {
- numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
- .addGroup(NODE_ID, this.nodeId)
- .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS));
- }
-
public long getFetchDelay() {
return fetchDelay;
}
@@ -131,77 +93,9 @@ public class MySqlSourceReaderMetrics {
this.emitDelay = emitDelay;
}
- public Counter getNumRecordsIn() {
- return numRecordsIn;
- }
-
- public Counter getNumBytesIn() {
- return numBytesIn;
- }
-
- public Meter getNumRecordsInPerSecond() {
- return numRecordsInPerSecond;
- }
-
- public Meter getNumBytesInPerSecond() {
- return numBytesInPerSecond;
- }
-
- public String getInlongGroupId() {
- return inlongGroupId;
- }
-
- public String getInlongSteamId() {
- return inlongSteamId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId;
- }
-
- public void setInlongSteamId(String inlongSteamId) {
- this.inlongSteamId = inlongSteamId;
- }
-
- public void setNodeId(String nodeId) {
- this.nodeId = nodeId;
- }
-
- public AuditImp getAuditImp() {
- return auditImp;
- }
-
- public void setAuditImp(AuditImp auditImp) {
- this.auditImp = auditImp;
- }
-
public void outputMetrics(long rowCountSize, long rowDataSize) {
- outputMetricForFlink(rowCountSize, rowDataSize);
- outputMetricForAudit(rowCountSize, rowDataSize);
- }
-
- public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
- if (this.auditImp != null) {
- this.auditImp.add(
- Constants.AUDIT_SORT_INPUT,
- getInlongGroupId(),
- getInlongSteamId(),
- System.currentTimeMillis(),
- rowCountSize,
- rowDataSize);
- }
- }
-
- public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
- if (this.numBytesIn != null) {
- numBytesIn.inc(rowDataSize);
- }
- if (this.numRecordsIn != null) {
- this.numRecordsIn.inc(rowCountSize);
+ if (sourceMetricData != null) {
+ sourceMetricData.outputMetrics(rowCountSize, rowDataSize);
}
}
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index c8a70b8bd..b4610d959 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
import org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange;
@@ -124,7 +123,6 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
final ReadableConfig config = helper.getOptions();
final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
final String inlongAudit = config.get(INLONG_AUDIT);
- ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
final String hostname = config.get(HOSTNAME);
final String username = config.get(USERNAME);
final String password = config.get(PASSWORD);
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 1458693fb..869a0a1cd 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -58,7 +57,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -68,9 +68,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,7 +79,6 @@ import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -416,21 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
(Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
- if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String groupId = inlongMetricArray[0];
- String streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- AuditImp auditImp = null;
- if (inlongAudit != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
- }
- metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
- metricData.registerMetricsForNumRecordsIn();
- metricData.registerMetricsForNumBytesIn();
- metricData.registerMetricsForNumBytesInPerSecond();
- metricData.registerMetricsForNumRecordsInPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(inlongAudit)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -470,8 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
if (metricData != null) {
- metricData.outputMetrics(1L,
- record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ metricData.outputMetricsWithEstimate(record.value());
}
deserializer.deserialize(record, out);
}
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index 0b4471ae3..d020d03f9 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
import java.util.HashSet;
import java.util.Set;
@@ -114,7 +113,6 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String inlongAudit = config.get(INLONG_AUDIT);
- ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
return new OracleTableSource(
physicalSchema,
port,
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index 4ef40bcc8..2ccf92421 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -51,7 +50,6 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -60,7 +58,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
@@ -72,9 +71,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -85,7 +82,6 @@ import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
@@ -441,29 +437,15 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
(Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
- if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String groupId = inlongMetricArray[0];
- String streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- AuditImp auditImp = null;
- if (inlongAudit != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
- }
- sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
- SimpleCounter recordsInCounter = new SimpleCounter();
- SimpleCounter bytesInCounter = new SimpleCounter();
- if (metricState != null) {
- recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
- bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
- }
- sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
- sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
- sourceMetricData.registerMetricsForNumRecordsInForMeter();
- sourceMetricData.registerMetricsForNumBytesInForMeter();
- sourceMetricData.registerMetricsForNumBytesInPerSecond();
- sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(inlongAudit)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -504,8 +486,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
deserializer.deserialize(record, out);
if (sourceMetricData != null) {
- sourceMetricData.outputMetrics(1L,
- record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ sourceMetricData.outputMetricsWithEstimate(record.value());
}
}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index a886f8aef..02c624290 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
import java.util.HashSet;
import java.util.Set;
@@ -127,7 +126,6 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String inlongAudit = config.get(INLONG_AUDIT);
- ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
return new PostgreSQLTableSource(
physicalSchema,
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
deleted file mode 100644
index 18fd19955..000000000
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.pulsar.table;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-public class Constants {
-
- public static final ConfigOption<String> INLONG_METRIC =
- ConfigOptions.key("inlong.metric")
- .stringType()
- .defaultValue("")
- .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
-}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 6d88a303b..bd4ecaaee 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -33,7 +33,8 @@ import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
import org.apache.pulsar.client.api.Message;
@@ -44,14 +45,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
/**
* A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
*/
@@ -76,12 +72,6 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
private String inlongMetric;
private String auditHostAndPorts;
- private AuditImp auditImp;
-
- private String inlongGroupId;
-
- private String inlongStreamId;
-
DynamicPulsarDeserializationSchema(
int physicalArity,
@Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -120,23 +110,14 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
}
valueDeserialization.open(context);
- if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- inlongGroupId = inlongMetricArray[0];
- inlongStreamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, getMetricGroup(context));
- sourceMetricData.registerMetricsForNumBytesIn();
- sourceMetricData.registerMetricsForNumBytesInPerSecond();
- sourceMetricData.registerMetricsForNumRecordsIn();
- sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption, getMetricGroup(context));
}
-
- if (auditHostAndPorts != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
- }
-
}
/**
@@ -191,7 +172,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
- sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
+ sourceMetricData.outputMetricsWithEstimate(inputRow);
collector.collect(inputRow);
}));
return;
@@ -212,7 +193,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
outputCollector.collect(null);
} else {
valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
- sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
+ sourceMetricData.outputMetricsWithEstimate(inputRow);
outputCollector.collect(inputRow);
}));
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index bf74dc8d2..5a3ddfe51 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -78,7 +78,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
/**
* Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index ff0361b16..e4793dec7 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -68,7 +68,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
/**
* Upsert-Pulsar factory.
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
index c28115846..16ee84362 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.sqlserver.table;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.Validator;
@@ -42,9 +41,7 @@ import io.debezium.heartbeat.Heartbeat;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -54,7 +51,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -76,8 +72,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -217,14 +213,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private SourceMetricData metricData;
- private String inlongGroupId;
-
private String auditHostAndPorts;
- private String inlongStreamId;
-
- private transient AuditImp auditImp;
-
// ---------------------------------------------------------------------------------------
public DebeziumSourceFunction(
@@ -412,20 +402,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
(Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
- if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- inlongGroupId = inlongMetricArray[0];
- inlongStreamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, metricGroup);
- metricData.registerMetricsForNumRecordsIn();
- metricData.registerMetricsForNumBytesIn();
- metricData.registerMetricsForNumBytesInPerSecond();
- metricData.registerMetricsForNumRecordsInPerSecond();
- }
- if (auditHostAndPorts != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SourceMetricData(metricOption, metricGroup);
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -464,7 +447,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
new DebeziumDeserializationSchema<T>() {
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
- outputMetrics(record);
+ if (metricData != null) {
+ metricData.outputMetricsWithEstimate(record.value());
+ }
deserializer.deserialize(record, out);
}
@@ -502,23 +487,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
debeziumChangeFetcher.runFetchLoop();
}
- private void outputMetrics(SourceRecord record) {
- if (metricData != null) {
- metricData.getNumRecordsIn().inc(1L);
- metricData.getNumBytesIn()
- .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
- }
- if (auditImp != null) {
- auditImp.add(
- Constants.AUDIT_SORT_INPUT,
- inlongGroupId,
- inlongStreamId,
- System.currentTimeMillis(),
- 1,
- record.value().toString().getBytes(StandardCharsets.UTF_8).length);
- }
- }
-
@Override
public void notifyCheckpointComplete(long checkpointId) {
if (!debeziumStarted) {
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 3cc166601..53a8e217f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.parser.impl;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.function.EncryptFunction;
@@ -64,6 +65,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Flink sql parse handler
@@ -149,7 +151,7 @@ public class FlinkSqlParser implements Parser {
Preconditions.checkNotNull(streamInfo.getRelations(), "relations is null");
Preconditions.checkState(!streamInfo.getRelations().isEmpty(), "relations is empty");
log.info("start parse stream, streamId:{}", streamInfo.getStreamId());
- // Inject the `inlong.metric` for ExtractNode or LoadNode
+ // Inject the metric option for ExtractNode or LoadNode
injectInlongMetric(streamInfo);
Map<String, Node> nodeMap = new HashMap<>(streamInfo.getNodes().size());
streamInfo.getNodes().forEach(s -> {
@@ -169,7 +171,7 @@ public class FlinkSqlParser implements Parser {
}
/**
- * Inject the `inlong.metric` for ExtractNode or LoadNode
+ * Inject the metric option for ExtractNode or LoadNode
*
* @param streamInfo The encapsulation of nodes and node relations
*/
@@ -183,16 +185,19 @@ public class FlinkSqlParser implements Parser {
} else if (node instanceof ExtractNode) {
((ExtractNode) node).setProperties(properties);
} else {
- throw new UnsupportedOperationException(String.format("Unsupported inlong metric for: %s",
- node.getClass().getSimpleName()));
+ throw new UnsupportedOperationException(String.format(
+ "Unsupported inlong group stream node for: %s", node.getClass().getSimpleName()));
}
}
- properties.put(InlongMetric.METRIC_KEY,
- String.format(InlongMetric.METRIC_VALUE_FORMAT, groupInfo.getGroupId(),
- streamInfo.getStreamId(), node.getId()));
- if (StringUtils.isNotEmpty(groupInfo.getProperties().get(InlongMetric.AUDIT_KEY))) {
- properties.put(InlongMetric.AUDIT_KEY,
- groupInfo.getProperties().get(InlongMetric.AUDIT_KEY));
+ properties.put(Constants.METRICS_LABELS.key(),
+ Stream.of(Constants.GROUP_ID + "=" + groupInfo.getGroupId(),
+ Constants.STREAM_ID + "=" + streamInfo.getStreamId(),
+ Constants.NODE_ID + "=" + node.getId())
+ .collect(Collectors.joining("&")));
+ // METRICS_AUDIT_PROXY_HOSTS depends on INLONG_GROUP_STREAM_NODE
+ if (StringUtils.isNotEmpty(groupInfo.getProperties().get(Constants.METRICS_AUDIT_PROXY_HOSTS.key()))) {
+ properties.put(Constants.METRICS_AUDIT_PROXY_HOSTS.key(),
+ groupInfo.getProperties().get(Constants.METRICS_AUDIT_PROXY_HOSTS.key()));
}
});
}