You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/21 13:36:28 UTC
[flink] 02/04: [FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWriterMetricGroup directly.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4aec117eb4693f978f2d95580d72a12d14638e85
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 16:16:30 2022 +0100
[FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWriterMetricGroup directly.
(cherry picked from commit 4e0c24a82e9b0fd35ca23610ba396a932b0f41b8)
---
.../flink/connector/base/sink/writer/AsyncSinkWriter.java | 12 ++++++------
.../connector/base/sink/writer/TestSinkInitContext.java | 4 ++--
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index f45a3a5..090504a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -69,10 +69,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
private final SinkWriterMetricGroup metrics;
/* Counter for number of bytes this sink has attempted to send to the destination. */
- private final Counter numBytesOutCounter;
+ private final Counter numBytesSendCounter;
/* Counter for number of records this sink has attempted to send to the destination. */
- private final Counter numRecordsOutCounter;
+ private final Counter numRecordsSendCounter;
/**
* Rate limiting strategy {@code inflightMessages} at any given time, {@code
@@ -295,8 +295,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
this.metrics = context.metricGroup();
this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
- this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
- this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
+ this.numBytesSendCounter = this.metrics.getNumBytesSendCounter();
+ this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter();
this.fatalExceptionCons =
exception ->
@@ -417,8 +417,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
batchSizeBytes += requestEntrySize;
}
- numRecordsOutCounter.inc(batch.size());
- numBytesOutCounter.inc(batchSizeBytes);
+ numRecordsSendCounter.inc(batch.size());
+ numBytesSendCounter.inc(batchSizeBytes);
return batch;
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index b146190..a7e4979 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -146,10 +146,10 @@ public class TestSinkInitContext implements Sink.InitContext {
}
public Counter getNumRecordsOutCounter() {
- return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+ return metricGroup.getNumRecordsSendCounter();
}
public Counter getNumBytesOutCounter() {
- return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+ return metricGroup.getNumBytesSendCounter();
}
}