You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by re...@apache.org on 2022/11/03 02:24:12 UTC

[flink] branch release-1.15 updated: [FLINK-29567][connector/common] Change numRecordsSend / numBytesSend / numRecordsSendError back to numRecordsOut / numBytesOut / numRecordsOutError in sink

This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 221e9f7c5e5 [FLINK-29567][connector/common] Change numRecordsSend / numBytesSend / numRecordsSendError back to numRecordsOut / numBytesOut / numRecordsOutError in sink
221e9f7c5e5 is described below

commit 221e9f7c5e59265b7448eba70aea393b60a1fe42
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Fri Oct 14 16:32:52 2022 +0800

    [FLINK-29567][connector/common] Change numRecordsSend / numBytesSend / numRecordsSendError back to numRecordsOut / numBytesOut / numRecordsOutError in sink
---
 .../firehose/sink/KinesisFirehoseSinkWriter.java   |  9 +-------
 .../kinesis/sink/KinesisStreamsSinkWriter.java     |  8 +------
 .../base/sink/writer/AsyncSinkWriter.java          | 12 +++++-----
 .../base/sink/writer/TestSinkInitContext.java      |  4 ++--
 .../connector/file/sink/writer/FileWriter.java     |  7 +++---
 .../connector/file/sink/writer/FileWriterTest.java |  2 +-
 .../flink/connector/kafka/sink/KafkaWriter.java    | 18 ++++++---------
 .../connector/kafka/sink/KafkaWriterITCase.java    | 22 ++++++++----------
 .../metrics/groups/SinkWriterMetricGroup.java      | 17 ++++++++++----
 .../apache/flink/runtime/metrics/MetricNames.java  |  3 +--
 .../groups/InternalOperatorIOMetricGroup.java      |  6 +++--
 .../groups/InternalSinkWriterMetricGroup.java      | 16 ++++++++-----
 .../api/operators/AbstractStreamOperator.java      | 11 +++++----
 .../runtime/operators/sink/SinkWriterOperator.java | 18 +++++++++++++++
 .../test/streaming/runtime/SinkMetricsITCase.java  | 26 ++++++++++++++--------
 15 files changed, 101 insertions(+), 78 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 24a17eee292..a60990676bb 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -95,11 +95,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
                     RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
                     getSdkClientMisconfiguredExceptionClassifier());
 
-    // deprecated, use numRecordsSendErrorsCounter instead.
-    @Deprecated private final Counter numRecordsOutErrorsCounter;
-
-    /* A counter for the total number of records that have encountered an error during put */
-    private final Counter numRecordsSendErrorsCounter;
+    private final Counter numRecordsOutErrorsCounter;
 
     /* Name of the delivery stream in Kinesis Data Firehose */
     private final String deliveryStreamName;
@@ -170,7 +166,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
         this.deliveryStreamName = deliveryStreamName;
         this.metrics = context.metricGroup();
         this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
-        this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
         this.httpClient = createHttpClient(firehoseClientProperties);
         this.firehoseClient = createFirehoseClient(firehoseClientProperties, httpClient);
     }
@@ -218,7 +213,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
                 requestEntries.get(0).toString(),
                 err);
         numRecordsOutErrorsCounter.inc(requestEntries.size());
-        numRecordsSendErrorsCounter.inc(requestEntries.size());
 
         if (isRetryable(err)) {
             requestResult.accept(requestEntries);
@@ -234,7 +228,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
                 requestEntries.size(),
                 requestEntries.get(0).toString());
         numRecordsOutErrorsCounter.inc(response.failedPutCount());
-        numRecordsSendErrorsCounter.inc(response.failedPutCount());
 
         if (failOnError) {
             getFatalExceptionCons()
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index a3be9136c3b..fe264e43d69 100644
--- a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -76,11 +76,7 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
                     RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
                     getSdkClientMisconfiguredExceptionClassifier());
 
-    // deprecated, use numRecordsSendErrorsCounter instead.
-    @Deprecated private final Counter numRecordsOutErrorsCounter;
-
-    /* A counter for the total number of records that have encountered an error during put */
-    private final Counter numRecordsSendErrorsCounter;
+    private final Counter numRecordsOutErrorsCounter;
 
     /* Name of the stream in Kinesis Data Streams */
     private final String streamName;
