You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/06/21 11:11:03 UTC
[kafka] branch trunk updated: MINOR: change Streams topic-level metrics tag from 'topic-name' to 'topic' (#12310)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0928666987 MINOR: change Streams topic-level metrics tag from 'topic-name' to 'topic' (#12310)
0928666987 is described below
commit 09286669877abaf17a1d9d52fa34c294e050f835
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Jun 21 04:10:36 2022 -0700
MINOR: change Streams topic-level metrics tag from 'topic-name' to 'topic' (#12310)
Changes the tag name from 'topic-name' to just 'topic' to conform to the way this tag is named elsewhere (i.e., in the clients).
Also:
- fixes a comment about dynamic topic routing
- fixes some indentation in MockRecordCollector
- undoes the changes to KStreamSplitTest.scala and TestTopicsTest.java, which are no longer necessary after this hotfix
Reviewers: Bruno Cadonna <ca...@apache.org>
---
.../processor/internals/RecordCollectorImpl.java | 3 +-
.../internals/metrics/StreamsMetricsImpl.java | 2 +-
.../internals/metrics/TopicMetricsTest.java | 18 +++---
.../org/apache/kafka/test/MockRecordCollector.java | 20 ++++---
.../streams/scala/kstream/KStreamSplitTest.scala | 69 +++++++++-------------
.../org/apache/kafka/streams/TestTopicsTest.java | 49 +++++++--------
6 files changed, 74 insertions(+), 87 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 358af6b1b3..38445fde9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -219,12 +219,13 @@ public class RecordCollectorImpl implements RecordCollector {
}
if (!topic.endsWith("-changelog")) {
- // we may not have created a sensor yet if the node uses dynamic topic routing
final Map<String, Sensor> producedSensorByTopic = sinkNodeToProducedSensorByTopic.get(processorNodeId);
if (producedSensorByTopic == null) {
log.error("Unable to records bytes produced to topic {} by sink node {} as the node is not recognized.\n"
+ "Known sink nodes are {}.", topic, processorNodeId, sinkNodeToProducedSensorByTopic.keySet());
} else {
+ // we may not have created a sensor during initialization if the node uses dynamic topic routing,
+ // as all topics are not known up front, so create the sensor for that topic if absent
final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
topic,
t -> TopicMetrics.producedSensor(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index cb77fc9bdd..3260bfc1b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -117,7 +117,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String THREAD_ID_TAG = "thread-id";
public static final String TASK_ID_TAG = "task-id";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
- public static final String TOPIC_NAME_TAG = "topic-name";
+ public static final String TOPIC_NAME_TAG = "topic";
public static final String STORE_ID_TAG = "state-id";
public static final String RECORD_CACHE_ID_TAG = "record-cache-id";
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TopicMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TopicMetricsTest.java
index c359affe5d..b698b26192 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TopicMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TopicMetricsTest.java
@@ -39,7 +39,7 @@ public class TopicMetricsTest {
private static final String THREAD_ID = "test-thread";
private static final String TASK_ID = "test-task";
private static final String PROCESSOR_NODE_ID = "test-processor";
- private static final String TOPIC_NAME = "topic";
+ private static final String TOPIC = "topic";
private final Map<String, String> tagMap = Collections.singletonMap("hello", "world");
@@ -59,14 +59,14 @@ public class TopicMetricsTest {
final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
- when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "consumed", RecordingLevel.INFO))
+ when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "consumed", RecordingLevel.INFO))
.thenReturn(expectedSensor);
- when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "consumed", RecordingLevel.INFO))
+ when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "consumed", RecordingLevel.INFO))
.thenReturn(expectedSensor);
- when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME)).thenReturn(tagMap);
+ when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC)).thenReturn(tagMap);
verifySensor(
- () -> TopicMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics)
+ () -> TopicMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, streamsMetrics)
);
STREAMS_METRICS_STATIC_MOCK.verify(
@@ -89,13 +89,13 @@ public class TopicMetricsTest {
final String descriptionOfRecordsTotal = "The total number of records produced to this topic";
final String descriptionOfBytesTotal = "The total number of bytes produced to this topic";
- when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "produced", RecordingLevel.INFO))
+ when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "produced", RecordingLevel.INFO))
.thenReturn(expectedSensor);
- when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "produced", RecordingLevel.INFO))
+ when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "produced", RecordingLevel.INFO))
.thenReturn(expectedSensor);
- when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME)).thenReturn(tagMap);
+ when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC)).thenReturn(tagMap);
- verifySensor(() -> TopicMetrics.producedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics));
+ verifySensor(() -> TopicMetrics.producedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, streamsMetrics));
STREAMS_METRICS_STATIC_MOCK.verify(
() -> StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
index c99a32c980..8a7f543496 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
@@ -50,12 +50,14 @@ public class MockRecordCollector implements RecordCollector {
final Serializer<V> valueSerializer,
final String processorNodeId,
final InternalProcessorContext<Void, Void> context) {
- collected.add(new ProducerRecord<>(topic,
- partition,
- timestamp,
- key,
- value,
- headers));
+ collected.add(new ProducerRecord<>(
+ topic,
+ partition,
+ timestamp,
+ key,
+ value,
+ headers)
+ );
}
@Override
@@ -69,12 +71,14 @@ public class MockRecordCollector implements RecordCollector {
final String processorNodeId,
final InternalProcessorContext<Void, Void> context,
final StreamPartitioner<? super K, ? super V> partitioner) {
- collected.add(new ProducerRecord<>(topic,
+ collected.add(new ProducerRecord<>(
+ topic,
0, // partition id
timestamp,
key,
value,
- headers));
+ headers)
+ );
}
@Override
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala
index 89adbd6b1e..bbcc1b503f 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.Named
-import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._
@@ -36,7 +35,7 @@ class KStreamSplitTest extends TestDriver {
val sinkTopic = Array("default", "even", "three");
val m = builder
- .stream[Int, Int](sourceTopic)
+ .stream[Integer, Integer](sourceTopic)
.split(Named.as("_"))
.branch((_, v) => v % 2 == 0)
.branch((_, v) => v % 3 == 0)
@@ -47,17 +46,14 @@ class KStreamSplitTest extends TestDriver {
m("_2").to(sinkTopic(2))
val testDriver = createTestDriver(builder)
- val testInput = testDriver.createInput[Int, Int](sourceTopic)
- val testOutput = sinkTopic.map(name => testDriver.createOutput[Int, Int](name))
-
- testInput pipeKeyValueList List(
- new KeyValue(1, 1),
- new KeyValue(1, 2),
- new KeyValue(1, 3),
- new KeyValue(1, 4),
- new KeyValue(1, 5)
- ).asJava
-
+ val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+ val testOutput = sinkTopic.map(name => testDriver.createOutput[Integer, Integer](name))
+
+ testInput.pipeValueList(
+ List(1, 2, 3, 4, 5)
+ .map(Integer.valueOf)
+ .asJava
+ )
assertEquals(List(1, 5), testOutput(0).readValuesToList().asScala)
assertEquals(List(2, 4), testOutput(1).readValuesToList().asScala)
assertEquals(List(3), testOutput(2).readValuesToList().asScala)
@@ -71,7 +67,7 @@ class KStreamSplitTest extends TestDriver {
val sourceTopic = "source"
val m = builder
- .stream[Int, Int](sourceTopic)
+ .stream[Integer, Integer](sourceTopic)
.split(Named.as("_"))
.branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even"), "consumedEvens"))
.branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x), "mapped"))
@@ -80,18 +76,15 @@ class KStreamSplitTest extends TestDriver {
m("_mapped").to("mapped")
val testDriver = createTestDriver(builder)
- val testInput = testDriver.createInput[Int, Int](sourceTopic)
- testInput pipeKeyValueList List(
- new KeyValue(1, 1),
- new KeyValue(1, 2),
- new KeyValue(1, 3),
- new KeyValue(1, 4),
- new KeyValue(1, 5),
- new KeyValue(1, 9)
- ).asJava
-
- val even = testDriver.createOutput[Int, Int]("even")
- val mapped = testDriver.createOutput[Int, Int]("mapped")
+ val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+ testInput.pipeValueList(
+ List(1, 2, 3, 4, 5, 9)
+ .map(Integer.valueOf)
+ .asJava
+ )
+
+ val even = testDriver.createOutput[Integer, Integer]("even")
+ val mapped = testDriver.createOutput[Integer, Integer]("mapped")
assertEquals(List(2, 4), even.readValuesToList().asScala)
assertEquals(List(9, 81), mapped.readValuesToList().asScala)
@@ -105,7 +98,7 @@ class KStreamSplitTest extends TestDriver {
val sourceTopic = "source"
val m = builder
- .stream[Int, Int](sourceTopic)
+ .stream[Integer, Integer](sourceTopic)
.split(Named.as("_"))
.branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even")))
.branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x)))
@@ -114,23 +107,19 @@ class KStreamSplitTest extends TestDriver {
m("_2").to("mapped")
val testDriver = createTestDriver(builder)
- val testInput = testDriver.createInput[Int, Int](sourceTopic)
- testInput pipeKeyValueList List(
- new KeyValue(1, 1),
- new KeyValue(1, 2),
- new KeyValue(1, 3),
- new KeyValue(1, 4),
- new KeyValue(1, 5),
- new KeyValue(1, 9)
- ).asJava
-
- val even = testDriver.createOutput[Int, Int]("even")
- val mapped = testDriver.createOutput[Int, Int]("mapped")
+ val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+ testInput.pipeValueList(
+ List(1, 2, 3, 4, 5, 9)
+ .map(Integer.valueOf)
+ .asJava
+ )
+
+ val even = testDriver.createOutput[Integer, Integer]("even")
+ val mapped = testDriver.createOutput[Integer, Integer]("mapped")
assertEquals(List(2, 4), even.readValuesToList().asScala)
assertEquals(List(9, 81), mapped.readValuesToList().asScala)
testDriver.close()
}
-
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
index 84b8c5ebd6..ac82c938d7 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
@@ -41,7 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
-import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -86,12 +85,12 @@ public class TestTopicsTest {
@Test
public void testValue() {
- final TestInputTopic<Long, String> inputTopic =
- testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+ final TestInputTopic<String, String> inputTopic =
+ testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
- //Feed word "Hello" to inputTopic, timestamp and key irrelevant in this case
- inputTopic.pipeInput(1L, "Hello");
+ //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
+ inputTopic.pipeInput("Hello");
assertThat(outputTopic.readValue(), equalTo("Hello"));
//No more output in topic
assertThat(outputTopic.isEmpty(), is(true));
@@ -99,20 +98,16 @@ public class TestTopicsTest {
@Test
public void testValueList() {
- final TestInputTopic<Long, String> inputTopic =
- testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+ final TestInputTopic<String, String> inputTopic =
+ testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
- final List<KeyValue<Long, String>> inputList = Arrays.asList(
- new KeyValue<>(1L, "This"),
- new KeyValue<>(2L, "is"),
- new KeyValue<>(3L, "an"),
- new KeyValue<>(4L, "example"));
- //Feed list of words to inputTopic, key and timestamp are irrelevant in this case
- inputTopic.pipeKeyValueList(inputList);
+ final List<String> inputList = Arrays.asList("This", "is", "an", "example");
+ //Feed list of words to inputTopic and no kafka key, timestamp is irrelevant in this case
+ inputTopic.pipeValueList(inputList);
final List<String> output = outputTopic.readValuesToList();
assertThat(output, hasItems("This", "is", "an", "example"));
- assertThat(output, is(equalTo(inputList.stream().map(kv -> kv.value).collect(Collectors.toList()))));
+ assertThat(output, is(equalTo(inputList)));
}
@Test
@@ -229,8 +224,8 @@ public class TestTopicsTest {
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
- inputTopic.pipeInput(1L, "Hello", baseTime);
- assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", null, baseTime))));
+ inputTopic.pipeInput(null, "Hello", baseTime);
+ assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", null, baseTime))));
inputTopic.pipeInput(2L, "Kafka", ++baseTime);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", null, baseTime))));
@@ -238,15 +233,13 @@ public class TestTopicsTest {
inputTopic.pipeInput(2L, "Kafka", testBaseTime);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime))));
- final List<KeyValue<Long, String>> inputList = Arrays.asList(
- new KeyValue<>(1L, "Advancing"),
- new KeyValue<>(2L, "time"));
+ final List<String> inputList = Arrays.asList("Advancing", "time");
//Feed list of words to inputTopic and no kafka key, timestamp advancing from testInstant
final Duration advance = Duration.ofSeconds(15);
final Instant recordInstant = testBaseTime.plus(Duration.ofDays(1));
- inputTopic.pipeKeyValueList(inputList, recordInstant, advance);
- assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Advancing", recordInstant))));
- assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "time", null, recordInstant.plus(advance)))));
+ inputTopic.pipeValueList(inputList, recordInstant, advance);
+ assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Advancing", recordInstant))));
+ assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "time", null, recordInstant.plus(advance)))));
}
@Test
@@ -294,8 +287,8 @@ public class TestTopicsTest {
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer(), testBaseTime, advance);
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
- inputTopic.pipeInput(1L, "Hello");
- assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", testBaseTime))));
+ inputTopic.pipeInput("Hello");
+ assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", testBaseTime))));
inputTopic.pipeInput(2L, "Kafka");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime.plus(advance)))));
}
@@ -339,12 +332,12 @@ public class TestTopicsTest {
@Test
public void testEmptyTopic() {
- final TestInputTopic<Long, String> inputTopic =
- testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+ final TestInputTopic<String, String> inputTopic =
+ testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
//Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
- inputTopic.pipeInput(1L, "Hello");
+ inputTopic.pipeInput("Hello");
assertThat(outputTopic.readValue(), equalTo("Hello"));
//No more output in topic
assertThrows(NoSuchElementException.class, outputTopic::readRecord, "Empty topic");