You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by re...@apache.org on 2022/11/03 02:24:12 UTC
[flink] branch release-1.15 updated: [FLINK-29567][connector/common] Change numRecordsSend / numBytesSend / numRecordsSendError back to numRecordsOut / numBytesOut / numRecordsOutError in sink
This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 221e9f7c5e5 [FLINK-29567][connector/common] Change numRecordsSend / numBytesSend / numRecordsSendError back to numRecordsOut / numBytesOut / numRecordsOutError in sink
221e9f7c5e5 is described below
commit 221e9f7c5e59265b7448eba70aea393b60a1fe42
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Fri Oct 14 16:32:52 2022 +0800
[FLINK-29567][connector/common] Change numRecordsSend / numBytesSend / numRecordsSendError back to numRecordsOut / numBytesOut / numRecordsOutError in sink
---
.../firehose/sink/KinesisFirehoseSinkWriter.java | 9 +-------
.../kinesis/sink/KinesisStreamsSinkWriter.java | 8 +------
.../base/sink/writer/AsyncSinkWriter.java | 12 +++++-----
.../base/sink/writer/TestSinkInitContext.java | 4 ++--
.../connector/file/sink/writer/FileWriter.java | 7 +++---
.../connector/file/sink/writer/FileWriterTest.java | 2 +-
.../flink/connector/kafka/sink/KafkaWriter.java | 18 ++++++---------
.../connector/kafka/sink/KafkaWriterITCase.java | 22 ++++++++----------
.../metrics/groups/SinkWriterMetricGroup.java | 17 ++++++++++----
.../apache/flink/runtime/metrics/MetricNames.java | 3 +--
.../groups/InternalOperatorIOMetricGroup.java | 6 +++--
.../groups/InternalSinkWriterMetricGroup.java | 16 ++++++++-----
.../api/operators/AbstractStreamOperator.java | 11 +++++----
.../runtime/operators/sink/SinkWriterOperator.java | 18 +++++++++++++++
.../test/streaming/runtime/SinkMetricsITCase.java | 26 ++++++++++++++--------
15 files changed, 101 insertions(+), 78 deletions(-)
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 24a17eee292..a60990676bb 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -95,11 +95,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
getSdkClientMisconfiguredExceptionClassifier());
- // deprecated, use numRecordsSendErrorsCounter instead.
- @Deprecated private final Counter numRecordsOutErrorsCounter;
-
- /* A counter for the total number of records that have encountered an error during put */
- private final Counter numRecordsSendErrorsCounter;
+ private final Counter numRecordsOutErrorsCounter;
/* Name of the delivery stream in Kinesis Data Firehose */
private final String deliveryStreamName;
@@ -170,7 +166,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
this.deliveryStreamName = deliveryStreamName;
this.metrics = context.metricGroup();
this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
- this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
this.httpClient = createHttpClient(firehoseClientProperties);
this.firehoseClient = createFirehoseClient(firehoseClientProperties, httpClient);
}
@@ -218,7 +213,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
requestEntries.get(0).toString(),
err);
numRecordsOutErrorsCounter.inc(requestEntries.size());
- numRecordsSendErrorsCounter.inc(requestEntries.size());
if (isRetryable(err)) {
requestResult.accept(requestEntries);
@@ -234,7 +228,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
requestEntries.size(),
requestEntries.get(0).toString());
numRecordsOutErrorsCounter.inc(response.failedPutCount());
- numRecordsSendErrorsCounter.inc(response.failedPutCount());
if (failOnError) {
getFatalExceptionCons()
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index a3be9136c3b..fe264e43d69 100644
--- a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -76,11 +76,7 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
getSdkClientMisconfiguredExceptionClassifier());
- // deprecated, use numRecordsSendErrorsCounter instead.
- @Deprecated private final Counter numRecordsOutErrorsCounter;
-
- /* A counter for the total number of records that have encountered an error during put */
- private final Counter numRecordsSendErrorsCounter;
+ private final Counter numRecordsOutErrorsCounter;
/* Name of the stream in Kinesis Data Streams */
private final String streamName;
@@ -151,7 +147,6 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
this.streamName = streamName;
this.metrics = context.metricGroup();
this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
- this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
this.httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
}
@@ -204,7 +199,6 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
requestEntries.size(),
err);
numRecordsOutErrorsCounter.inc(requestEntries.size());
- numRecordsSendErrorsCounter.inc(requestEntries.size());
if (isRetryable(err)) {
requestResult.accept(requestEntries);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index f7a649b1636..812b36d4d7d 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -69,10 +69,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
private final SinkWriterMetricGroup metrics;
/* Counter for number of bytes this sink has attempted to send to the destination. */
- private final Counter numBytesSendCounter;
+ private final Counter numBytesOutCounter;
/* Counter for number of records this sink has attempted to send to the destination. */
- private final Counter numRecordsSendCounter;
+ private final Counter numRecordsOutCounter;
/**
* Rate limiting strategy {@code inflightMessages} at any given time, {@code
@@ -295,8 +295,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
this.metrics = context.metricGroup();
this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
- this.numBytesSendCounter = this.metrics.getNumBytesSendCounter();
- this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter();
+ this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
+ this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
this.fatalExceptionCons =
exception ->
@@ -417,8 +417,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
batchSizeBytes += requestEntrySize;
}
- numRecordsSendCounter.inc(batch.size());
- numBytesSendCounter.inc(batchSizeBytes);
+ numRecordsOutCounter.inc(batch.size());
+ numBytesOutCounter.inc(batchSizeBytes);
return batch;
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index a7e4979efdf..b1461903fc8 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -146,10 +146,10 @@ public class TestSinkInitContext implements Sink.InitContext {
}
public Counter getNumRecordsOutCounter() {
- return metricGroup.getNumRecordsSendCounter();
+ return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
}
public Counter getNumBytesOutCounter() {
- return metricGroup.getNumBytesSendCounter();
+ return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
}
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index 51cc6d8c10f..1d0de3b28e5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -92,7 +92,7 @@ public class FileWriter<IN>
private final OutputFileConfig outputFileConfig;
- private final Counter numRecordsSendCounter;
+ private final Counter numRecordsOutCounter;
private boolean endOfInput;
@@ -128,7 +128,8 @@ public class FileWriter<IN>
this.activeBuckets = new HashMap<>();
this.bucketerContext = new BucketerContext();
- this.numRecordsSendCounter = checkNotNull(metricGroup).getNumRecordsSendCounter();
+ this.numRecordsOutCounter =
+ checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
this.processingTimeService = checkNotNull(processingTimeService);
checkArgument(
bucketCheckInterval > 0,
@@ -195,7 +196,7 @@ public class FileWriter<IN>
final String bucketId = bucketAssigner.getBucketId(element, bucketerContext);
final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId);
bucket.write(element, processingTimeService.getCurrentProcessingTime());
- numRecordsSendCounter.inc();
+ numRecordsOutCounter.inc();
}
@Override
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index c0a5ac66776..6a63a7957a6 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -298,7 +298,7 @@ public class FileWriterTest {
InternalSinkWriterMetricGroup.mock(
metricListener.getMetricGroup(), operatorIOMetricGroup);
- Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter();
+ Counter recordsCounter = sinkWriterMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
SinkWriter.Context context = new ContextImpl();
FileWriter<String> fileWriter =
createWriter(
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index cd46d67f786..53e48d9cd6e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -93,11 +93,9 @@ class KafkaWriter<IN>
private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
private final SinkWriterMetricGroup metricGroup;
private final boolean disabledMetrics;
- private final Counter numRecordsSendCounter;
- private final Counter numBytesSendCounter;
- // deprecated, use numRecordsSendErrorsCounter instead.
- @Deprecated private final Counter numRecordsOutErrorsCounter;
- private final Counter numRecordsSendErrorsCounter;
+ private final Counter numRecordsOutCounter;
+ private final Counter numBytesOutCounter;
+ private final Counter numRecordsOutErrorsCounter;
private final ProcessingTimeService timeService;
// Number of outgoing bytes at the latest metric sync
@@ -155,10 +153,9 @@ class KafkaWriter<IN>
checkNotNull(sinkInitContext, "sinkInitContext");
this.timeService = sinkInitContext.getProcessingTimeService();
this.metricGroup = sinkInitContext.metricGroup();
- this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
- this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
+ this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+ this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
- this.numRecordsSendErrorsCounter = metricGroup.getNumRecordsSendErrorsCounter();
this.kafkaSinkContext =
new DefaultKafkaSinkContext(
sinkInitContext.getSubtaskId(),
@@ -198,7 +195,7 @@ class KafkaWriter<IN>
final ProducerRecord<byte[], byte[]> record =
recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
currentProducer.send(record, deliveryCallback);
- numRecordsSendCounter.inc();
+ numRecordsOutCounter.inc();
}
@Override
@@ -391,7 +388,7 @@ class KafkaWriter<IN>
long outgoingBytesUntilNow = ((Number) byteOutMetric.metricValue()).longValue();
long outgoingBytesSinceLastUpdate =
outgoingBytesUntilNow - latestOutgoingByteTotal;
- numBytesSendCounter.inc(outgoingBytesSinceLastUpdate);
+ numBytesOutCounter.inc(outgoingBytesSinceLastUpdate);
latestOutgoingByteTotal = outgoingBytesUntilNow;
lastSync = time;
registerMetricSync();
@@ -417,7 +414,6 @@ class KafkaWriter<IN>
mailboxExecutor.execute(
() -> {
numRecordsOutErrorsCounter.inc();
- numRecordsSendErrorsCounter.inc();
throwException(metadata, exception, producer);
},
"Failed to send data to Kafka");
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 8c7e50aa704..bea559c107e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -140,21 +140,21 @@ public class KafkaWriterITCase {
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(
getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
- final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
- final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
- final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
+ final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+ final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+ final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
- assertThat(numBytesSend.getCount()).isEqualTo(0L);
- assertThat(numRecordsSend.getCount()).isEqualTo(0);
- assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
+ assertThat(numBytesOut.getCount()).isEqualTo(0L);
+ assertThat(numRecordsOut.getCount()).isEqualTo(0);
+ assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
writer.write(1, SINK_WRITER_CONTEXT);
timeService.trigger();
- assertThat(numRecordsSend.getCount()).isEqualTo(1);
- assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
+ assertThat(numRecordsOut.getCount()).isEqualTo(1);
+ assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
- assertThat(numBytesSend.getCount()).isGreaterThan(0L);
+ assertThat(numBytesOut.getCount()).isGreaterThan(0L);
}
}
@@ -198,13 +198,10 @@ public class KafkaWriterITCase {
createWriterWithConfiguration(
properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
- final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
- assertThat(numRecordsSendErrors.getCount()).isEqualTo(0L);
writer.write(1, SINK_WRITER_CONTEXT);
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
- assertThat(numRecordsSendErrors.getCount()).isEqualTo(0L);
final String transactionalId = writer.getCurrentProducer().getTransactionalId();
@@ -221,7 +218,6 @@ public class KafkaWriterITCase {
writer.flush(false);
writer.prepareCommit();
assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
- assertThat(numRecordsSendErrors.getCount()).isEqualTo(1L);
}
}
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
index 1da6c4b8a29..f3f81201436 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
@@ -29,11 +29,14 @@ import org.apache.flink.metrics.Gauge;
@PublicEvolving
public interface SinkWriterMetricGroup extends OperatorMetricGroup {
- /** @deprecated use {@link #getNumRecordsSendErrorsCounter()} instead. */
- @Deprecated
+ /** The total number of records failed to send. */
Counter getNumRecordsOutErrorsCounter();
- /** The total number of records failed to send. */
+ /**
+ * The total number of records failed to send.
+ *
+ * <p>This metric has the same value as {@code numRecordsOutErrors}.
+ */
Counter getNumRecordsSendErrorsCounter();
/**
@@ -44,10 +47,16 @@ public interface SinkWriterMetricGroup extends OperatorMetricGroup {
* may have issue to perform the persistence action within its scope. Therefore, this count may
* include the number of records that are failed to write by the downstream system, which should
* be counted by {@link #getNumRecordsSendErrorsCounter()}.
+ *
+ * <p>This metric has the same value as {@code numRecordsOut} of the operator.
*/
Counter getNumRecordsSendCounter();
- /** The total number of output send bytes since the task started. */
+ /**
+ * The total number of output send bytes since the task started.
+ *
+ * <p>This metric has the same value as {@code numBytesOut} of the operator.
+ */
Counter getNumBytesSendCounter();
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index ee922d11649..1ac400510c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -84,8 +84,7 @@ public class MetricNames {
public static final String DEBLOATED_BUFFER_SIZE = "debloatedBufferSize";
// FLIP-33 sink
- // deprecated use NUM_RECORDS_SEND_ERRORS instead.
- @Deprecated public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
+ public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
public static final String NUM_RECORDS_SEND_ERRORS = "numRecordsSendErrors";
public static final String CURRENT_SEND_TIME = "currentSendTime";
public static final String NUM_RECORDS_SEND = "numRecordsSend";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
index f03ab5206d1..31cf560ce78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
@@ -52,8 +52,10 @@ public class InternalOperatorIOMetricGroup extends ProxyMetricGroup<InternalOper
numRecordsOutRate =
parentMetricGroup.meter(
MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut));
- numBytesIn = parentMetricGroup.getTaskIOMetricGroup().getNumBytesInCounter();
- numBytesOut = parentMetricGroup.getTaskIOMetricGroup().getNumBytesOutCounter();
+ numBytesIn = parentMetricGroup.counter(MetricNames.IO_NUM_BYTES_IN);
+ numBytesOut = parentMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT);
+ parentMetricGroup.meter(MetricNames.IO_NUM_BYTES_IN_RATE, new MeterView(numBytesIn));
+ parentMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut));
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
index 9243e32cc4a..81aa8d78ce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
@@ -34,8 +34,7 @@ import org.apache.flink.runtime.metrics.MetricNames;
public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
implements SinkWriterMetricGroup {
- // deprecated, use numRecordsSendErrors instead.
- @Deprecated private final Counter numRecordsOutErrors;
+ private final Counter numRecordsOutErrors;
private final Counter numRecordsSendErrors;
private final Counter numRecordsWritten;
private final Counter numBytesWritten;
@@ -45,9 +44,15 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
super(parentMetricGroup);
numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
- numRecordsSendErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS);
- numRecordsWritten = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND);
- numBytesWritten = parentMetricGroup.counter(MetricNames.NUM_BYTES_SEND);
+ numRecordsSendErrors =
+ parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, numRecordsOutErrors);
+ numRecordsWritten =
+ parentMetricGroup.counter(
+ MetricNames.NUM_RECORDS_SEND,
+ operatorIOMetricGroup.getNumRecordsOutCounter());
+ numBytesWritten =
+ parentMetricGroup.counter(
+ MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter());
this.operatorIOMetricGroup = operatorIOMetricGroup;
}
@@ -73,7 +78,6 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
return operatorIOMetricGroup;
}
- @Deprecated
@Override
public Counter getNumRecordsOutErrorsCounter() {
return numRecordsOutErrors;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 3b103c22a22..a1afe279833 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -164,10 +164,7 @@ public abstract class AbstractStreamOperator<OUT>
environment
.getMetricGroup()
.getOrAddOperator(config.getOperatorID(), config.getOperatorName());
- this.output =
- new CountingOutput<>(
- output,
- operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+ this.output = registerCounterOnOutput(output, operatorMetricGroup);
if (config.isChainEnd()) {
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}
@@ -649,4 +646,10 @@ public abstract class AbstractStreamOperator<OUT>
protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
return Optional.ofNullable(timeServiceManager);
}
+
+ protected Output<StreamRecord<OUT>> registerCounterOnOutput(
+ Output<StreamRecord<OUT>> output, OperatorMetricGroup operatorMetricGroup) {
+ return new CountingOutput<>(
+ output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index e593616abf7..9029584f986 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -41,6 +42,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -216,6 +218,22 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
closeAll(sinkWriter, super::close);
}
+ /**
+ * Skip registering numRecordsOut counter on output.
+ *
+ * <p>Metric "numRecordsOut" is defined as the total number of records written to the external
+ * system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the number of
+ * records sent to downstream operators, which is the number of Committable batches sent to
+ * SinkCommitter. So we skip registering this metric on output and leave this metric to sink
+ * writer implementations to report.
+ */
+ @Override
+ protected Output<StreamRecord<CommittableMessage<CommT>>> registerCounterOnOutput(
+ Output<StreamRecord<CommittableMessage<CommT>>> output,
+ OperatorMetricGroup operatorMetricGroup) {
+ return output;
+ }
+
private void emit(
int indexOfThisSubtask,
int numberOfParallelSubtasks,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index 90b696dd8eb..d2232010546 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -133,18 +132,17 @@ public class SinkMetricsITCase extends TestLogger {
for (OperatorMetricGroup group : groups) {
Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
// There are only 2 splits assigned; so two groups will not update metrics.
- // There is no other way to access the counter via OperatorMetricGroup, we have to use
- // metrics from the reporter.
- if (((Counter) metrics.get(MetricNames.NUM_RECORDS_SEND)).getCount() == 0) {
+ if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
continue;
}
subtaskWithMetrics++;
+
// SinkWriterMetricGroup metrics
assertThat(
- metrics.get(MetricNames.NUM_RECORDS_SEND),
+ metrics.get(MetricNames.IO_NUM_RECORDS_OUT),
isCounter(equalTo(processedRecordsPerSubtask)));
assertThat(
- metrics.get(MetricNames.NUM_BYTES_SEND),
+ metrics.get(MetricNames.IO_NUM_BYTES_OUT),
isCounter(
equalTo(
processedRecordsPerSubtask
@@ -153,6 +151,17 @@ public class SinkMetricsITCase extends TestLogger {
assertThat(
metrics.get(MetricNames.NUM_RECORDS_OUT_ERRORS),
isCounter(equalTo((processedRecordsPerSubtask + 1) / 2)));
+
+ // Test "send" metric series has the same value as "out" metric series.
+ assertThat(
+ metrics.get(MetricNames.NUM_RECORDS_SEND),
+ isCounter(equalTo(processedRecordsPerSubtask)));
+ assertThat(
+ metrics.get(MetricNames.NUM_BYTES_SEND),
+ isCounter(
+ equalTo(
+ processedRecordsPerSubtask
+ * MetricWriter.RECORD_SIZE_IN_BYTES)));
assertThat(
metrics.get(MetricNames.NUM_RECORDS_SEND_ERRORS),
isCounter(equalTo((processedRecordsPerSubtask + 1) / 2)));
@@ -183,12 +192,11 @@ public class SinkMetricsITCase extends TestLogger {
public void write(Long element, Context context) {
super.write(element, context);
sendTime = element * BASE_SEND_TIME;
- metricGroup.getNumRecordsSendCounter().inc();
+ metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
if (element % 2 == 0) {
metricGroup.getNumRecordsOutErrorsCounter().inc();
- metricGroup.getNumRecordsSendErrorsCounter().inc();
}
- metricGroup.getNumBytesSendCounter().inc(RECORD_SIZE_IN_BYTES);
+ metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
}
}
}