@@ -151,7 +147,6 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
         this.streamName = streamName;
         this.metrics = context.metricGroup();
         this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
-        this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
         this.httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
         this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
     }
@@ -204,7 +199,6 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
                 requestEntries.size(),
                 err);
         numRecordsOutErrorsCounter.inc(requestEntries.size());
-        numRecordsSendErrorsCounter.inc(requestEntries.size());
 
         if (isRetryable(err)) {
             requestResult.accept(requestEntries);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index f7a649b1636..812b36d4d7d 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -69,10 +69,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
     private final SinkWriterMetricGroup metrics;
 
     /* Counter for number of bytes this sink has attempted to send to the destination. */
-    private final Counter numBytesSendCounter;
+    private final Counter numBytesOutCounter;
 
     /* Counter for number of records this sink has attempted to send to the destination. */
-    private final Counter numRecordsSendCounter;
+    private final Counter numRecordsOutCounter;
 
     /**
      * Rate limiting strategy {@code inflightMessages} at any given time, {@code
@@ -295,8 +295,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
 
         this.metrics = context.metricGroup();
         this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
-        this.numBytesSendCounter = this.metrics.getNumBytesSendCounter();
-        this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter();
+        this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
+        this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
 
         this.fatalExceptionCons =
                 exception ->
@@ -417,8 +417,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
             batchSizeBytes += requestEntrySize;
         }
 
-        numRecordsSendCounter.inc(batch.size());
-        numBytesSendCounter.inc(batchSizeBytes);
+        numRecordsOutCounter.inc(batch.size());
+        numBytesOutCounter.inc(batchSizeBytes);
 
         return batch;
     }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index a7e4979efdf..b1461903fc8 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -146,10 +146,10 @@ public class TestSinkInitContext implements Sink.InitContext {
     }
 
     public Counter getNumRecordsOutCounter() {
-        return metricGroup.getNumRecordsSendCounter();
+        return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
     }
 
     public Counter getNumBytesOutCounter() {
-        return metricGroup.getNumBytesSendCounter();
+        return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
     }
 }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index 51cc6d8c10f..1d0de3b28e5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -92,7 +92,7 @@ public class FileWriter<IN>
 
     private final OutputFileConfig outputFileConfig;
 
-    private final Counter numRecordsSendCounter;
+    private final Counter numRecordsOutCounter;
 
     private boolean endOfInput;
 
@@ -128,7 +128,8 @@ public class FileWriter<IN>
         this.activeBuckets = new HashMap<>();
         this.bucketerContext = new BucketerContext();
 
-        this.numRecordsSendCounter = checkNotNull(metricGroup).getNumRecordsSendCounter();
+        this.numRecordsOutCounter =
+                checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
         this.processingTimeService = checkNotNull(processingTimeService);
         checkArgument(
                 bucketCheckInterval > 0,
@@ -195,7 +196,7 @@ public class FileWriter<IN>
         final String bucketId = bucketAssigner.getBucketId(element, bucketerContext);
         final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId);
         bucket.write(element, processingTimeService.getCurrentProcessingTime());
-        numRecordsSendCounter.inc();
+        numRecordsOutCounter.inc();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index c0a5ac66776..6a63a7957a6 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -298,7 +298,7 @@ public class FileWriterTest {
                 InternalSinkWriterMetricGroup.mock(
                         metricListener.getMetricGroup(), operatorIOMetricGroup);
 
-        Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter();
+        Counter recordsCounter = sinkWriterMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
         SinkWriter.Context context = new ContextImpl();
         FileWriter<String> fileWriter =
                 createWriter(
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index cd46d67f786..53e48d9cd6e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -93,11 +93,9 @@ class KafkaWriter<IN>
     private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
     private final SinkWriterMetricGroup metricGroup;
     private final boolean disabledMetrics;
-    private final Counter numRecordsSendCounter;
-    private final Counter numBytesSendCounter;
-    // deprecated, use numRecordsSendErrorsCounter instead.
-    @Deprecated private final Counter numRecordsOutErrorsCounter;
-    private final Counter numRecordsSendErrorsCounter;
+    private final Counter numRecordsOutCounter;
+    private final Counter numBytesOutCounter;
+    private final Counter numRecordsOutErrorsCounter;
     private final ProcessingTimeService timeService;
 
     // Number of outgoing bytes at the latest metric sync
@@ -155,10 +153,9 @@ class KafkaWriter<IN>
         checkNotNull(sinkInitContext, "sinkInitContext");
         this.timeService = sinkInitContext.getProcessingTimeService();
         this.metricGroup = sinkInitContext.metricGroup();
-        this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
-        this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
+        this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+        this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
         this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
-        this.numRecordsSendErrorsCounter = metricGroup.getNumRecordsSendErrorsCounter();
         this.kafkaSinkContext =
                 new DefaultKafkaSinkContext(
                         sinkInitContext.getSubtaskId(),
@@ -198,7 +195,7 @@ class KafkaWriter<IN>
         final ProducerRecord<byte[], byte[]> record =
                 recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
         currentProducer.send(record, deliveryCallback);
-        numRecordsSendCounter.inc();
+        numRecordsOutCounter.inc();
     }
 
     @Override
@@ -391,7 +388,7 @@ class KafkaWriter<IN>
                     long outgoingBytesUntilNow = ((Number) byteOutMetric.metricValue()).longValue();
                     long outgoingBytesSinceLastUpdate =
                             outgoingBytesUntilNow - latestOutgoingByteTotal;
-                    numBytesSendCounter.inc(outgoingBytesSinceLastUpdate);
+                    numBytesOutCounter.inc(outgoingBytesSinceLastUpdate);
                     latestOutgoingByteTotal = outgoingBytesUntilNow;
                     lastSync = time;
                     registerMetricSync();
@@ -417,7 +414,6 @@ class KafkaWriter<IN>
                 mailboxExecutor.execute(
                         () -> {
                             numRecordsOutErrorsCounter.inc();
-                            numRecordsSendErrorsCounter.inc();
                             throwException(metadata, exception, producer);
                         },
                         "Failed to send data to Kafka");
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 8c7e50aa704..bea559c107e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -140,21 +140,21 @@ public class KafkaWriterITCase {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
                         getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
-            final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
-            final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
-            final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
+            final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+            final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+            final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
             final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
-            assertThat(numBytesSend.getCount()).isEqualTo(0L);
-            assertThat(numRecordsSend.getCount()).isEqualTo(0);
-            assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
+            assertThat(numBytesOut.getCount()).isEqualTo(0L);
+            assertThat(numRecordsOut.getCount()).isEqualTo(0);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
             assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
             writer.write(1, SINK_WRITER_CONTEXT);
             timeService.trigger();
-            assertThat(numRecordsSend.getCount()).isEqualTo(1);
-            assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
+            assertThat(numRecordsOut.getCount()).isEqualTo(1);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
             assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
-            assertThat(numBytesSend.getCount()).isGreaterThan(0L);
+            assertThat(numBytesOut.getCount()).isGreaterThan(0L);
         }
     }
 
@@ -198,13 +198,10 @@ public class KafkaWriterITCase {
                 createWriterWithConfiguration(
                         properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
             final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
-            final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
             assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
-            assertThat(numRecordsSendErrors.getCount()).isEqualTo(0L);
 
             writer.write(1, SINK_WRITER_CONTEXT);
             assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
-            assertThat(numRecordsSendErrors.getCount()).isEqualTo(0L);
 
             final String transactionalId = writer.getCurrentProducer().getTransactionalId();
 
@@ -221,7 +218,6 @@ public class KafkaWriterITCase {
             writer.flush(false);
             writer.prepareCommit();
             assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
-            assertThat(numRecordsSendErrors.getCount()).isEqualTo(1L);
         }
     }
 
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
index 1da6c4b8a29..f3f81201436 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
@@ -29,11 +29,14 @@ import org.apache.flink.metrics.Gauge;
 @PublicEvolving
 public interface SinkWriterMetricGroup extends OperatorMetricGroup {
 
-    /** @deprecated use {@link #getNumRecordsSendErrorsCounter()} instead. */
-    @Deprecated
+    /** The total number of records that failed to send. */
     Counter getNumRecordsOutErrorsCounter();
 
-    /** The total number of records failed to send. */
+    /**
+     * The total number of records that failed to send.
+     *
+     * <p>This metric has the same value as {@code numRecordsOutErrors}.
+     */
     Counter getNumRecordsSendErrorsCounter();
 
     /**
@@ -44,10 +47,16 @@ public interface SinkWriterMetricGroup extends OperatorMetricGroup {
      * may have issue to perform the persistence action within its scope. Therefore, this count may
      * include the number of records that are failed to write by the downstream system, which should
      * be counted by {@link #getNumRecordsSendErrorsCounter()}.
+     *
+     * <p>This metric has the same value as {@code numRecordsOut} of the operator.
      */
     Counter getNumRecordsSendCounter();
 
-    /** The total number of output send bytes since the task started. */
+    /**
+     * The total number of output send bytes since the task started.
+     *
+     * <p>This metric has the same value as {@code numBytesOut} of the operator.
+     */
     Counter getNumBytesSendCounter();
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index ee922d11649..1ac400510c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -84,8 +84,7 @@ public class MetricNames {
     public static final String DEBLOATED_BUFFER_SIZE = "debloatedBufferSize";
 
     // FLIP-33 sink
-    // deprecated use NUM_RECORDS_SEND_ERRORS instead.
-    @Deprecated public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
+    public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
     public static final String NUM_RECORDS_SEND_ERRORS = "numRecordsSendErrors";
     public static final String CURRENT_SEND_TIME = "currentSendTime";
     public static final String NUM_RECORDS_SEND = "numRecordsSend";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
index f03ab5206d1..31cf560ce78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
@@ -52,8 +52,10 @@ public class InternalOperatorIOMetricGroup extends ProxyMetricGroup<InternalOper
         numRecordsOutRate =
                 parentMetricGroup.meter(
                         MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut));
-        numBytesIn = parentMetricGroup.getTaskIOMetricGroup().getNumBytesInCounter();
-        numBytesOut = parentMetricGroup.getTaskIOMetricGroup().getNumBytesOutCounter();
+        numBytesIn = parentMetricGroup.counter(MetricNames.IO_NUM_BYTES_IN);
+        numBytesOut = parentMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT);
+        parentMetricGroup.meter(MetricNames.IO_NUM_BYTES_IN_RATE, new MeterView(numBytesIn));
+        parentMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut));
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
index 9243e32cc4a..81aa8d78ce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
@@ -34,8 +34,7 @@ import org.apache.flink.runtime.metrics.MetricNames;
 public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
         implements SinkWriterMetricGroup {
 
-    // deprecated, use numRecordsSendErrors instead.
-    @Deprecated private final Counter numRecordsOutErrors;
+    private final Counter numRecordsOutErrors;
     private final Counter numRecordsSendErrors;
     private final Counter numRecordsWritten;
     private final Counter numBytesWritten;
@@ -45,9 +44,15 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
             MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
         super(parentMetricGroup);
         numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
-        numRecordsSendErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS);
-        numRecordsWritten = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND);
-        numBytesWritten = parentMetricGroup.counter(MetricNames.NUM_BYTES_SEND);
+        numRecordsSendErrors =
+                parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, numRecordsOutErrors);
+        numRecordsWritten =
+                parentMetricGroup.counter(
+                        MetricNames.NUM_RECORDS_SEND,
+                        operatorIOMetricGroup.getNumRecordsOutCounter());
+        numBytesWritten =
+                parentMetricGroup.counter(
+                        MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter());
         this.operatorIOMetricGroup = operatorIOMetricGroup;
     }
 
