You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was not carried over into this plain text copy).
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/21 13:36:27 UTC
[flink] 01/04: [FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c336b848f98bd92df4960126c4335d45289cff21
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 16:00:02 2022 +0100
[FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly.
(cherry picked from commit 3ca38240496d1d0f1289f5aecf1226f537e30233)
---
.../flink/connector/file/sink/writer/FileWriter.java | 7 +++----
.../connector/file/sink/writer/FileWriterTest.java | 20 +++++++++++---------
2 files changed, 14 insertions(+), 13 deletions(-)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index fad3b50..51cc6d8 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -92,7 +92,7 @@ public class FileWriter<IN>
private final OutputFileConfig outputFileConfig;
- private final Counter recordsOutCounter;
+ private final Counter numRecordsSendCounter;
private boolean endOfInput;
@@ -128,8 +128,7 @@ public class FileWriter<IN>
this.activeBuckets = new HashMap<>();
this.bucketerContext = new BucketerContext();
- this.recordsOutCounter =
- checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
+ this.numRecordsSendCounter = checkNotNull(metricGroup).getNumRecordsSendCounter();
this.processingTimeService = checkNotNull(processingTimeService);
checkArgument(
bucketCheckInterval > 0,
@@ -196,7 +195,7 @@ public class FileWriter<IN>
final String bucketId = bucketAssigner.getBucketId(element, bucketerContext);
final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId);
bucket.write(element, processingTimeService.getCurrentProcessingTime());
- recordsOutCounter.inc();
+ numRecordsSendCounter.inc();
}
@Override
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index e521f61..8966ca7 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -283,16 +283,19 @@ public class FileWriterTest {
public void testNumberRecordsOutCounter() throws IOException, InterruptedException {
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+ final SinkWriterMetricGroup sinkWriterMetricGroup =
+ InternalSinkWriterMetricGroup.mock(
+ metricListener.getMetricGroup(), operatorIOMetricGroup);
File outDir = TEMP_FOLDER.newFolder();
Path path = new Path(outDir.toURI());
- Counter recordsCounter = operatorIOMetricGroup.getNumRecordsOutCounter();
+ Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter();
SinkWriter.Context context = new ContextImpl();
FileWriter<String> fileWriter =
createWriter(
path,
DefaultRollingPolicy.builder().build(),
new OutputFileConfig("part-", ""),
- operatorIOMetricGroup);
+ sinkWriterMetricGroup);
assertEquals(0, recordsCounter.getCount());
fileWriter.write("1", context);
@@ -432,13 +435,8 @@ public class FileWriterTest {
Path basePath,
RollingPolicy<String, String> rollingPolicy,
OutputFileConfig outputFileConfig,
- OperatorIOMetricGroup operatorIOMetricGroup)
+ SinkWriterMetricGroup sinkWriterMetricGroup)
throws IOException {
- final SinkWriterMetricGroup sinkWriterMetricGroup =
- operatorIOMetricGroup == null
- ? InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())
- : InternalSinkWriterMetricGroup.mock(
- metricListener.getMetricGroup(), operatorIOMetricGroup);
return new FileWriter<>(
basePath,
sinkWriterMetricGroup,
@@ -458,7 +456,11 @@ public class FileWriterTest {
RollingPolicy<String, String> rollingPolicy,
OutputFileConfig outputFileConfig)
throws IOException {
- return createWriter(basePath, rollingPolicy, outputFileConfig, null);
+ return createWriter(
+ basePath,
+ rollingPolicy,
+ outputFileConfig,
+ InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
}
private FileWriter<String> createWriter(