You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/17 16:45:37 UTC
[flink] 01/02: [FLINK-24409][kafka] Fix collection of KafkaSourceReaderMetrics for topics containing periods
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7312e13a6b25f798d13523f450babf9d49490ef0
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Fri Nov 12 09:10:01 2021 +0100
[FLINK-24409][kafka] Fix collection of KafkaSourceReaderMetrics for topics containing periods
Internally, Kafka translates the periods in topic names to underscores.
As a result, Flink could not collect the metrics and logged a
warning. With this commit we also translate the topic name before trying
to collect the metrics.
---
.../source/metrics/KafkaSourceReaderMetrics.java | 6 ++-
.../reader/KafkaPartitionSplitReaderTest.java | 47 ++++++++++++++--------
2 files changed, 34 insertions(+), 19 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
index 570338b..94c1cb4 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
@@ -289,6 +289,8 @@ public class KafkaSourceReaderMetrics {
private @Nullable Metric getRecordsLagMetric(
Map<MetricName, ? extends Metric> metrics, TopicPartition tp) {
try {
+ final String resolvedTopic = tp.topic().replace('.', '_');
+ final String resolvedPartition = String.valueOf(tp.partition());
Predicate<Map.Entry<MetricName, ? extends Metric>> filter =
entry -> {
final MetricName metricName = entry.getKey();
@@ -297,9 +299,9 @@ public class KafkaSourceReaderMetrics {
return metricName.group().equals(CONSUMER_FETCH_MANAGER_GROUP)
&& metricName.name().equals(RECORDS_LAG)
&& tags.containsKey("topic")
- && tags.get("topic").equals(tp.topic())
+ && tags.get("topic").equals(resolvedTopic)
&& tags.containsKey("partition")
- && tags.get("partition").equals(String.valueOf(tp.partition()));
+ && tags.get("partition").equals(resolvedPartition);
};
return MetricUtil.getKafkaMetric(metrics, filter);
} catch (IllegalStateException e) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 6e4af37..d4dd33f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -44,9 +44,12 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.hamcrest.Matchers;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EmptySource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -64,11 +67,11 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Unit tests for {@link KafkaPartitionSplitReader}. */
public class KafkaPartitionSplitReaderTest {
@@ -79,7 +82,7 @@ public class KafkaPartitionSplitReaderTest {
private static Map<Integer, Map<String, KafkaPartitionSplit>> splitsByOwners;
private static Map<TopicPartition, Long> earliestOffsets;
- @BeforeClass
+ @BeforeAll
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
KafkaSourceTestEnv.setupTopic(TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopic);
@@ -91,7 +94,7 @@ public class KafkaPartitionSplitReaderTest {
KafkaSourceTestEnv.getPartitionsForTopics(Arrays.asList(TOPIC1, TOPIC2)));
}
- @AfterClass
+ @AfterAll
public static void tearDown() throws Exception {
KafkaSourceTestEnv.tearDown();
}
@@ -162,8 +165,18 @@ public class KafkaPartitionSplitReaderTest {
assertThat(numBytesInCounter.getCount(), Matchers.greaterThan(latestNumBytesIn));
}
- @Test
- public void testPendingRecordsGauge() throws Exception {
+ @ParameterizedTest
+ @EmptySource
+ @ValueSource(strings = {"_underscore.period-minus"})
+ public void testPendingRecordsGauge(String topicSuffix) throws Throwable {
+ final String topic1Name = TOPIC1 + topicSuffix;
+ final String topic2Name = TOPIC2 + topicSuffix;
+ if (!topicSuffix.isEmpty()) {
+ KafkaSourceTestEnv.setupTopic(
+ topic1Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
+ KafkaSourceTestEnv.setupTopic(
+ topic2Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
+ }
MetricListener metricListener = new MetricListener();
final Properties props = new Properties();
props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
@@ -175,7 +188,7 @@ public class KafkaPartitionSplitReaderTest {
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
- new KafkaPartitionSplit(new TopicPartition(TOPIC1, 0), 0L))));
+ new KafkaPartitionSplit(new TopicPartition(topic1Name, 0), 0L))));
// pendingRecords should have not been registered because of lazily registration
assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
// Trigger first fetch
@@ -194,7 +207,7 @@ public class KafkaPartitionSplitReaderTest {
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
- new KafkaPartitionSplit(new TopicPartition(TOPIC2, 0), 0L))));
+ new KafkaPartitionSplit(new TopicPartition(topic2Name, 0), 0L))));
// Validate pendingRecords
for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
reader.fetch();
@@ -282,11 +295,11 @@ public class KafkaPartitionSplitReaderTest {
long earliestOffset = earliestOffsets.get(tp);
long expectedRecordCount = NUM_RECORDS_PER_PARTITION - earliestOffset;
assertEquals(
+ expectedRecordCount,
+ (long) recordCount,
String.format(
"%s should have %d records.",
- splits.get(splitId), expectedRecordCount),
- expectedRecordCount,
- (long) recordCount);
+ splits.get(splitId), expectedRecordCount));
});
}