You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/17 16:45:36 UTC

[flink] branch master updated (e5111c9 -> da8ef26)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e5111c9  [FLINK-24776][table] Clarify DecodingFormat and introduce ProjectableDecodingFormat
     new 7312e13  [FLINK-24409][kafka] Fix collection of KafkaSourceReaderMetrics for topics containing periods
     new da8ef26  [FLINK-24409][kafka] Log PendingRecords metrics name if record lag collection fails

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../source/metrics/KafkaSourceReaderMetrics.java   |  8 ++--
 .../reader/KafkaPartitionSplitReaderTest.java      | 47 ++++++++++++++--------
 2 files changed, 35 insertions(+), 20 deletions(-)

[flink] 02/02: [FLINK-24409][kafka] Log PendingRecords metrics name if record lag collection fails

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da8ef265e3f78ec71aad43b4f8209884eaf3a860
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Wed Nov 17 10:56:21 2021 +0100

    [FLINK-24409][kafka] Log PendingRecords metrics name if record lag collection fails
---
 .../flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
index 94c1cb4..762f9a8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
@@ -310,7 +310,7 @@ public class KafkaSourceReaderMetrics {
                             "Error when getting Kafka consumer metric \"%s\" "
                                     + "for partition \"%s\". "
                                     + "Metric \"%s\" may not be reported correctly. ",
-                            RECORDS_LAG, tp, MetricNames.PENDING_BYTES),
+                            RECORDS_LAG, tp, MetricNames.PENDING_RECORDS),
                     e);
             return null;
         }

[flink] 01/02: [FLINK-24409][kafka] Fix collection of KafkaSourceReaderMetrics for topics containing periods

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7312e13a6b25f798d13523f450babf9d49490ef0
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Fri Nov 12 09:10:01 2021 +0100

    [FLINK-24409][kafka] Fix collection of KafkaSourceReaderMetrics for topics containing periods
    
    Internally, Kafka translates periods in topic names to underscores.
    As a result, Flink could not find the corresponding metrics and logged
    a warning. With this commit, we apply the same translation to the topic
    name before trying to collect the metrics.
---
 .../source/metrics/KafkaSourceReaderMetrics.java   |  6 ++-
 .../reader/KafkaPartitionSplitReaderTest.java      | 47 ++++++++++++++--------
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
index 570338b..94c1cb4 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
@@ -289,6 +289,8 @@ public class KafkaSourceReaderMetrics {
     private @Nullable Metric getRecordsLagMetric(
             Map<MetricName, ? extends Metric> metrics, TopicPartition tp) {
         try {
+            final String resolvedTopic = tp.topic().replace('.', '_');
+            final String resolvedPartition = String.valueOf(tp.partition());
             Predicate<Map.Entry<MetricName, ? extends Metric>> filter =
                     entry -> {
                         final MetricName metricName = entry.getKey();
@@ -297,9 +299,9 @@ public class KafkaSourceReaderMetrics {
                         return metricName.group().equals(CONSUMER_FETCH_MANAGER_GROUP)
                                 && metricName.name().equals(RECORDS_LAG)
                                 && tags.containsKey("topic")
-                                && tags.get("topic").equals(tp.topic())
+                                && tags.get("topic").equals(resolvedTopic)
                                 && tags.containsKey("partition")
-                                && tags.get("partition").equals(String.valueOf(tp.partition()));
+                                && tags.get("partition").equals(resolvedPartition);
                     };
             return MetricUtil.getKafkaMetric(metrics, filter);
         } catch (IllegalStateException e) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 6e4af37..d4dd33f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -44,9 +44,12 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.hamcrest.Matchers;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EmptySource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,11 +67,11 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Unit tests for {@link KafkaPartitionSplitReader}. */
 public class KafkaPartitionSplitReaderTest {
@@ -79,7 +82,7 @@ public class KafkaPartitionSplitReaderTest {
     private static Map<Integer, Map<String, KafkaPartitionSplit>> splitsByOwners;
     private static Map<TopicPartition, Long> earliestOffsets;
 
-    @BeforeClass
+    @BeforeAll
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
         KafkaSourceTestEnv.setupTopic(TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopic);
@@ -91,7 +94,7 @@ public class KafkaPartitionSplitReaderTest {
                         KafkaSourceTestEnv.getPartitionsForTopics(Arrays.asList(TOPIC1, TOPIC2)));
     }
 
-    @AfterClass
+    @AfterAll
     public static void tearDown() throws Exception {
         KafkaSourceTestEnv.tearDown();
     }
@@ -162,8 +165,18 @@ public class KafkaPartitionSplitReaderTest {
         assertThat(numBytesInCounter.getCount(), Matchers.greaterThan(latestNumBytesIn));
     }
 
-    @Test
-    public void testPendingRecordsGauge() throws Exception {
+    @ParameterizedTest
+    @EmptySource
+    @ValueSource(strings = {"_underscore.period-minus"})
+    public void testPendingRecordsGauge(String topicSuffix) throws Throwable {
+        final String topic1Name = TOPIC1 + topicSuffix;
+        final String topic2Name = TOPIC2 + topicSuffix;
+        if (!topicSuffix.isEmpty()) {
+            KafkaSourceTestEnv.setupTopic(
+                    topic1Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
+            KafkaSourceTestEnv.setupTopic(
+                    topic2Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
+        }
         MetricListener metricListener = new MetricListener();
         final Properties props = new Properties();
         props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
@@ -175,7 +188,7 @@ public class KafkaPartitionSplitReaderTest {
         reader.handleSplitsChanges(
                 new SplitsAddition<>(
                         Collections.singletonList(
-                                new KafkaPartitionSplit(new TopicPartition(TOPIC1, 0), 0L))));
+                                new KafkaPartitionSplit(new TopicPartition(topic1Name, 0), 0L))));
         // pendingRecords should have not been registered because of lazily registration
         assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
         // Trigger first fetch
@@ -194,7 +207,7 @@ public class KafkaPartitionSplitReaderTest {
         reader.handleSplitsChanges(
                 new SplitsAddition<>(
                         Collections.singletonList(
-                                new KafkaPartitionSplit(new TopicPartition(TOPIC2, 0), 0L))));
+                                new KafkaPartitionSplit(new TopicPartition(topic2Name, 0), 0L))));
         // Validate pendingRecords
         for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
             reader.fetch();
@@ -282,11 +295,11 @@ public class KafkaPartitionSplitReaderTest {
                     long earliestOffset = earliestOffsets.get(tp);
                     long expectedRecordCount = NUM_RECORDS_PER_PARTITION - earliestOffset;
                     assertEquals(
+                            expectedRecordCount,
+                            (long) recordCount,
                             String.format(
                                     "%s should have %d records.",
-                                    splits.get(splitId), expectedRecordCount),
-                            expectedRecordCount,
-                            (long) recordCount);
+                                    splits.get(splitId), expectedRecordCount));
                 });
     }