You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/21 13:36:28 UTC

[flink] 02/04: [FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWriterMetricGroup directly.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4aec117eb4693f978f2d95580d72a12d14638e85
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 16:16:30 2022 +0100

    [FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWriterMetricGroup directly.
    
    (cherry picked from commit 4e0c24a82e9b0fd35ca23610ba396a932b0f41b8)
---
 .../flink/connector/base/sink/writer/AsyncSinkWriter.java    | 12 ++++++------
 .../connector/base/sink/writer/TestSinkInitContext.java      |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index f45a3a5..090504a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -69,10 +69,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
     private final SinkWriterMetricGroup metrics;
 
     /* Counter for number of bytes this sink has attempted to send to the destination. */
-    private final Counter numBytesOutCounter;
+    private final Counter numBytesSendCounter;
 
     /* Counter for number of records this sink has attempted to send to the destination. */
-    private final Counter numRecordsOutCounter;
+    private final Counter numRecordsSendCounter;
 
     /**
      * Rate limiting strategy {@code inflightMessages} at any given time, {@code
@@ -295,8 +295,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
 
         this.metrics = context.metricGroup();
         this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
-        this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
-        this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
+        this.numBytesSendCounter = this.metrics.getNumBytesSendCounter();
+        this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter();
 
         this.fatalExceptionCons =
                 exception ->
@@ -417,8 +417,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
             batchSizeBytes += requestEntrySize;
         }
 
-        numRecordsOutCounter.inc(batch.size());
-        numBytesOutCounter.inc(batchSizeBytes);
+        numRecordsSendCounter.inc(batch.size());
+        numBytesSendCounter.inc(batchSizeBytes);
 
         return batch;
     }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index b146190..a7e4979 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -146,10 +146,10 @@ public class TestSinkInitContext implements Sink.InitContext {
     }
 
     public Counter getNumRecordsOutCounter() {
-        return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        return metricGroup.getNumRecordsSendCounter();
     }
 
     public Counter getNumBytesOutCounter() {
-        return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+        return metricGroup.getNumBytesSendCounter();
     }
 }