You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/21 13:36:27 UTC

[flink] 01/04: [FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c336b848f98bd92df4960126c4335d45289cff21
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 16:00:02 2022 +0100

    [FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly.
    
    (cherry picked from commit 3ca38240496d1d0f1289f5aecf1226f537e30233)
---
 .../flink/connector/file/sink/writer/FileWriter.java |  7 +++----
 .../connector/file/sink/writer/FileWriterTest.java   | 20 +++++++++++---------
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index fad3b50..51cc6d8 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -92,7 +92,7 @@ public class FileWriter<IN>
 
     private final OutputFileConfig outputFileConfig;
 
-    private final Counter recordsOutCounter;
+    private final Counter numRecordsSendCounter;
 
     private boolean endOfInput;
 
@@ -128,8 +128,7 @@ public class FileWriter<IN>
         this.activeBuckets = new HashMap<>();
         this.bucketerContext = new BucketerContext();
 
-        this.recordsOutCounter =
-                checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
+        this.numRecordsSendCounter = checkNotNull(metricGroup).getNumRecordsSendCounter();
         this.processingTimeService = checkNotNull(processingTimeService);
         checkArgument(
                 bucketCheckInterval > 0,
@@ -196,7 +195,7 @@ public class FileWriter<IN>
         final String bucketId = bucketAssigner.getBucketId(element, bucketerContext);
         final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId);
         bucket.write(element, processingTimeService.getCurrentProcessingTime());
-        recordsOutCounter.inc();
+        numRecordsSendCounter.inc();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index e521f61..8966ca7 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -283,16 +283,19 @@ public class FileWriterTest {
     public void testNumberRecordsOutCounter() throws IOException, InterruptedException {
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+        final SinkWriterMetricGroup sinkWriterMetricGroup =
+                InternalSinkWriterMetricGroup.mock(
+                        metricListener.getMetricGroup(), operatorIOMetricGroup);
         File outDir = TEMP_FOLDER.newFolder();
         Path path = new Path(outDir.toURI());
-        Counter recordsCounter = operatorIOMetricGroup.getNumRecordsOutCounter();
+        Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter();
         SinkWriter.Context context = new ContextImpl();
         FileWriter<String> fileWriter =
                 createWriter(
                         path,
                         DefaultRollingPolicy.builder().build(),
                         new OutputFileConfig("part-", ""),
-                        operatorIOMetricGroup);
+                        sinkWriterMetricGroup);
 
         assertEquals(0, recordsCounter.getCount());
         fileWriter.write("1", context);
@@ -432,13 +435,8 @@ public class FileWriterTest {
             Path basePath,
             RollingPolicy<String, String> rollingPolicy,
             OutputFileConfig outputFileConfig,
-            OperatorIOMetricGroup operatorIOMetricGroup)
+            SinkWriterMetricGroup sinkWriterMetricGroup)
             throws IOException {
-        final SinkWriterMetricGroup sinkWriterMetricGroup =
-                operatorIOMetricGroup == null
-                        ? InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())
-                        : InternalSinkWriterMetricGroup.mock(
-                                metricListener.getMetricGroup(), operatorIOMetricGroup);
         return new FileWriter<>(
                 basePath,
                 sinkWriterMetricGroup,
@@ -458,7 +456,11 @@ public class FileWriterTest {
             RollingPolicy<String, String> rollingPolicy,
             OutputFileConfig outputFileConfig)
             throws IOException {
-        return createWriter(basePath, rollingPolicy, outputFileConfig, null);
+        return createWriter(
+                basePath,
+                rollingPolicy,
+                outputFileConfig,
+                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
     }
 
     private FileWriter<String> createWriter(