Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/17 16:45:37 UTC

[flink] 01/02: [FLINK-24409][kafka] Fix collection of KafkaSourceReaderMetrics for topics containing periods

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7312e13a6b25f798d13523f450babf9d49490ef0
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Fri Nov 12 09:10:01 2021 +0100

    [FLINK-24409][kafka] Fix collection of KafkaSourceReaderMetrics for topics containing periods
    
    Internally, Kafka translates periods in topic names to underscores when
    it exposes consumer metrics. As a result, Flink could not collect the
    metrics and logged a warning. With this commit, we apply the same
    translation to the topic name before trying to collect the metrics.
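    
    The core of the fix is to apply the same translation when building the
    lookup predicate. Below is a minimal standalone sketch of that matching
    logic, not the Flink implementation: the class name and sample topic are
    invented, the string literals stand in for the patch's
    CONSUMER_FETCH_MANAGER_GROUP and RECORDS_LAG constants, and only
    kafka-clients is assumed on the classpath.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Predicate;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.TopicPartition;

    // Sketch only: Kafka tags its per-partition "records-lag" metric with the
    // topic name where '.' is replaced by '_', so the lookup must translate too.
    public class RecordsLagLookupSketch {

        static Predicate<MetricName> recordsLagFor(TopicPartition tp) {
            // Resolve the topic the way Kafka names it in metric tags: '.' -> '_'.
            final String resolvedTopic = tp.topic().replace('.', '_');
            final String resolvedPartition = String.valueOf(tp.partition());
            return metricName ->
                    "consumer-fetch-manager-metrics".equals(metricName.group())
                            && "records-lag".equals(metricName.name())
                            && resolvedTopic.equals(metricName.tags().get("topic"))
                            && resolvedPartition.equals(metricName.tags().get("partition"));
        }

        public static void main(String[] args) {
            // A metric name roughly as the consumer would report it for topic "orders.v1".
            Map<String, String> tags = new HashMap<>();
            tags.put("topic", "orders_v1");
            tags.put("partition", "0");
            MetricName reported =
                    new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags);

            // Prints "true" only because the lookup translated the '.' in the topic as well.
            System.out.println(recordsLagFor(new TopicPartition("orders.v1", 0)).test(reported));
        }
    }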
---
 .../source/metrics/KafkaSourceReaderMetrics.java   |  6 ++-
 .../reader/KafkaPartitionSplitReaderTest.java      | 47 ++++++++++++++--------
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
index 570338b..94c1cb4 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
@@ -289,6 +289,8 @@ public class KafkaSourceReaderMetrics {
     private @Nullable Metric getRecordsLagMetric(
             Map<MetricName, ? extends Metric> metrics, TopicPartition tp) {
         try {
+            final String resolvedTopic = tp.topic().replace('.', '_');
+            final String resolvedPartition = String.valueOf(tp.partition());
             Predicate<Map.Entry<MetricName, ? extends Metric>> filter =
                     entry -> {
                         final MetricName metricName = entry.getKey();
@@ -297,9 +299,9 @@ public class KafkaSourceReaderMetrics {
                         return metricName.group().equals(CONSUMER_FETCH_MANAGER_GROUP)
                                 && metricName.name().equals(RECORDS_LAG)
                                 && tags.containsKey("topic")
-                                && tags.get("topic").equals(tp.topic())
+                                && tags.get("topic").equals(resolvedTopic)
                                 && tags.containsKey("partition")
-                                && tags.get("partition").equals(String.valueOf(tp.partition()));
+                                && tags.get("partition").equals(resolvedPartition);
                     };
             return MetricUtil.getKafkaMetric(metrics, filter);
         } catch (IllegalStateException e) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 6e4af37..d4dd33f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -44,9 +44,12 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.hamcrest.Matchers;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EmptySource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,11 +67,11 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Unit tests for {@link KafkaPartitionSplitReader}. */
 public class KafkaPartitionSplitReaderTest {
@@ -79,7 +82,7 @@ public class KafkaPartitionSplitReaderTest {
     private static Map<Integer, Map<String, KafkaPartitionSplit>> splitsByOwners;
     private static Map<TopicPartition, Long> earliestOffsets;
 
-    @BeforeClass
+    @BeforeAll
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
         KafkaSourceTestEnv.setupTopic(TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopic);
@@ -91,7 +94,7 @@ public class KafkaPartitionSplitReaderTest {
                         KafkaSourceTestEnv.getPartitionsForTopics(Arrays.asList(TOPIC1, TOPIC2)));
     }
 
-    @AfterClass
+    @AfterAll
     public static void tearDown() throws Exception {
         KafkaSourceTestEnv.tearDown();
     }
@@ -162,8 +165,18 @@ public class KafkaPartitionSplitReaderTest {
         assertThat(numBytesInCounter.getCount(), Matchers.greaterThan(latestNumBytesIn));
     }
 
-    @Test
-    public void testPendingRecordsGauge() throws Exception {
+    @ParameterizedTest
+    @EmptySource
+    @ValueSource(strings = {"_underscore.period-minus"})
+    public void testPendingRecordsGauge(String topicSuffix) throws Throwable {
+        final String topic1Name = TOPIC1 + topicSuffix;
+        final String topic2Name = TOPIC2 + topicSuffix;
+        if (!topicSuffix.isEmpty()) {
+            KafkaSourceTestEnv.setupTopic(
+                    topic1Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
+            KafkaSourceTestEnv.setupTopic(
+                    topic2Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
+        }
         MetricListener metricListener = new MetricListener();
         final Properties props = new Properties();
         props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
@@ -175,7 +188,7 @@ public class KafkaPartitionSplitReaderTest {
         reader.handleSplitsChanges(
                 new SplitsAddition<>(
                         Collections.singletonList(
-                                new KafkaPartitionSplit(new TopicPartition(TOPIC1, 0), 0L))));
+                                new KafkaPartitionSplit(new TopicPartition(topic1Name, 0), 0L))));
         // pendingRecords should have not been registered because of lazily registration
         assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
         // Trigger first fetch
@@ -194,7 +207,7 @@ public class KafkaPartitionSplitReaderTest {
         reader.handleSplitsChanges(
                 new SplitsAddition<>(
                         Collections.singletonList(
-                                new KafkaPartitionSplit(new TopicPartition(TOPIC2, 0), 0L))));
+                                new KafkaPartitionSplit(new TopicPartition(topic2Name, 0), 0L))));
         // Validate pendingRecords
         for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
             reader.fetch();
@@ -282,11 +295,11 @@ public class KafkaPartitionSplitReaderTest {
                     long earliestOffset = earliestOffsets.get(tp);
                     long expectedRecordCount = NUM_RECORDS_PER_PARTITION - earliestOffset;
                     assertEquals(
+                            expectedRecordCount,
+                            (long) recordCount,
                             String.format(
                                     "%s should have %d records.",
-                                    splits.get(splitId), expectedRecordCount),
-                            expectedRecordCount,
-                            (long) recordCount);
+                                    splits.get(splitId), expectedRecordCount));
                 });
     }
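
For readers unfamiliar with the JUnit 5 providers imported above: @EmptySource
contributes one invocation with an empty string and @ValueSource contributes one
invocation per listed value, so testPendingRecordsGauge now runs once with the
plain topic names and once with names containing a period. A tiny standalone
illustration of that combination (hypothetical class and topic, not part of the
patch):

import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EmptySource;
import org.junit.jupiter.params.provider.ValueSource;

class SuffixedTopicNameTest {

    // Runs twice: once with suffix "" (from @EmptySource) and once with
    // ".with.periods" (from @ValueSource).
    @ParameterizedTest
    @EmptySource
    @ValueSource(strings = {".with.periods"})
    void topicNameIsNeverNull(String suffix) {
        assertNotNull("base-topic" + suffix);
    }
}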