You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/07 08:30:31 UTC

[flink] 02/04: [FLINK-26126][kafka] Use the record/byte send counter metrics from SinkWriterMetricGroup directly, fixing the metric bug, and re-enable the metric tests in KafkaSinkITCase and KafkaSinkE2ECase.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 339cd3c98e4783e3c8225c5495be40a0c04ff386
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Thu Feb 17 23:35:47 2022 +0100

    [FLINK-26126][kafka] Use the record/byte send counter metrics from SinkWriterMetricGroup directly, fixing the metric bug, and re-enable the metric tests in KafkaSinkITCase and KafkaSinkE2ECase.
---
 .../apache/flink/connector/kafka/sink/KafkaWriter.java    | 15 ++++++++-------
 .../flink/connector/kafka/sink/KafkaSinkITCase.java       | 13 -------------
 .../flink/connector/kafka/sink/KafkaWriterITCase.java     | 15 ++++++++++-----
 .../apache/flink/tests/util/kafka/KafkaSinkE2ECase.java   | 13 -------------
 4 files changed, 18 insertions(+), 38 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 0e0a874..8102ee8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -89,12 +89,13 @@ class KafkaWriter<IN>
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Callback deliveryCallback;
     private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
+
     private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
     private final SinkWriterMetricGroup metricGroup;
-    private final Counter numBytesOutCounter;
-    private final ProcessingTimeService timeService;
     private final boolean disabledMetrics;
-    private final Counter numRecordsOutCounter;
+    private final Counter numRecordsSendCounter;
+    private final Counter numBytesSendCounter;
+    private final ProcessingTimeService timeService;
 
     // Number of outgoing bytes at the latest metric sync
     private long latestOutgoingByteTotal;
@@ -151,8 +152,8 @@ class KafkaWriter<IN>
         checkNotNull(sinkInitContext, "sinkInitContext");
         this.timeService = sinkInitContext.getProcessingTimeService();
         this.metricGroup = sinkInitContext.metricGroup();
-        this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
-        this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
+        this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
         this.kafkaSinkContext =
                 new DefaultKafkaSinkContext(
                         sinkInitContext.getSubtaskId(),
@@ -192,7 +193,7 @@ class KafkaWriter<IN>
         final ProducerRecord<byte[], byte[]> record =
                 recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
         currentProducer.send(record, deliveryCallback);
-        numRecordsOutCounter.inc();
+        numRecordsSendCounter.inc();
     }
 
     @Override
@@ -385,7 +386,7 @@ class KafkaWriter<IN>
                     long outgoingBytesUntilNow = ((Number) byteOutMetric.metricValue()).longValue();
                     long outgoingBytesSinceLastUpdate =
                             outgoingBytesUntilNow - latestOutgoingByteTotal;
-                    numBytesOutCounter.inc(outgoingBytesSinceLastUpdate);
+                    numBytesSendCounter.inc(outgoingBytesSinceLastUpdate);
                     latestOutgoingByteTotal = outgoingBytesUntilNow;
                     lastSync = time;
                     registerMetricSync();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 32210c6..be7a928 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -34,9 +34,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
 import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
-import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -81,9 +79,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.TestTemplate;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -205,15 +201,6 @@ public class KafkaSinkITCase extends TestLogger {
         @TestContext
         KafkaSinkExternalContextFactory sinkContext =
                 new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());
-
-        @Disabled("Skip metric test until FLINK-26126 fixed")
-        @TestTemplate
-        @Override
-        public void testMetrics(
-                TestEnvironment testEnv,
-                DataStreamSinkExternalContext<String> externalContext,
-                CheckpointingMode semantic)
-                throws Exception {}
     }
 
     @Test
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index d68a813..b8447a1 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -147,13 +147,18 @@ public class KafkaWriterITCase {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
                         getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
-            final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
-            final Counter numRecordsOut = operatorIOMetricGroup.getNumRecordsOutCounter();
-            assertEquals(numBytesOut.getCount(), 0L);
+            final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
+            final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
+            final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
+            assertEquals(numBytesSend.getCount(), 0L);
+            assertEquals(numRecordsSend.getCount(), 0);
+            assertEquals(numRecordsWrittenErrors.getCount(), 0);
+
             writer.write(1, SINK_WRITER_CONTEXT);
             timeService.trigger();
-            assertEquals(numRecordsOut.getCount(), 1);
-            assertThat(numBytesOut.getCount(), greaterThan(0L));
+            assertEquals(numRecordsSend.getCount(), 1);
+            assertEquals(numRecordsWrittenErrors.getCount(), 0);
+            assertThat(numBytesSend.getCount(), greaterThan(0L));
         }
     }
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
index ff845f4..0c75a00 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -19,9 +19,7 @@
 package org.apache.flink.tests.util.kafka;
 
 import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
 import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
-import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -32,8 +30,6 @@ import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
 import org.apache.flink.util.DockerImageVersions;
 
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
@@ -84,13 +80,4 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
                                     .toURL()));
 
     public KafkaSinkE2ECase() throws Exception {}
-
-    @Disabled("Skip metric test until FLINK-26126 fixed")
-    @TestTemplate
-    @Override
-    public void testMetrics(
-            TestEnvironment testEnv,
-            DataStreamSinkExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {}
 }