You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/07 08:30:32 UTC
[flink] 03/04: [FLINK-26126][kafka] develop record out error counter metric
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d07df5791f42b64891c5396f912841c9c7133d7a
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Mar 2 21:06:30 2022 +0100
[FLINK-26126][kafka] develop record out error counter metric
---
.../flink/connector/kafka/sink/KafkaWriter.java | 7 ++++-
.../connector/kafka/sink/KafkaWriterITCase.java | 36 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 8102ee8..ecd62a0 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -95,6 +95,7 @@ class KafkaWriter<IN>
private final boolean disabledMetrics;
private final Counter numRecordsSendCounter;
private final Counter numBytesSendCounter;
+ private final Counter numRecordsOutErrorsCounter;
private final ProcessingTimeService timeService;
// Number of outgoing bytes at the latest metric sync
@@ -154,6 +155,7 @@ class KafkaWriter<IN>
this.metricGroup = sinkInitContext.metricGroup();
this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
+ this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
this.kafkaSinkContext =
new DefaultKafkaSinkContext(
sinkInitContext.getSubtaskId(),
@@ -410,7 +412,10 @@ class KafkaWriter<IN>
FlinkKafkaInternalProducer<byte[], byte[]> producer =
KafkaWriter.this.currentProducer;
mailboxExecutor.execute(
- () -> throwException(metadata, exception, producer),
+ () -> {
+ numRecordsOutErrorsCounter.inc();
+ throwException(metadata, exception, producer);
+ },
"Failed to send data to Kafka");
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index b8447a1..ee21d04 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -193,6 +193,42 @@ public class KafkaWriterITCase {
}
@Test
+ void testNumRecordsOutErrorsCounterMetric() throws Exception {
+ Properties properties = getKafkaClientConfiguration();
+ final InternalSinkWriterMetricGroup metricGroup =
+ InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
+
+ try (final KafkaWriter<Integer> writer =
+ createWriterWithConfiguration(
+ properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
+ final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+ org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+ .isEqualTo(0L);
+
+ writer.write(1, SINK_WRITER_CONTEXT);
+ org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+ .isEqualTo(0L);
+
+ final String transactionalId = writer.getCurrentProducer().getTransactionalId();
+
+ try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+ new FlinkKafkaInternalProducer<>(properties, transactionalId)) {
+
+ producer.initTransactions();
+ producer.beginTransaction();
+ producer.send(new ProducerRecord<byte[], byte[]>(topic, "2".getBytes()));
+ producer.commitTransaction();
+ }
+
+ writer.write(3, SINK_WRITER_CONTEXT);
+ writer.flush(false);
+ writer.prepareCommit();
+ org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+ .isEqualTo(1L);
+ }
+ }
+
+ @Test
public void testMetadataPublisher() throws Exception {
List<String> metadataList = new ArrayList<>();
try (final KafkaWriter<Integer> writer =