You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/07 08:30:32 UTC

[flink] 03/04: [FLINK-26126][kafka] develop record out error counter metric

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d07df5791f42b64891c5396f912841c9c7133d7a
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Mar 2 21:06:30 2022 +0100

    [FLINK-26126][kafka] develop record out error counter metric
---
 .../flink/connector/kafka/sink/KafkaWriter.java    |  7 ++++-
 .../connector/kafka/sink/KafkaWriterITCase.java    | 36 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 8102ee8..ecd62a0 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -95,6 +95,7 @@ class KafkaWriter<IN>
     private final boolean disabledMetrics;
     private final Counter numRecordsSendCounter;
     private final Counter numBytesSendCounter;
+    private final Counter numRecordsOutErrorsCounter;
     private final ProcessingTimeService timeService;
 
     // Number of outgoing bytes at the latest metric sync
@@ -154,6 +155,7 @@ class KafkaWriter<IN>
         this.metricGroup = sinkInitContext.metricGroup();
         this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
         this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
+        this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
         this.kafkaSinkContext =
                 new DefaultKafkaSinkContext(
                         sinkInitContext.getSubtaskId(),
@@ -410,7 +412,10 @@ class KafkaWriter<IN>
                 FlinkKafkaInternalProducer<byte[], byte[]> producer =
                         KafkaWriter.this.currentProducer;
                 mailboxExecutor.execute(
-                        () -> throwException(metadata, exception, producer),
+                        () -> {
+                            numRecordsOutErrorsCounter.inc();
+                            throwException(metadata, exception, producer);
+                        },
                         "Failed to send data to Kafka");
             }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index b8447a1..ee21d04 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -193,6 +193,42 @@ public class KafkaWriterITCase {
     }
 
     @Test
+    void testNumRecordsOutErrorsCounterMetric() throws Exception { // expects the error counter to read 0, 0, then 1 at the three checks below
+        Properties properties = getKafkaClientConfiguration();
+        final InternalSinkWriterMetricGroup metricGroup =
+                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); // metric group built on the test's metricListener
+
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
+            final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); // presumably the same counter instance the writer increments - verify against createWriterWithConfiguration
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(0L); // check 1: nothing has been written yet
+
+            writer.write(1, SINK_WRITER_CONTEXT);
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(0L); // check 2: still zero right after the first write
+
+            final String transactionalId = writer.getCurrentProducer().getTransactionalId(); // the id is reused for the second producer below
+
+            try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                    new FlinkKafkaInternalProducer<>(properties, transactionalId)) { // NOTE(review): presumably meant to fence the writer's producer by sharing its transactional id - confirm
+
+                producer.initTransactions();
+                producer.beginTransaction();
+                producer.send(new ProducerRecord<byte[], byte[]>(topic, "2".getBytes()));
+                producer.commitTransaction();
+            }
+
+            writer.write(3, SINK_WRITER_CONTEXT); // NOTE(review): presumably this send is the one expected to fail - confirm
+            writer.flush(false);
+            writer.prepareCommit();
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(1L); // check 3: exactly one error counted after write/flush/prepareCommit
+        }
+    }
+
+    @Test
     public void testMetadataPublisher() throws Exception {
         List<String> metadataList = new ArrayList<>();
         try (final KafkaWriter<Integer> writer =