You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/10 12:07:55 UTC
[inlong] branch master updated: [INLONG-5464][Sort] Optimize the implementation of the sort-connector-base metric common abstract (#5465)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 576e469f7 [INLONG-5464][Sort] Optimize the implementation of the sort-connector-base metric common abstract (#5465)
576e469f7 is described below
commit 576e469f7c9765a5666e1e3ae58de292945997ed
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Wed Aug 10 20:07:51 2022 +0800
[INLONG-5464][Sort] Optimize the implementation of the sort-connector-base metric common abstract (#5465)
---
.../{connector-base => base}/pom.xml | 4 +-
.../org/apache/inlong/sort/base/Constants.java | 17 +-
.../apache/inlong/sort/base/metric/MetricData.java | 97 +++++++++++
.../inlong/sort/base/metric/SinkMetricData.java | 170 ++++++++++++++++++
.../inlong/sort/base/metric/SourceMetricData.java | 131 ++++++++++++++
.../inlong/sort/base/metric/ThreadSafeCounter.java | 0
.../sort/base/metric/ThreadSafeCounterTest.java | 0
.../inlong/sort/base/metric/SinkMetricData.java | 192 ---------------------
.../inlong/sort/base/metric/SourceMetricData.java | 138 ---------------
.../inlong/sort/hbase/sink/HBaseSinkFunction.java | 27 +--
.../jdbc/internal/JdbcBatchingOutputFormat.java | 27 +--
.../inlong/sort/kafka/FlinkKafkaProducer.java | 53 ++----
.../table/DynamicKafkaDeserializationSchema.java | 51 +++---
.../sort/cdc/mongodb/DebeziumSourceFunction.java | 35 ++--
.../sort/cdc/oracle/DebeziumSourceFunction.java | 35 ++--
inlong-sort/sort-connectors/pom.xml | 2 +-
.../DebeziumSourceFunction.java | 17 +-
.../table/DynamicPulsarDeserializationSchema.java | 65 +++----
18 files changed, 527 insertions(+), 534 deletions(-)
diff --git a/inlong-sort/sort-connectors/connector-base/pom.xml b/inlong-sort/sort-connectors/base/pom.xml
similarity index 91%
rename from inlong-sort/sort-connectors/connector-base/pom.xml
rename to inlong-sort/sort-connectors/base/pom.xml
index e26db29ff..7106980d1 100644
--- a/inlong-sort/sort-connectors/connector-base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -16,8 +16,8 @@
~ limitations under the License.
-->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sort-connectors</artifactId>
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
similarity index 85%
rename from inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
rename to inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index e5a755aa6..54c911e77 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -48,7 +48,22 @@ public final class Constants {
public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
-
+ /**
+ * Time span in seconds
+ */
+ public static final Integer TIME_SPAN_IN_SECONDS = 60;
+ /**
+ * Stream id used in inlong metric
+ */
+ public static final String STREAM_ID = "streamId";
+ /**
+ * Group id used in inlong metric
+ */
+ public static final String GROUP_ID = "groupId";
+ /**
+ * Node id used in inlong metric
+ */
+ public static final String NODE_ID = "nodeId";
/**
* It is used for inlong.metric
*/
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
new file mode 100644
index 000000000..681318ff7
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+
+import static org.apache.inlong.sort.base.Constants.GROUP_ID;
+import static org.apache.inlong.sort.base.Constants.NODE_ID;
+import static org.apache.inlong.sort.base.Constants.STREAM_ID;
+import static org.apache.inlong.sort.base.Constants.TIME_SPAN_IN_SECONDS;
+
+/**
+ * This is the top-level interface for metric data; it defines the common metric data methods.
+ */
+public interface MetricData {
+
+ /**
+ * Get metric group
+ *
+ * @return The MetricGroup
+ */
+ MetricGroup getMetricGroup();
+
+ /**
+ * Get group id
+ *
+ * @return The group id defined in inlong
+ */
+ String getGroupId();
+
+ /**
+ * Get stream id
+ *
+ * @return The stream id defined in inlong
+ */
+ String getStreamId();
+
+ /**
+ * Get node id
+ *
+ * @return The node id defined in inlong
+ */
+ String getNodeId();
+
+ /**
+ * Register a counter metric
+ *
+ * @param metricName The metric name
+ * @param counter The counter of metric
+ * @return The registered counter
+ */
+ default Counter registerCounter(String metricName, Counter counter) {
+ return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
+ .addGroup(NODE_ID, getNodeId()).counter(metricName, counter);
+ }
+
+ /**
+ * Register a counter metric
+ *
+ * @param metricName The metric name
+ * @return The registered counter
+ */
+ default Counter registerCounter(String metricName) {
+ return registerCounter(metricName, new SimpleCounter());
+ }
+
+ /**
+ * Register a meter metric backed by the given counter
+ *
+ * @param metricName The metric name
+ * @return The registered meter
+ */
+ default Meter registerMeter(String metricName, Counter counter) {
+ return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
+ .addGroup(NODE_ID, getNodeId()).meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS));
+ }
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
new file mode 100644
index 000000000..f68ad9906
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
+
+/**
+ * A collection class for handling metrics
+ */
+public class SinkMetricData implements MetricData {
+
+ private final MetricGroup metricGroup;
+ private final String groupId;
+ private final String streamId;
+ private final String nodeId;
+ private Counter numRecordsOut;
+ private Counter numBytesOut;
+ private Counter dirtyRecords;
+ private Counter dirtyBytes;
+ private Meter numRecordsOutPerSecond;
+ private Meter numBytesOutPerSecond;
+
+ public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
+ this.groupId = groupId;
+ this.streamId = streamId;
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * Default counter is {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForNumRecordsOut() {
+ registerMetricsForNumRecordsOut(new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom {@link Counter} implementation.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForNumRecordsOut(Counter counter) {
+ numRecordsOut = registerCounter(NUM_RECORDS_OUT, counter);
+ }
+
+ /**
+ * Default counter is {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForNumBytesOut() {
+ registerMetricsForNumBytesOut(new SimpleCounter());
+
+ }
+
+ /**
+ * Users can supply a custom {@link Counter} implementation.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForNumBytesOut(Counter counter) {
+ numBytesOut = registerCounter(NUM_BYTES_OUT, counter);
+ }
+
+ public void registerMetricsForNumRecordsOutPerSecond() {
+ numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOut);
+ }
+
+ public void registerMetricsForNumBytesOutPerSecond() {
+ numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOut);
+ }
+
+ public void registerMetricsForDirtyRecords() {
+ registerMetricsForDirtyRecords(new SimpleCounter());
+ }
+
+ public void registerMetricsForDirtyRecords(Counter counter) {
+ dirtyRecords = registerCounter(DIRTY_RECORDS, counter);
+ }
+
+ /**
+ * Default counter is {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForDirtyBytes() {
+ registerMetricsForDirtyBytes(new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom {@link Counter} implementation.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForDirtyBytes(Counter counter) {
+ dirtyBytes = registerCounter(DIRTY_BYTES, counter);
+ }
+
+ public Counter getNumRecordsOut() {
+ return numRecordsOut;
+ }
+
+ public Counter getNumBytesOut() {
+ return numBytesOut;
+ }
+
+ public Counter getDirtyRecords() {
+ return dirtyRecords;
+ }
+
+ public Counter getDirtyBytes() {
+ return dirtyBytes;
+ }
+
+ public Meter getNumRecordsOutPerSecond() {
+ return numRecordsOutPerSecond;
+ }
+
+ public Meter getNumBytesOutPerSecond() {
+ return numBytesOutPerSecond;
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return metricGroup;
+ }
+
+ @Override
+ public String getGroupId() {
+ return groupId;
+ }
+
+ @Override
+ public String getStreamId() {
+ return streamId;
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
new file mode 100644
index 000000000..e548784dc
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+
+/**
+ * A collection class for handling metrics
+ */
+public class SourceMetricData implements MetricData {
+
+ private final MetricGroup metricGroup;
+ private final String groupId;
+ private final String streamId;
+ private final String nodeId;
+ private Counter numRecordsIn;
+ private Counter numBytesIn;
+ private Meter numRecordsInPerSecond;
+ private Meter numBytesInPerSecond;
+
+ public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
+ this.groupId = groupId;
+ this.streamId = streamId;
+ this.nodeId = nodeId;
+ this.metricGroup = metricGroup;
+ }
+
+ /**
+ * Default counter is {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForNumRecordsIn() {
+ registerMetricsForNumRecordsIn(new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom {@link Counter} implementation.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForNumRecordsIn(Counter counter) {
+ numRecordsIn = registerCounter(NUM_RECORDS_IN, counter);
+ }
+
+ /**
+ * Default counter is {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForNumBytesIn() {
+ registerMetricsForNumBytesIn(new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom {@link Counter} implementation.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * Prometheus metric reporter.
+ */
+ public void registerMetricsForNumBytesIn(Counter counter) {
+ numBytesIn = registerCounter(NUM_BYTES_IN, counter);
+ }
+
+ public void registerMetricsForNumRecordsInPerSecond() {
+ numRecordsInPerSecond = registerMeter(NUM_RECORDS_IN_PER_SECOND, this.numRecordsIn);
+ }
+
+ public void registerMetricsForNumBytesInPerSecond() {
+ numBytesInPerSecond = registerMeter(NUM_BYTES_IN_PER_SECOND, this.numBytesIn);
+ }
+
+ public Counter getNumRecordsIn() {
+ return numRecordsIn;
+ }
+
+ public Counter getNumBytesIn() {
+ return numBytesIn;
+ }
+
+ public Meter getNumRecordsInPerSecond() {
+ return numRecordsInPerSecond;
+ }
+
+ public Meter getNumBytesInPerSecond() {
+ return numBytesInPerSecond;
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return metricGroup;
+ }
+
+ @Override
+ public String getGroupId() {
+ return groupId;
+ }
+
+ @Override
+ public String getStreamId() {
+ return streamId;
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+}
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/ThreadSafeCounter.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/ThreadSafeCounter.java
similarity index 100%
rename from inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/ThreadSafeCounter.java
rename to inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/ThreadSafeCounter.java
diff --git a/inlong-sort/sort-connectors/connector-base/src/test/java/org/apache/inlong/sort/base/metric/ThreadSafeCounterTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/metric/ThreadSafeCounterTest.java
similarity index 100%
rename from inlong-sort/sort-connectors/connector-base/src/test/java/org/apache/inlong/sort/base/metric/ThreadSafeCounterTest.java
rename to inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/metric/ThreadSafeCounterTest.java
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
deleted file mode 100644
index 27c3a2194..000000000
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.base.metric;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-
-/**
- * A collection class for handling metrics
- */
-public class SinkMetricData {
-
- private final MetricGroup metricGroup;
-
- private Counter numRecordsOut;
- private Counter numBytesOut;
- private Counter dirtyRecords;
- private Counter dirtyBytes;
- private Meter numRecordsOutPerSecond;
- private Meter numBytesOutPerSecond;
- private static Integer TIME_SPAN_IN_SECONDS = 60;
- private static String STREAM_ID = "streamId";
- private static String GROUP_ID = "groupId";
- private static String NODE_ID = "nodeId";
-
- public SinkMetricData(MetricGroup metricGroup) {
- this.metricGroup = metricGroup;
- }
-
- /**
- * Default counter is {@link SimpleCounter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
- registerMetricsForNumRecordsOut(groupId, streamId, nodeId, metricName, new SimpleCounter());
- }
-
- /**
- * User can use custom counter that extends from {@link Counter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName,
- Counter counter) {
- numRecordsOut =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName, counter);
- }
-
- /**
- * Default counter is {@link SimpleCounter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
- registerMetricsForNumBytesOut(groupId, streamId, nodeId, metricName, new SimpleCounter());
- }
-
- /**
- * User can use custom counter that extends from {@link Counter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName,
- Counter counter) {
- numBytesOut =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName, counter);
- }
-
- public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
- String metricName) {
- numRecordsOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
- nodeId)
- .meter(metricName, new MeterView(this.numRecordsOut, TIME_SPAN_IN_SECONDS));
- }
-
- public void registerMetricsForNumBytesOutPerSecond(String groupId, String streamId, String nodeId,
- String metricName) {
- numBytesOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
- .addGroup(NODE_ID, nodeId)
- .meter(metricName, new MeterView(this.numBytesOut, TIME_SPAN_IN_SECONDS));
- }
-
- public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
- String metricName) {
- registerMetricsForDirtyRecords(groupId, streamId, nodeId, metricName, new SimpleCounter());
- }
-
- public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
- String metricName, Counter counter) {
- dirtyRecords = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName, counter);
- }
-
- /**
- * Default counter is {@link SimpleCounter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
- String metricName) {
- registerMetricsForDirtyBytes(groupId, streamId, nodeId, metricName, new SimpleCounter());
- }
-
- /**
- * User can use custom counter that extends from {@link Counter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
- String metricName, Counter counter) {
- dirtyBytes =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName, counter);
- }
-
- public Counter getNumRecordsOut() {
- return numRecordsOut;
- }
-
- public Counter getNumBytesOut() {
- return numBytesOut;
- }
-
- public Counter getDirtyRecords() {
- return dirtyRecords;
- }
-
- public Counter getDirtyBytes() {
- return dirtyBytes;
- }
-
- public Meter getNumRecordsOutPerSecond() {
- return numRecordsOutPerSecond;
- }
-
- public Meter getNumBytesOutPerSecond() {
- return numBytesOutPerSecond;
- }
-
-}
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
deleted file mode 100644
index a962e2938..000000000
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.base.metric;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-
-/**
- * A collection class for handling metrics
- */
-public class SourceMetricData {
-
- private static Integer TIME_SPAN_IN_SECONDS = 60;
- private static String STREAM_ID = "streamId";
- private static String GROUP_ID = "groupId";
- private static String NODE_ID = "nodeId";
- private final MetricGroup metricGroup;
- private Counter numRecordsIn;
- private Counter numBytesIn;
- private Meter numRecordsInPerSecond;
- private Meter numBytesInPerSecond;
-
- public SourceMetricData(MetricGroup metricGroup) {
- this.metricGroup = metricGroup;
- }
-
- /**
- * Default counter is {@link SimpleCounter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) {
- registerMetricsForNumRecordsIn(groupId, streamId, nodeId, metricName, new SimpleCounter());
- }
-
- /**
- * User can use custom counter that extends from {@link Counter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName,
- Counter counter) {
- numRecordsIn =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName, counter);
- }
-
- /**
- * Default counter is {@link SimpleCounter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) {
- registerMetricsForNumBytesIn(groupId, streamId, nodeId, metricName, new SimpleCounter());
- }
-
- /**
- * User can use custom counter that extends from {@link Counter}
- * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
- * prometheus
- *
- * @param groupId inlong groupId
- * @param streamId inlong streamId
- * @param nodeId inlong nodeId
- * @param metricName metric name
- */
- public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName,
- Counter counter) {
- numBytesIn =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName, counter);
- }
-
- public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId,
- String metricName) {
- numRecordsInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
- nodeId)
- .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS));
- }
-
- public void registerMetricsForNumBytesInPerSecond(String groupId, String streamId, String nodeId,
- String metricName) {
- numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
- .addGroup(NODE_ID, nodeId)
- .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS));
- }
-
- public Counter getNumRecordsIn() {
- return numRecordsIn;
- }
-
- public Counter getNumBytesIn() {
- return numBytesIn;
- }
-
- public Meter getNumRecordsInPerSecond() {
- return numRecordsInPerSecond;
- }
-
- public Meter getNumBytesInPerSecond() {
- return numBytesInPerSecond;
- }
-
-}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 9ed713119..e4760866c 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -52,13 +52,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
-import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
-
/**
* The sink function for HBase.
*
@@ -127,24 +120,18 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
try {
this.runtimeContext = getRuntimeContext();
- sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
String[] inLongMetricArray = inLongMetric.split(Constants.DELIMITER);
String groupId = inLongMetricArray[0];
String streamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, DIRTY_BYTES,
- new ThreadSafeCounter());
- sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, DIRTY_RECORDS,
- new ThreadSafeCounter());
- sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, NUM_BYTES_OUT,
- new ThreadSafeCounter());
- sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, NUM_RECORDS_OUT,
- new ThreadSafeCounter());
- sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId,
- NUM_BYTES_OUT_PER_SECOND);
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
- NUM_RECORDS_OUT_PER_SECOND);
+ sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
+ sinkMetricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
+ sinkMetricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
+ sinkMetricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
+ sinkMetricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+ sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
}
this.mutationConverter.open();
this.numPendingRequests = new AtomicLong(0);
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 55474def8..98f0f0cf6 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -51,17 +51,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
-import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
-import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
/**
* A JDBC outputFormat that supports batching records before writing records to database.
@@ -137,24 +130,18 @@ public class JdbcBatchingOutputFormat<
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
this.runtimeContext = getRuntimeContext();
- sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
String[] inLongMetricArray = inLongMetric.split(DELIMITER);
inLongGroupId = inLongMetricArray[0];
inLongStreamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- sinkMetricData.registerMetricsForDirtyBytes(inLongGroupId, inLongStreamId,
- nodeId, DIRTY_BYTES);
- sinkMetricData.registerMetricsForDirtyRecords(inLongGroupId, inLongStreamId,
- nodeId, DIRTY_RECORDS);
- sinkMetricData.registerMetricsForNumBytesOut(inLongGroupId, inLongStreamId,
- nodeId, NUM_BYTES_OUT);
- sinkMetricData.registerMetricsForNumRecordsOut(inLongGroupId, inLongStreamId,
- nodeId, NUM_RECORDS_OUT);
- sinkMetricData.registerMetricsForNumBytesOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
- NUM_BYTES_OUT_PER_SECOND);
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
- NUM_RECORDS_OUT_PER_SECOND);
+ sinkMetricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, runtimeContext.getMetricGroup());
+ sinkMetricData.registerMetricsForDirtyBytes();
+ sinkMetricData.registerMetricsForDirtyRecords();
+ sinkMetricData.registerMetricsForNumBytesOut();
+ sinkMetricData.registerMetricsForNumRecordsOut();
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+ sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
}
if (auditHostAndPorts != null) {
AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 2df2b0893..93e7a8b31 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -93,16 +93,9 @@ import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
-import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
-import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
/**
* Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
@@ -227,19 +220,6 @@ public class FlinkKafkaProducer<IN>
* audit host and ports
*/
private final String auditHostAndPorts;
- /**
- * audit implement
- */
- private transient AuditImp auditImp;
- /**
- * inLong groupId
- */
- private String inLongGroupId;
- /**
- * inLong streamId
- */
- private String inLongStreamId;
-
/**
* Flag controlling whether we are writing the Flink record's timestamp into Kafka.
*/
@@ -258,6 +238,18 @@ public class FlinkKafkaProducer<IN>
*/
@Nullable
protected transient volatile Exception asyncException;
+ /**
+ * audit implement
+ */
+ private transient AuditImp auditImp;
+ /**
+ * inLong groupId
+ */
+ private String inLongGroupId;
+ /**
+ * inLong streamId
+ */
+ private String inLongStreamId;
/**
* sink metric data
*/
@@ -913,25 +905,18 @@ public class FlinkKafkaProducer<IN>
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
}
-
- metricData = new SinkMetricData(ctx.getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
String[] inLongMetricArray = inLongMetric.split(DELIMITER);
inLongGroupId = inLongMetricArray[0];
inLongStreamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- metricData.registerMetricsForDirtyBytes(inLongGroupId, inLongStreamId, nodeId, DIRTY_BYTES,
- new ThreadSafeCounter());
- metricData.registerMetricsForDirtyRecords(inLongGroupId, inLongStreamId, nodeId, DIRTY_RECORDS,
- new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOut(inLongGroupId, inLongStreamId, nodeId, NUM_BYTES_OUT,
- new ThreadSafeCounter());
- metricData.registerMetricsForNumRecordsOut(inLongGroupId, inLongStreamId, nodeId, NUM_RECORDS_OUT,
- new ThreadSafeCounter());
- metricData.registerMetricsForNumBytesOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
- NUM_BYTES_OUT_PER_SECOND);
- metricData.registerMetricsForNumRecordsOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
- NUM_RECORDS_OUT_PER_SECOND);
+ metricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, ctx.getMetricGroup());
+ metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
+ metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
+ metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
+ metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+ metricData.registerMetricsForNumBytesOutPerSecond();
+ metricData.registerMetricsForNumRecordsOutPerSecond();
}
if (auditHostAndPorts != null) {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index f09b16b5b..ef8660280 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -18,14 +18,6 @@
package org.apache.inlong.sort.kafka.table;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
@@ -41,6 +33,14 @@ import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
/**
* deserialization schema for {@link KafkaDynamicSource}.
*/
@@ -48,7 +48,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private static final long serialVersionUID = 1L;
- private final @Nullable DeserializationSchema<RowData> keyDeserialization;
+ private final @Nullable
+ DeserializationSchema<RowData> keyDeserialization;
private final DeserializationSchema<RowData> valueDeserialization;
@@ -85,7 +86,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode,
String inLongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -120,14 +121,11 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
inLongGroupId = inLongMetricArray[0];
inLongStreamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- metricData = new SourceMetricData(context.getMetricGroup());
- metricData.registerMetricsForNumBytesIn(inLongGroupId, inLongStreamId, nodeId, "numBytesIn");
- metricData.registerMetricsForNumBytesInPerSecond(inLongGroupId, inLongStreamId,
- nodeId, "numBytesInPerSecond");
- metricData.registerMetricsForNumRecordsIn(inLongGroupId, inLongStreamId,
- nodeId, "numRecordsIn");
- metricData.registerMetricsForNumRecordsInPerSecond(inLongGroupId, inLongStreamId,
- nodeId, "numRecordsInPerSecond");
+ metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, context.getMetricGroup());
+ metricData.registerMetricsForNumBytesIn();
+ metricData.registerMetricsForNumBytesInPerSecond();
+ metricData.registerMetricsForNumRecordsIn();
+ metricData.registerMetricsForNumRecordsInPerSecond();
}
if (auditHostAndPorts != null) {
AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
@@ -187,12 +185,12 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private void outputMetricForAudit(ConsumerRecord<byte[], byte[]> record) {
if (auditImp != null) {
auditImp.add(
- Constants.AUDIT_SORT_INPUT,
- inLongGroupId,
- inLongStreamId,
- System.currentTimeMillis(),
- 1,
- record.value().length);
+ Constants.AUDIT_SORT_INPUT,
+ inLongGroupId,
+ inLongStreamId,
+ System.currentTimeMillis(),
+ 1,
+ record.value().length);
}
}
@@ -211,6 +209,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
// --------------------------------------------------------------------------------------------
interface MetadataConverter extends Serializable {
+
Object read(ConsumerRecord<?, ?> record);
}
@@ -311,8 +310,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
} else {
throw new DeserializationException(
"Invalid null value received "
- + "in non-upsert mode. Could not to "
- + "set row kind for output record.");
+ + "in non-upsert mode. Could not to "
+ + "set row kind for output record.");
}
} else {
rowKind = physicalValueRow.getRowKind();
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 141cc5672..db41472ed 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -76,7 +76,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
@@ -112,40 +111,33 @@ import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHisto
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
- private static final long serialVersionUID = -5808108641062931623L;
-
- protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
-
/**
* State name of the consumer's partition offset states.
*/
public static final String OFFSETS_STATE_NAME = "offset-states";
-
/**
* State name of the consumer's history records state.
*/
public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
-
/**
* The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
*/
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
/**
* The configuration represents the Debezium MySQL Connector uses the legacy implementation or
* not.
*/
public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
-
/**
* The configuration value represents legacy implementation.
*/
public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+ protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+ private static final long serialVersionUID = -5808108641062931623L;
// ---------------------------------------------------------------------------------------
// Properties
// ---------------------------------------------------------------------------------------
-
/**
* The schema to convert from Debezium's messages into Flink's objects.
*/
@@ -166,21 +158,18 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
* Data for pending but uncommitted offsets.
*/
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
- /**
- * Flag indicating whether the Debezium Engine is started.
- */
- private volatile boolean debeziumStarted = false;
-
/**
* Validator to validate the connected database satisfies the cdc connector's requirements.
*/
private final Validator validator;
+ /**
+ * Flag indicating whether the Debezium Engine is started.
+ */
+ private volatile boolean debeziumStarted = false;
// ---------------------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------------------
-
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
@@ -425,13 +414,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- metricData = new SourceMetricData(metricGroup);
- metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
- metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, Constants.NUM_BYTES_IN);
- metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId,
- Constants.NUM_BYTES_IN_PER_SECOND);
- metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
- Constants.NUM_RECORDS_IN_PER_SECOND);
+ metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+ metricData.registerMetricsForNumRecordsIn();
+ metricData.registerMetricsForNumBytesIn();
+ metricData.registerMetricsForNumBytesInPerSecond();
+ metricData.registerMetricsForNumRecordsInPerSecond();
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 166f7f1b7..5d7eb10ff 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -76,7 +76,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
@@ -112,40 +111,33 @@ import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHisto
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
- private static final long serialVersionUID = -5808108641062931623L;
-
- protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
-
/**
* State name of the consumer's partition offset states.
*/
public static final String OFFSETS_STATE_NAME = "offset-states";
-
/**
* State name of the consumer's history records state.
*/
public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
-
/**
* The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
*/
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
/**
* The configuration represents the Debezium MySQL Connector uses the legacy implementation or
* not.
*/
public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
-
/**
* The configuration value represents legacy implementation.
*/
public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+ protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+ private static final long serialVersionUID = -5808108641062931623L;
// ---------------------------------------------------------------------------------------
// Properties
// ---------------------------------------------------------------------------------------
-
/**
* The schema to convert from Debezium's messages into Flink's objects.
*/
@@ -166,21 +158,18 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
* Data for pending but uncommitted offsets.
*/
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
- /**
- * Flag indicating whether the Debezium Engine is started.
- */
- private volatile boolean debeziumStarted = false;
-
/**
* Validator to validate the connected database satisfies the cdc connector's requirements.
*/
private final Validator validator;
+ /**
+ * Flag indicating whether the Debezium Engine is started.
+ */
+ private volatile boolean debeziumStarted = false;
// ---------------------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------------------
-
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
@@ -425,13 +414,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- metricData = new SourceMetricData(metricGroup);
- metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
- metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, Constants.NUM_BYTES_IN);
- metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId,
- Constants.NUM_BYTES_IN_PER_SECOND);
- metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
- Constants.NUM_RECORDS_IN_PER_SECOND);
+ metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+ metricData.registerMetricsForNumRecordsIn();
+ metricData.registerMetricsForNumBytesIn();
+ metricData.registerMetricsForNumBytesInPerSecond();
+ metricData.registerMetricsForNumRecordsInPerSecond();
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index fa6f26b80..fe6503227 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -35,7 +35,7 @@
<packaging>pom</packaging>
<modules>
- <module>connector-base</module>
+ <module>base</module>
<module>hive</module>
<module>mysql-cdc</module>
<module>kafka</module>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index cc02fd1e2..e53ff76dc 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -76,13 +76,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -419,13 +414,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- sourceMetricData = new SourceMetricData(metricGroup);
- sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, NUM_RECORDS_IN);
- sourceMetricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, NUM_BYTES_IN);
- sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId, streamId,
- nodeId, NUM_BYTES_IN_PER_SECOND);
- sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
- nodeId, NUM_RECORDS_IN_PER_SECOND);
+ sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+ sourceMetricData.registerMetricsForNumRecordsIn();
+ sourceMetricData.registerMetricsForNumBytesIn();
+ sourceMetricData.registerMetricsForNumBytesInPerSecond();
+ sourceMetricData.registerMetricsForNumRecordsInPerSecond();
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 7f6b1fb95..ea0c995c8 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,17 +18,6 @@
package org.apache.inlong.sort.pulsar.table;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
@@ -45,38 +34,36 @@ import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
/**
* A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
*/
class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
-
+ private static final ThreadLocal<SimpleCollector<RowData>> tlsCollector =
+ new ThreadLocal<SimpleCollector<RowData>>() {
+ @Override
+ public SimpleCollector initialValue() {
+ return new SimpleCollector();
+ }
+ };
@Nullable
private final DeserializationSchema<RowData> keyDeserialization;
-
private final DeserializationSchema<RowData> valueDeserialization;
-
private final boolean hasMetadata;
-
private final OutputProjectionCollector outputCollector;
-
private final TypeInformation<RowData> producedTypeInfo;
-
private final boolean upsertMode;
-
private SourceMetricData sourceMetricData;
-
private String inlongMetric;
- private static final ThreadLocal<SimpleCollector<RowData>> tlsCollector =
- new ThreadLocal<SimpleCollector<RowData>>() {
- @Override
- public SimpleCollector initialValue() {
- return new SimpleCollector();
- }
- };
-
DynamicPulsarDeserializationSchema(
int physicalArity,
@Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -87,7 +74,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode,
- String inlongMetric) {
+ String inlongMetric) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -115,19 +102,15 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
valueDeserialization.open(context);
if (inlongMetric != null && !inlongMetric.isEmpty()) {
- sourceMetricData = new SourceMetricData(context.getMetricGroup());
String[] inLongMetricArray = inlongMetric.split(DELIMITER);
String groupId = inLongMetricArray[0];
String streamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- sourceMetricData.registerMetricsForNumBytesIn(groupId,
- streamId, nodeId, NUM_BYTES_IN);
- sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId,
- streamId, nodeId, NUM_BYTES_IN_PER_SECOND);
- sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId,
- nodeId, NUM_RECORDS_IN);
- sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
- nodeId, NUM_RECORDS_IN_PER_SECOND);
+ sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, context.getMetricGroup());
+ sourceMetricData.registerMetricsForNumBytesIn();
+ sourceMetricData.registerMetricsForNumBytesInPerSecond();
+ sourceMetricData.registerMetricsForNumRecordsIn();
+ sourceMetricData.registerMetricsForNumRecordsInPerSecond();
}
}
@@ -153,7 +136,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
if (sourceMetricData != null) {
sourceMetricData.getNumRecordsIn().inc(1L);
sourceMetricData.getNumBytesIn()
- .inc(message.getData().length);
+ .inc(message.getData().length);
}
return;
}
@@ -176,7 +159,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
if (sourceMetricData != null) {
sourceMetricData.getNumRecordsIn().inc(1L);
sourceMetricData.getNumBytesIn()
- .inc(message.getData().length);
+ .inc(message.getData().length);
}
}
@@ -196,6 +179,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
// --------------------------------------------------------------------------------------------
interface MetadataConverter extends Serializable {
+
Object read(Message<?> message);
}
@@ -219,6 +203,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
}
private static class SimpleCollector<T> implements Collector<T> {
+
private T record;
@Override
@@ -320,7 +305,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
} else {
throw new DeserializationException(
"Invalid null value received in non-upsert mode. "
- + "Could not to set row kind for output record.");
+ + "Could not to set row kind for output record.");
}
} else {
rowKind = physicalValueRow.getRowKind();