You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/18 07:20:11 UTC
[inlong] branch master updated: [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d1857468c [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
d1857468c is described below
commit d1857468ccd2f7839b18d477fde33f669f60e2ad
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Sun Sep 18 15:20:05 2022 +0800
[INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
---
.../org/apache/inlong/sort/base/Constants.java | 6 +
.../inlong/sort/base/metric/MetricState.java | 65 +++++++++++
.../inlong/sort/base/metric/SourceMetricData.java | 69 ++++++++++-
.../inlong/sort/base/util/MetricStateUtils.java | 128 +++++++++++++++++++++
.../sort/base/util/MetricStateUtilsTest.java | 64 +++++++++++
.../DebeziumSourceFunction.java | 39 ++++++-
6 files changed, 366 insertions(+), 5 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 45023d38b..9daed86e0 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -45,6 +45,10 @@ public final class Constants {
public static final String NUM_BYTES_IN = "numBytesIn";
+ public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter"; // name of the record counter registered for the per-second record meter
+
+ public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter"; // name of the byte counter registered for the per-second byte meter
+
public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
@@ -75,6 +79,8 @@ public final class Constants {
// sort send successfully
public static final Integer AUDIT_SORT_OUTPUT = 8;
+ public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states"; // name of the list state descriptor that stores MetricState snapshots
+
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric")
.stringType()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
new file mode 100644
index 000000000..9240c0c8a
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Snapshot of the counter metrics of one subtask, used to checkpoint and restore
+ * {@link org.apache.flink.metrics.Counter} values.
+ *
+ * <p>The class is a plain mutable bean: it has a public no-argument constructor and a getter/setter
+ * pair for every field.
+ */
+public class MetricState implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Index of the subtask the metrics belong to. */
+    private Integer subtaskIndex;
+
+    /** Metric name to counter value; {@code null} until a map has been set. */
+    private Map<String, Long> metrics;
+
+    /** Creates an empty state; both fields stay {@code null} until set. */
+    public MetricState() {
+    }
+
+    /**
+     * Creates a state for the given subtask.
+     *
+     * @param subtaskIndex index of the subtask the metrics belong to
+     * @param metrics metric name to counter value; the map is stored as is, not copied
+     */
+    public MetricState(Integer subtaskIndex, Map<String, Long> metrics) {
+        this.subtaskIndex = subtaskIndex;
+        this.metrics = metrics;
+    }
+
+    public Integer getSubtaskIndex() {
+        return subtaskIndex;
+    }
+
+    public void setSubtaskIndex(Integer subtaskIndex) {
+        this.subtaskIndex = subtaskIndex;
+    }
+
+    public Map<String, Long> getMetrics() {
+        return metrics;
+    }
+
+    public void setMetrics(Map<String, Long> metrics) {
+        this.metrics = metrics;
+    }
+
+    /**
+     * Looks up the stored value of a metric.
+     *
+     * @param metricName name of the metric
+     * @return the stored value, or {@code 0} when no metric map is set, the name is unknown, or the
+     *         name is mapped to {@code null}; never {@code null}
+     */
+    public Long getMetricValue(String metricName) {
+        if (metrics == null) {
+            return 0L;
+        }
+        // Map#getOrDefault hands back a stored null value, which breaks callers that unbox the result.
+        Long value = metrics.get(metricName);
+        return value == null ? 0L : value;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index d97efc9f5..5c25fcc75 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -32,8 +32,10 @@ import java.util.HashSet;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
/**
@@ -47,6 +49,8 @@ public class SourceMetricData implements MetricData {
private final String nodeId;
private Counter numRecordsIn;
private Counter numBytesIn;
+ private Counter numRecordsInForMeter; // counter the numRecordsInPerSecond meter is built on; null until registered
+ private Counter numBytesInForMeter; // counter the numBytesInPerSecond meter is built on; null until registered
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
private final AuditImp auditImp;
@@ -82,6 +86,42 @@ public class SourceMetricData implements MetricData {
}
}
+ /**
+ * Registers the record counter used for the per-second record meter, backed by a new
+ * {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * prometheus metric reporter.
+ */
+ public void registerMetricsForNumRecordsInForMeter() {
+ Counter defaultCounter = new SimpleCounter();
+ registerMetricsForNumRecordsInForMeter(defaultCounter);
+ }
+
+ /**
+ * Registers the record counter used for the per-second record meter with a caller-supplied
+ * {@link Counter} implementation.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * prometheus metric reporter.
+ */
+ public void registerMetricsForNumRecordsInForMeter(Counter counter) {
+ Counter registered = registerCounter(NUM_RECORDS_IN_FOR_METER, counter);
+ this.numRecordsInForMeter = registered;
+ }
+
+ /**
+ * Registers the byte counter used for the per-second byte meter, backed by a new
+ * {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * prometheus metric reporter.
+ */
+ public void registerMetricsForNumBytesInForMeter() {
+ Counter defaultCounter = new SimpleCounter();
+ registerMetricsForNumBytesInForMeter(defaultCounter);
+ }
+
+ /**
+ * Registers the byte counter used for the per-second byte meter with a caller-supplied
+ * {@link Counter} implementation.
+ * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+ * prometheus metric reporter.
+ */
+ public void registerMetricsForNumBytesInForMeter(Counter counter) {
+ Counter registered = registerCounter(NUM_BYTES_IN_FOR_METER, counter);
+ this.numBytesInForMeter = registered;
+ }
+
/**
* Default counter is {@link SimpleCounter}
* groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
@@ -119,11 +159,11 @@ public class SourceMetricData implements MetricData {
}
+ /**
+ * Registers the records-per-second meter.
+ * The meter is built on the dedicated for-meter counter when that one has been registered and on
+ * numRecordsIn otherwise.
+ */
public void registerMetricsForNumRecordsInPerSecond() {
- numRecordsInPerSecond = registerMeter(NUM_RECORDS_IN_PER_SECOND, this.numRecordsIn);
+ // numRecordsInForMeter is only set by registerMetricsForNumRecordsInForMeter(); fall back to
+ // numRecordsIn so callers that never register it do not hand a null counter to registerMeter.
+ Counter meterCounter = this.numRecordsInForMeter != null
+ ? this.numRecordsInForMeter
+ : this.numRecordsIn;
+ numRecordsInPerSecond = registerMeter(NUM_RECORDS_IN_PER_SECOND, meterCounter);
}
+ /**
+ * Registers the bytes-per-second meter.
+ * The meter is built on the dedicated for-meter counter when that one has been registered and on
+ * numBytesIn otherwise.
+ */
public void registerMetricsForNumBytesInPerSecond() {
- numBytesInPerSecond = registerMeter(NUM_BYTES_IN_PER_SECOND, this.numBytesIn);
+ // numBytesInForMeter is only set by registerMetricsForNumBytesInForMeter(); fall back to
+ // numBytesIn so callers that never register it do not hand a null counter to registerMeter.
+ Counter meterCounter = this.numBytesInForMeter != null
+ ? this.numBytesInForMeter
+ : this.numBytesIn;
+ numBytesInPerSecond = registerMeter(NUM_BYTES_IN_PER_SECOND, meterCounter);
}
public Counter getNumRecordsIn() {
@@ -142,6 +182,14 @@ public class SourceMetricData implements MetricData {
return numBytesInPerSecond;
}
+ public Counter getNumRecordsInForMeter() { // returns the record counter registered for the per-second meter; null if it was never registered
+ return numRecordsInForMeter;
+ }
+
+ public Counter getNumBytesInForMeter() { // returns the byte counter registered for the per-second meter; null if it was never registered
+ return numBytesInForMeter;
+ }
+
@Override
public MetricGroup getMetricGroup() {
return metricGroup;
@@ -182,5 +230,22 @@ public class SourceMetricData implements MetricData {
+ /**
+ * Adds the given record count and byte size to the Flink counters.
+ *
+ * @param rowCountSize number of records to add
+ * @param rowDataSize number of bytes to add
+ */
public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
this.numBytesIn.inc(rowDataSize);
this.numRecordsIn.inc(rowCountSize);
+ // The for-meter counters are only set by their own register methods; skip them when they
+ // were never registered instead of failing with a NullPointerException.
+ if (this.numBytesInForMeter != null) {
+ this.numBytesInForMeter.inc(rowDataSize);
+ }
+ if (this.numRecordsInForMeter != null) {
+ this.numRecordsInForMeter.inc(rowCountSize);
+ }
+ }
+
+ /**
+ * Renders the ids and the current metric values; a metric that has not been registered is
+ * printed as {@code null}.
+ */
+ @Override
+ public String toString() {
+ // Every counter and meter is registered by its own method, so any of them may still be null.
+ return "SourceMetricData{"
+ + "groupId='" + groupId + '\''
+ + ", streamId='" + streamId + '\''
+ + ", nodeId='" + nodeId + '\''
+ + ", numRecordsIn=" + (numRecordsIn == null ? null : numRecordsIn.getCount())
+ + ", numBytesIn=" + (numBytesIn == null ? null : numBytesIn.getCount())
+ + ", numRecordsInForMeter=" + (numRecordsInForMeter == null ? null : numRecordsInForMeter.getCount())
+ + ", numBytesInForMeter=" + (numBytesInForMeter == null ? null : numBytesInForMeter.getCount())
+ + ", numRecordsInPerSecond=" + (numRecordsInPerSecond == null ? null : numRecordsInPerSecond.getRate())
+ + ", numBytesInPerSecond=" + (numBytesInPerSecond == null ? null : numBytesInPerSecond.getRate())
+ + '}';
}
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
new file mode 100644
index 000000000..d878381ba
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+
+/**
+ * Helpers for snapshotting {@link MetricState} into a list state and restoring it again
+ */
+@Slf4j
+public class MetricStateUtils {
+
+    /**
+     * Restores the metric state of the current subtask from the state list.
+     *
+     * <p>When the current parallelism is not smaller than the number of restored states, the state
+     * stored under {@code subtaskIndex} is returned as is (possibly {@code null}). Otherwise the
+     * previous states assigned to this subtask by {@link #computeIndexList} are summed per metric
+     * name; missing states, missing metric maps and {@code null} values are skipped.
+     *
+     * @param metricStateListState state data list
+     * @param subtaskIndex current subtask index
+     * @param currentSubtaskNum number of current parallel subtasks
+     * @return the metric state of this subtask, or {@code null} when there is nothing to restore
+     * @throws Exception when reading {@code metricStateListState} fails
+     */
+    public static MetricState restoreMetricState(ListState<MetricState> metricStateListState, Integer subtaskIndex,
+            Integer currentSubtaskNum) throws Exception {
+        if (metricStateListState == null) {
+            return null;
+        }
+        // Read the state once and reuse the result for both the null check and the iteration.
+        Iterable<MetricState> restoredStates = metricStateListState.get();
+        if (restoredStates == null) {
+            return null;
+        }
+        log.info("restoreMetricState:{}, subtaskIndex:{}, currentSubtaskNum:{}", metricStateListState, subtaskIndex,
+                currentSubtaskNum);
+        Map<Integer, MetricState> stateBySubtask = new HashMap<>(16);
+        for (MetricState metricState : restoredStates) {
+            stateBySubtask.put(metricState.getSubtaskIndex(), metricState);
+        }
+        int previousSubtaskNum = stateBySubtask.size();
+        if (currentSubtaskNum >= previousSubtaskNum) {
+            return stateBySubtask.get(subtaskIndex);
+        }
+        // Parallelism was reduced: fold the states of several previous subtasks into this one.
+        Map<String, Long> mergedMetrics = new HashMap<>(4);
+        for (Integer index : computeIndexList(subtaskIndex, currentSubtaskNum, previousSubtaskNum)) {
+            MetricState previousState = stateBySubtask.get(index);
+            if (previousState == null || previousState.getMetrics() == null) {
+                // Stored subtask indexes are not guaranteed to be contiguous; ignore the gaps.
+                continue;
+            }
+            for (Map.Entry<String, Long> entry : previousState.getMetrics().entrySet()) {
+                if (entry.getValue() != null) {
+                    mergedMetrics.merge(entry.getKey(), entry.getValue(), Long::sum);
+                }
+            }
+        }
+        return new MetricState(subtaskIndex, mergedMetrics);
+    }
+
+    /**
+     * Assigns previous subtask indexes to the current subtask when the parallelism is reduced.
+     *
+     * <p>With N previous and m current subtasks, n = N / m. Every current subtask gets n consecutive
+     * previous indexes, except the last one, which gets the remaining N - (m - 1) * n indexes.
+     *
+     * @param subtaskIndex current subtask index
+     * @param currentSubtaskNum number of current parallel subtasks
+     * @param previousSubtaskNum number of previous parallel subtasks
+     * @return previous subtask indexes assigned to the current subtask, in ascending order
+     */
+    public static List<Integer> computeIndexList(Integer subtaskIndex, Integer currentSubtaskNum,
+            Integer previousSubtaskNum) {
+        int assignTaskNum = previousSubtaskNum / currentSubtaskNum;
+        int start = subtaskIndex * assignTaskNum;
+        // The last current subtask also takes the remainder of the integer division.
+        boolean lastSubtask = subtaskIndex == currentSubtaskNum - 1;
+        int end = lastSubtask ? previousSubtaskNum : start + assignTaskNum;
+        List<Integer> indexList = new ArrayList<>();
+        for (int i = start; i < end; i++) {
+            indexList.add(i);
+        }
+        return indexList;
+    }
+
+    /**
+     * Snapshots the metric state data of a {@link SourceMetricData}.
+     *
+     * <p>The state list is cleared and then filled with a single {@link MetricState} holding the current
+     * counts of the numRecordsIn and numBytesIn counters.
+     *
+     * @param metricStateListState state data list
+     * @param sourceMetricData {@link SourceMetricData} a collection class for handling metrics
+     * @param subtaskIndex subtask index
+     * @throws Exception when updating the metric state fails
+     */
+    public static void snapshotMetricStateForSourceMetricData(ListState<MetricState> metricStateListState,
+            SourceMetricData sourceMetricData, Integer subtaskIndex)
+            throws Exception {
+        log.info("snapshotMetricStateForSourceMetricData:{}, sourceMetricData:{}, subtaskIndex:{}",
+                metricStateListState, sourceMetricData, subtaskIndex);
+        metricStateListState.clear();
+        Map<String, Long> metricDataMap = new HashMap<>();
+        metricDataMap.put(NUM_RECORDS_IN, sourceMetricData.getNumRecordsIn().getCount());
+        metricDataMap.put(NUM_BYTES_IN, sourceMetricData.getNumBytesIn().getCount());
+        metricStateListState.add(new MetricState(subtaskIndex, metricDataMap));
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java
new file mode 100644
index 000000000..cec538d61
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MetricStateUtils}
+ */
+public class MetricStateUtilsTest {
+
+    /**
+     * Tests the assignment of previous subtask indexes to current subtasks when the parallelism is
+     * reduced. Lists are compared directly, so a mismatch reports the differing elements.
+     */
+    @Test
+    public void testComputeIndexList() {
+        // 10 previous subtasks onto 2 current subtasks: an even split
+        List<Integer> firstOfTwo = MetricStateUtils.computeIndexList(0, 2, 10);
+        assertEquals(Arrays.asList(0, 1, 2, 3, 4), firstOfTwo);
+        List<Integer> secondOfTwo = MetricStateUtils.computeIndexList(1, 2, 10);
+        assertEquals(Arrays.asList(5, 6, 7, 8, 9), secondOfTwo);
+
+        // 10 previous subtasks onto 4 current subtasks: the last one takes the remainder
+        List<Integer> firstOfFour = MetricStateUtils.computeIndexList(0, 4, 10);
+        assertEquals(Arrays.asList(0, 1), firstOfFour);
+        List<Integer> secondOfFour = MetricStateUtils.computeIndexList(1, 4, 10);
+        assertEquals(Arrays.asList(2, 3), secondOfFour);
+        List<Integer> thirdOfFour = MetricStateUtils.computeIndexList(2, 4, 10);
+        assertEquals(Arrays.asList(4, 5), thirdOfFour);
+        List<Integer> fourthOfFour = MetricStateUtils.computeIndexList(3, 4, 10);
+        assertEquals(Arrays.asList(6, 7, 8, 9), fourthOfFour);
+
+        // 4 previous subtasks onto 3 current subtasks
+        List<Integer> firstOfThree = MetricStateUtils.computeIndexList(0, 3, 4);
+        assertEquals(Arrays.asList(0), firstOfThree);
+        List<Integer> secondOfThree = MetricStateUtils.computeIndexList(1, 3, 4);
+        assertEquals(Arrays.asList(1), secondOfThree);
+        List<Integer> thirdOfThree = MetricStateUtils.computeIndexList(2, 3, 4);
+        assertEquals(Arrays.asList(2, 3), thirdOfThree);
+
+        // everything collapses onto a single current subtask
+        List<Integer> onlySubtask = MetricStateUtils.computeIndexList(0, 1, 3);
+        assertEquals(Arrays.asList(0, 1, 2), onlySubtask);
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index cd78621db..4ef40bcc8 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -45,11 +45,13 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -59,7 +61,9 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +86,9 @@ import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -230,6 +237,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private SourceMetricData sourceMetricData;
+ private transient ListState<MetricState> metricStateListState; // union list state with the metric snapshots; only obtained when inlongMetric is set
+
+ private MetricState metricState; // metric values restored from state, used to seed the counters; stays null unless a restore produced a state
+
// ---------------------------------------------------------------------------------------
public DebeziumSourceFunction(
@@ -273,9 +284,19 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
new ListStateDescriptor<>(
HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+ if (this.inlongMetric != null) {
+ this.metricStateListState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+
if (context.isRestored()) {
restoreOffsetState();
restoreHistoryRecordsState();
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
} else {
if (specificOffset != null) {
byte[] serializedOffset =
@@ -345,6 +366,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
+ if (sourceMetricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
}
}
@@ -427,8 +452,16 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
auditImp = AuditImp.getInstance();
}
sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
- sourceMetricData.registerMetricsForNumRecordsIn();
- sourceMetricData.registerMetricsForNumBytesIn();
+ SimpleCounter recordsInCounter = new SimpleCounter();
+ SimpleCounter bytesInCounter = new SimpleCounter();
+ if (metricState != null) {
+ recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
+ bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
+ }
+ sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
+ sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
+ sourceMetricData.registerMetricsForNumRecordsInForMeter();
+ sourceMetricData.registerMetricsForNumBytesInForMeter();
sourceMetricData.registerMetricsForNumBytesInPerSecond();
sourceMetricData.registerMetricsForNumRecordsInPerSecond();
}
@@ -469,11 +502,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
new DebeziumDeserializationSchema<T>() {
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+ deserializer.deserialize(record, out); // runs before the metric update, so a record whose deserialization throws is not counted
if (sourceMetricData != null) {
sourceMetricData.outputMetrics(1L,
record.value().toString().getBytes(StandardCharsets.UTF_8).length);
}
- deserializer.deserialize(record, out);
}
@Override