You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/18 07:20:11 UTC

[inlong] branch master updated: [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d1857468c [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
d1857468c is described below

commit d1857468ccd2f7839b18d477fde33f669f60e2ad
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Sun Sep 18 15:20:05 2022 +0800

    [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
---
 .../org/apache/inlong/sort/base/Constants.java     |   6 +
 .../inlong/sort/base/metric/MetricState.java       |  65 +++++++++++
 .../inlong/sort/base/metric/SourceMetricData.java  |  69 ++++++++++-
 .../inlong/sort/base/util/MetricStateUtils.java    | 128 +++++++++++++++++++++
 .../sort/base/util/MetricStateUtilsTest.java       |  64 +++++++++++
 .../DebeziumSourceFunction.java                    |  39 ++++++-
 6 files changed, 366 insertions(+), 5 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 45023d38b..9daed86e0 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -45,6 +45,10 @@ public final class Constants {
 
     public static final String NUM_BYTES_IN = "numBytesIn";
 
+    public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter";
+
+    public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter";
+
     public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
 
     public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
@@ -75,6 +79,8 @@ public final class Constants {
     // sort send successfully
     public static final Integer AUDIT_SORT_OUTPUT = 8;
 
+    public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
+
     public static final ConfigOption<String> INLONG_METRIC =
         ConfigOptions.key("inlong.metric")
             .stringType()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
new file mode 100644
index 000000000..9240c0c8a
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -0,0 +1,65 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Metric state that supports snapshot and restore of {@link org.apache.flink.metrics.Counter} metrics.
+ */
+public class MetricState implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Integer subtaskIndex;
+
+    private Map<String, Long> metrics;
+
+    public MetricState() {
+    }
+
+    public MetricState(Integer subtaskIndex, Map<String, Long> metrics) {
+        this.subtaskIndex = subtaskIndex;
+        this.metrics = metrics;
+    }
+
+    public Integer getSubtaskIndex() {
+        return subtaskIndex;
+    }
+
+    public void setSubtaskIndex(Integer subtaskIndex) {
+        this.subtaskIndex = subtaskIndex;
+    }
+
+    public Map<String, Long> getMetrics() {
+        return metrics;
+    }
+
+    public void setMetrics(Map<String, Long> metrics) {
+        this.metrics = metrics;
+    }
+
+    public Long getMetricValue(String metricName) {
+        if (metrics != null) {
+            return metrics.getOrDefault(metricName, 0L);
+        }
+        return 0L;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index d97efc9f5..5c25fcc75 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -32,8 +32,10 @@ import java.util.HashSet;
 
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
 
 /**
@@ -47,6 +49,8 @@ public class SourceMetricData implements MetricData {
     private final String nodeId;
     private Counter numRecordsIn;
     private Counter numBytesIn;
+    private Counter numRecordsInForMeter;
+    private Counter numBytesInForMeter;
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
     private final AuditImp auditImp;
@@ -82,6 +86,42 @@ public class SourceMetricData implements MetricData {
         }
     }
 
+    /**
+     * Registers the {@code numRecordsInForMeter} counter, using a new {@link SimpleCounter} as the default.
+     * groupId, streamId and nodeId are label values that users can filter the metric data by when using the
+     * Prometheus metric reporter.
+     */
+    public void registerMetricsForNumRecordsInForMeter() {
+        registerMetricsForNumRecordsInForMeter(new SimpleCounter());
+    }
+
+    /**
+     * Registers the {@code numRecordsInForMeter} counter with a user-supplied {@link Counter} implementation.
+     * groupId, streamId and nodeId are label values that users can filter the metric data by when using the
+     * Prometheus metric reporter.
+     */
+    public void registerMetricsForNumRecordsInForMeter(Counter counter) {
+        numRecordsInForMeter = registerCounter(NUM_RECORDS_IN_FOR_METER, counter);
+    }
+
+    /**
+     * Registers the {@code numBytesInForMeter} counter, using a new {@link SimpleCounter} as the default.
+     * groupId, streamId and nodeId are label values that users can filter the metric data by when using the
+     * Prometheus metric reporter.
+     */
+    public void registerMetricsForNumBytesInForMeter() {
+        registerMetricsForNumBytesInForMeter(new SimpleCounter());
+    }
+
+    /**
+     * Registers the {@code numBytesInForMeter} counter with a user-supplied {@link Counter} implementation.
+     * groupId, streamId and nodeId are label values that users can filter the metric data by when using the
+     * Prometheus metric reporter.
+     */
+    public void registerMetricsForNumBytesInForMeter(Counter counter) {
+        numBytesInForMeter = registerCounter(NUM_BYTES_IN_FOR_METER, counter);
+    }
+
     /**
      * Default counter is {@link SimpleCounter}
      * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
@@ -119,11 +159,11 @@ public class SourceMetricData implements MetricData {
     }
 
     public void registerMetricsForNumRecordsInPerSecond() {
-        numRecordsInPerSecond = registerMeter(NUM_RECORDS_IN_PER_SECOND, this.numRecordsIn);
+        numRecordsInPerSecond = registerMeter(NUM_RECORDS_IN_PER_SECOND, this.numRecordsInForMeter);
     }
 
     public void registerMetricsForNumBytesInPerSecond() {
-        numBytesInPerSecond = registerMeter(NUM_BYTES_IN_PER_SECOND, this.numBytesIn);
+        numBytesInPerSecond = registerMeter(NUM_BYTES_IN_PER_SECOND, this.numBytesInForMeter);
     }
 
     public Counter getNumRecordsIn() {
@@ -142,6 +182,14 @@ public class SourceMetricData implements MetricData {
         return numBytesInPerSecond;
     }
 
+    public Counter getNumRecordsInForMeter() {
+        return numRecordsInForMeter;
+    }
+
+    public Counter getNumBytesInForMeter() {
+        return numBytesInForMeter;
+    }
+
     @Override
     public MetricGroup getMetricGroup() {
         return metricGroup;
@@ -182,5 +230,28 @@ public class SourceMetricData implements MetricData {
     public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
         this.numBytesIn.inc(rowDataSize);
         this.numRecordsIn.inc(rowCountSize);
+        // The *ForMeter counters are only assigned by their own register methods. Skip them when they
+        // were not registered instead of failing with a NullPointerException.
+        if (this.numBytesInForMeter != null) {
+            this.numBytesInForMeter.inc(rowDataSize);
+        }
+        if (this.numRecordsInForMeter != null) {
+            this.numRecordsInForMeter.inc(rowCountSize);
+        }
+    }
+
+    // The counters and meters are assigned by the register* methods, so any that was not registered is
+    // still null here; such a metric is rendered as "null" instead of failing with a NullPointerException.
+    @Override
+    public String toString() {
+        return "SourceMetricData{groupId='" + groupId + '\'' + ", streamId='" + streamId + '\''
+                + ", nodeId='" + nodeId + '\''
+                + ", numRecordsIn=" + (numRecordsIn == null ? null : numRecordsIn.getCount())
+                + ", numBytesIn=" + (numBytesIn == null ? null : numBytesIn.getCount())
+                + ", numRecordsInForMeter=" + (numRecordsInForMeter == null ? null : numRecordsInForMeter.getCount())
+                + ", numBytesInForMeter=" + (numBytesInForMeter == null ? null : numBytesInForMeter.getCount())
+                + ", numRecordsInPerSecond=" + (numRecordsInPerSecond == null ? null : numRecordsInPerSecond.getRate())
+                + ", numBytesInPerSecond=" + (numBytesInPerSecond == null ? null : numBytesInPerSecond.getRate())
+                + '}';
     }
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
new file mode 100644
index 000000000..d878381ba
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
@@ -0,0 +1,128 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+
+/**
+ * Utilities for snapshotting and restoring {@link MetricState}.
+ */
+@Slf4j
+public class MetricStateUtils {
+
+    /**
+     * Restores the metric state of the current subtask from the state list; when the parallelism was reduced,
+     * the metrics of all previous subtasks assigned to this subtask are summed up.
+     * @param metricStateListState state data list
+     * @param subtaskIndex current subtask index
+     * @param currentSubtaskNum number of current parallel subtask
+     * @return metric state, or null if there is no state list or no state stored for this subtask index
+     * @throws Exception if reading the state via metricStateListState.get() fails
+     */
+    public static MetricState restoreMetricState(ListState<MetricState> metricStateListState, Integer subtaskIndex,
+            Integer currentSubtaskNum) throws Exception {
+        if (metricStateListState == null || metricStateListState.get() == null) {
+            return null;
+        }
+        log.info("restoreMetricState:{}, subtaskIndex:{}, currentSubtaskNum:{}", metricStateListState, subtaskIndex,
+                currentSubtaskNum);
+        MetricState currentMetricState;
+        Map<Integer, MetricState> map = new HashMap<>(16);
+        for (MetricState metricState : metricStateListState.get()) {
+            map.put(metricState.getSubtaskIndex(), metricState);
+        }
+        int previousSubtaskNum = map.size();
+        if (currentSubtaskNum >= previousSubtaskNum) {
+            currentMetricState = map.get(subtaskIndex);
+        } else {
+            Map<String, Long> metrics = new HashMap<>(4);
+            currentMetricState = new MetricState(subtaskIndex, metrics);
+            List<Integer> indexList = computeIndexList(subtaskIndex, currentSubtaskNum, previousSubtaskNum);
+            for (Integer index : indexList) {
+                MetricState metricState = map.get(index);
+                if (metricState == null || metricState.getMetrics() == null) {
+                    // nothing was stored for this previous subtask, so there is nothing to add up
+                    continue;
+                }
+                for (Map.Entry<String, Long> entry : metricState.getMetrics().entrySet()) {
+                    metrics.merge(entry.getKey(), entry.getValue(), Long::sum);
+                }
+            }
+        }
+        return currentMetricState;
+    }
+
+    /**
+     * Assigns previous subtask indexes to the current subtask when the parallelism is reduced.
+     * With N previous and m current subtasks, n = N / m (integer division): every current subtask takes n
+     * consecutive previous indexes, except the last one, which takes the remaining N - (m - 1) * n indexes.
+     * @param subtaskIndex current subtask index
+     * @param currentSubtaskNum number of current parallel subtask
+     * @param previousSubtaskNum number of previous parallel subtask
+     * @return index list
+     */
+    public static List<Integer> computeIndexList(Integer subtaskIndex, Integer currentSubtaskNum,
+            Integer previousSubtaskNum) {
+        List<Integer> indexList = new ArrayList<>();
+        int assignTaskNum = previousSubtaskNum / currentSubtaskNum;
+        if (subtaskIndex == currentSubtaskNum - 1) {
+            for (int i = subtaskIndex * assignTaskNum; i < previousSubtaskNum; i++) {
+                indexList.add(i);
+            }
+        } else {
+            for (int i = 1; i <= assignTaskNum; i++) {
+                indexList.add(i + subtaskIndex * assignTaskNum - 1);
+            }
+        }
+        return indexList;
+    }
+
+    /**
+     * Replaces the content of the state list with one {@link MetricState} for the given subtask that holds
+     * the current numRecordsIn and numBytesIn counts of the {@link SourceMetricData}.
+     * @param metricStateListState state data list
+     * @param sourceMetricData {@link SourceMetricData} A collection class for handling metrics
+     * @param subtaskIndex subtask index
+     * @throws Exception if updating the state list fails
+     */
+    public static void snapshotMetricStateForSourceMetricData(ListState<MetricState> metricStateListState,
+            SourceMetricData sourceMetricData, Integer subtaskIndex)
+            throws Exception {
+        log.info("snapshotMetricStateForSourceMetricData:{}, sourceMetricData:{}, subtaskIndex:{}",
+                metricStateListState, sourceMetricData, subtaskIndex);
+        metricStateListState.clear();
+        Map<String, Long> metricDataMap = new HashMap<>();
+        metricDataMap.put(NUM_RECORDS_IN, sourceMetricData.getNumRecordsIn().getCount());
+        metricDataMap.put(NUM_BYTES_IN, sourceMetricData.getNumBytesIn().getCount());
+        MetricState metricState = new MetricState(subtaskIndex, metricDataMap);
+        metricStateListState.add(metricState);
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java
new file mode 100644
index 000000000..cec538d61
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java
@@ -0,0 +1,64 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MetricStateUtilsTest {
+
+    /**
+     * Tests the assignment of previous subtask indexes to current subtasks when the parallelism is reduced.
+     */
+    @Test
+    public void testComputeIndexList() {
+        List<Integer> expectList1 = Arrays.asList(0, 1, 2, 3, 4);
+        List<Integer> currentList1 = MetricStateUtils.computeIndexList(0, 2, 10);
+        assertEquals(expectList1.toString(), currentList1.toString());
+
+        List<Integer> expectList2 = Arrays.asList(0, 1);
+        List<Integer> currentList2 = MetricStateUtils.computeIndexList(0, 4, 10);
+        assertEquals(expectList2.toString(), currentList2.toString());
+
+        List<Integer> expectList3 = Arrays.asList(2, 3);
+        List<Integer> currentList3 = MetricStateUtils.computeIndexList(1, 4, 10);
+        assertEquals(expectList3.toString(), currentList3.toString());
+
+        List<Integer> expectList4 = Arrays.asList(4, 5);
+        List<Integer> currentList4 = MetricStateUtils.computeIndexList(2, 4, 10);
+        assertEquals(expectList4.toString(), currentList4.toString());
+
+        List<Integer> expectList5 = Arrays.asList(6, 7, 8, 9);
+        List<Integer> currentList5 = MetricStateUtils.computeIndexList(3, 4, 10);
+        assertEquals(expectList5.toString(), currentList5.toString());
+
+        List<Integer> expectList7 = Arrays.asList(0);
+        List<Integer> currentList7 = MetricStateUtils.computeIndexList(0, 3, 4);
+        assertEquals(expectList7.toString(), currentList7.toString());
+
+        List<Integer> expectList8 = Arrays.asList(2, 3);
+        List<Integer> currentList8 = MetricStateUtils.computeIndexList(2, 3, 4);
+        assertEquals(expectList8.toString(), currentList8.toString());
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index cd78621db..4ef40bcc8 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -45,11 +45,13 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -59,7 +61,9 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,6 +86,9 @@ import java.util.concurrent.TimeUnit;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -230,6 +237,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private SourceMetricData sourceMetricData;
 
+    private transient ListState<MetricState> metricStateListState;
+
+    private MetricState metricState;
+
     // ---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
@@ -273,9 +284,19 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                         new ListStateDescriptor<>(
                                 HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
 
+        if (this.inlongMetric != null) {
+            this.metricStateListState =
+                    stateStore.getUnionListState(
+                            new ListStateDescriptor<>(
+                                    INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
+
         if (context.isRestored()) {
             restoreOffsetState();
             restoreHistoryRecordsState();
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
         } else {
             if (specificOffset != null) {
                 byte[] serializedOffset =
@@ -345,6 +366,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         } else {
             snapshotOffsetState(functionSnapshotContext.getCheckpointId());
             snapshotHistoryRecordsState();
+            if (sourceMetricData != null && metricStateListState != null) {
+                MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
         }
     }
 
@@ -427,8 +452,16 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 auditImp = AuditImp.getInstance();
             }
             sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            sourceMetricData.registerMetricsForNumRecordsIn();
-            sourceMetricData.registerMetricsForNumBytesIn();
+            SimpleCounter recordsInCounter = new SimpleCounter();
+            SimpleCounter bytesInCounter = new SimpleCounter();
+            if (metricState != null) {
+                recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
+                bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
+            }
+            sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
+            sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
+            sourceMetricData.registerMetricsForNumRecordsInForMeter();
+            sourceMetricData.registerMetricsForNumBytesInForMeter();
             sourceMetricData.registerMetricsForNumBytesInPerSecond();
             sourceMetricData.registerMetricsForNumRecordsInPerSecond();
         }
@@ -469,11 +502,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                         new DebeziumDeserializationSchema<T>() {
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+                                deserializer.deserialize(record, out);
                                 if (sourceMetricData != null) {
                                     sourceMetricData.outputMetrics(1L,
                                             record.value().toString().getBytes(StandardCharsets.UTF_8).length);
                                 }
-                                deserializer.deserialize(record, out);
                             }
 
                             @Override