@@ -73,7 +78,6 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
         return operatorIOMetricGroup;
     }
 
-    @Deprecated
     @Override
     public Counter getNumRecordsOutErrorsCounter() {
         return numRecordsOutErrors;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 3b103c22a22..a1afe279833 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -164,10 +164,7 @@ public abstract class AbstractStreamOperator<OUT>
                     environment
                             .getMetricGroup()
                             .getOrAddOperator(config.getOperatorID(), config.getOperatorName());
-            this.output =
-                    new CountingOutput<>(
-                            output,
-                            operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+            this.output = registerCounterOnOutput(output, operatorMetricGroup);
             if (config.isChainEnd()) {
                 operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
             }
@@ -649,4 +646,10 @@ public abstract class AbstractStreamOperator<OUT>
     protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
         return Optional.ofNullable(timeServiceManager);
     }
+
+    protected Output<StreamRecord<OUT>> registerCounterOnOutput(
+            Output<StreamRecord<OUT>> output, OperatorMetricGroup operatorMetricGroup) {
+        return new CountingOutput<>(
+                output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index e593616abf7..9029584f986 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -41,6 +42,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -216,6 +218,22 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
         closeAll(sinkWriter, super::close);
     }
 
+    /**
+     * Skip registering the numRecordsOut counter on the output.
+     *
+     * <p>FLIP-33 defines the metric "numRecordsOut" as the total number of records written to the
+     * external system, but AbstractStreamOperator already uses this metric for the number of
+     * records sent to downstream operators, which is the number of Committable batches sent to
+     * the SinkCommitter. So we skip registering this metric on the output and leave it to sink
+     * writer implementations to report.
+     */
+    @Override
+    protected Output<StreamRecord<CommittableMessage<CommT>>> registerCounterOnOutput(
+            Output<StreamRecord<CommittableMessage<CommT>>> output,
+            OperatorMetricGroup operatorMetricGroup) {
+        return output;
+    }
+
     private void emit(
             int indexOfThisSubtask,
             int numberOfParallelSubtasks,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index 90b696dd8eb..d2232010546 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -133,18 +132,17 @@ public class SinkMetricsITCase extends TestLogger {
         for (OperatorMetricGroup group : groups) {
             Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
             // There are only 2 splits assigned; so two groups will not update metrics.
-            // There is no other way to access the counter via OperatorMetricGroup, we have to use
-            // metrics from the reporter.
-            if (((Counter) metrics.get(MetricNames.NUM_RECORDS_SEND)).getCount() == 0) {
+            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
                 continue;
             }
             subtaskWithMetrics++;
+
             // SinkWriterMetricGroup metrics
             assertThat(
-                    metrics.get(MetricNames.NUM_RECORDS_SEND),
+                    metrics.get(MetricNames.IO_NUM_RECORDS_OUT),
                     isCounter(equalTo(processedRecordsPerSubtask)));
             assertThat(
-                    metrics.get(MetricNames.NUM_BYTES_SEND),
+                    metrics.get(MetricNames.IO_NUM_BYTES_OUT),
                     isCounter(
                             equalTo(
                                     processedRecordsPerSubtask
@@ -153,6 +151,17 @@ public class SinkMetricsITCase extends TestLogger {
             assertThat(
                     metrics.get(MetricNames.NUM_RECORDS_OUT_ERRORS),
                     isCounter(equalTo((processedRecordsPerSubtask + 1) / 2)));
+
+            // Test "send" metric series has the same value as "out" metric series.
+            assertThat(
+                    metrics.get(MetricNames.NUM_RECORDS_SEND),
+                    isCounter(equalTo(processedRecordsPerSubtask)));
+            assertThat(
+                    metrics.get(MetricNames.NUM_BYTES_SEND),
+                    isCounter(
+                            equalTo(
+                                    processedRecordsPerSubtask
+                                            * MetricWriter.RECORD_SIZE_IN_BYTES)));
             assertThat(
                     metrics.get(MetricNames.NUM_RECORDS_SEND_ERRORS),
                     isCounter(equalTo((processedRecordsPerSubtask + 1) / 2)));
@@ -183,12 +192,11 @@ public class SinkMetricsITCase extends TestLogger {
         public void write(Long element, Context context) {
             super.write(element, context);
             sendTime = element * BASE_SEND_TIME;
-            metricGroup.getNumRecordsSendCounter().inc();
+            metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
             if (element % 2 == 0) {
                 metricGroup.getNumRecordsOutErrorsCounter().inc();
-                metricGroup.getNumRecordsSendErrorsCounter().inc();
             }
-            metricGroup.getNumBytesSendCounter().inc(RECORD_SIZE_IN_BYTES);
+            metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
         }
     }
 }