You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/06/21 11:11:03 UTC

[kafka] branch trunk updated: MINOR: change Streams topic-level metrics tag from 'topic-name' to 'topic' (#12310)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0928666987 MINOR: change Streams topic-level metrics tag from 'topic-name' to 'topic' (#12310)
0928666987 is described below

commit 09286669877abaf17a1d9d52fa34c294e050f835
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Jun 21 04:10:36 2022 -0700

    MINOR: change Streams topic-level metrics tag from 'topic-name' to 'topic' (#12310)
    
    Changes the tag name from topic-name to just topic to conform to the way this tag is named elsewhere (ie in the clients)
    Also:
        - fixes a comment about dynamic topic routing
        - fixes some indentation in MockRecordCollector
        - Undoes the changes to KStreamSplitTest.scala and TestTopicsTest which are no longer necessary after this hotfix
    
    Reviewers: Bruno Cadonna <ca...@apache.org>
---
 .../processor/internals/RecordCollectorImpl.java   |  3 +-
 .../internals/metrics/StreamsMetricsImpl.java      |  2 +-
 .../internals/metrics/TopicMetricsTest.java        | 18 +++---
 .../org/apache/kafka/test/MockRecordCollector.java | 20 ++++---
 .../streams/scala/kstream/KStreamSplitTest.scala   | 69 +++++++++-------------
 .../org/apache/kafka/streams/TestTopicsTest.java   | 49 +++++++--------
 6 files changed, 74 insertions(+), 87 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 358af6b1b3..38445fde9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -219,12 +219,13 @@ public class RecordCollectorImpl implements RecordCollector {
                 }
 
                 if (!topic.endsWith("-changelog")) {
-                    // we may not have created a sensor yet if the node uses dynamic topic routing
                     final Map<String, Sensor> producedSensorByTopic = sinkNodeToProducedSensorByTopic.get(processorNodeId);
                     if (producedSensorByTopic == null) {
                         log.error("Unable to records bytes produced to topic {} by sink node {} as the node is not recognized.\n"
                                       + "Known sink nodes are {}.", topic, processorNodeId, sinkNodeToProducedSensorByTopic.keySet());
                     } else {
+                        // we may not have created a sensor during initialization if the node uses dynamic topic routing,
+                        // as all topics are not known up front, so create the sensor for that topic if absent
                         final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
                             topic,
                             t -> TopicMetrics.producedSensor(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index cb77fc9bdd..3260bfc1b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -117,7 +117,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public static final String THREAD_ID_TAG = "thread-id";
     public static final String TASK_ID_TAG = "task-id";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
-    public static final String TOPIC_NAME_TAG = "topic-name";
+    public static final String TOPIC_NAME_TAG = "topic";
     public static final String STORE_ID_TAG = "state-id";
     public static final String RECORD_CACHE_ID_TAG = "record-cache-id";
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TopicMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TopicMetricsTest.java
index c359affe5d..b698b26192 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TopicMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TopicMetricsTest.java
@@ -39,7 +39,7 @@ public class TopicMetricsTest {
     private static final String THREAD_ID = "test-thread";
     private static final String TASK_ID = "test-task";
     private static final String PROCESSOR_NODE_ID = "test-processor";
-    private static final String TOPIC_NAME = "topic";
+    private static final String TOPIC = "topic";
 
     private final Map<String, String> tagMap = Collections.singletonMap("hello", "world");
 
@@ -59,14 +59,14 @@ public class TopicMetricsTest {
         final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
         final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
 
-        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "consumed", RecordingLevel.INFO))
+        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "consumed", RecordingLevel.INFO))
             .thenReturn(expectedSensor);
-        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "consumed", RecordingLevel.INFO))
+        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "consumed", RecordingLevel.INFO))
             .thenReturn(expectedSensor);
-        when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME)).thenReturn(tagMap);
+        when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC)).thenReturn(tagMap);
 
         verifySensor(
-            () -> TopicMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics)
+            () -> TopicMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, streamsMetrics)
         );
 
         STREAMS_METRICS_STATIC_MOCK.verify(
@@ -89,13 +89,13 @@ public class TopicMetricsTest {
         final String descriptionOfRecordsTotal = "The total number of records produced to this topic";
         final String descriptionOfBytesTotal = "The total number of bytes produced to this topic";
 
-        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "produced", RecordingLevel.INFO))
+        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "produced", RecordingLevel.INFO))
             .thenReturn(expectedSensor);
-        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "produced", RecordingLevel.INFO))
+        when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, "produced", RecordingLevel.INFO))
             .thenReturn(expectedSensor);
-        when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME)).thenReturn(tagMap);
+        when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC)).thenReturn(tagMap);
 
-        verifySensor(() -> TopicMetrics.producedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics));
+        verifySensor(() -> TopicMetrics.producedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC, streamsMetrics));
 
         STREAMS_METRICS_STATIC_MOCK.verify(
             () -> StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
index c99a32c980..8a7f543496 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
@@ -50,12 +50,14 @@ public class MockRecordCollector implements RecordCollector {
                             final Serializer<V> valueSerializer,
                             final String processorNodeId,
                             final InternalProcessorContext<Void, Void> context) {
-        collected.add(new ProducerRecord<>(topic,
-                                           partition,
-                                           timestamp,
-                                           key,
-                                           value,
-                                           headers));
+        collected.add(new ProducerRecord<>(
+            topic,
+            partition,
+            timestamp,
+            key,
+            value,
+            headers)
+        );
     }
 
     @Override
@@ -69,12 +71,14 @@ public class MockRecordCollector implements RecordCollector {
                             final String processorNodeId,
                             final InternalProcessorContext<Void, Void> context,
                             final StreamPartitioner<? super K, ? super V> partitioner) {
-        collected.add(new ProducerRecord<>(topic,
+        collected.add(new ProducerRecord<>(
+            topic,
             0, // partition id
             timestamp,
             key,
             value,
-            headers));
+            headers)
+        );
     }
 
     @Override
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala
index 89adbd6b1e..bbcc1b503f 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamSplitTest.scala
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.scala.kstream
 
 import org.apache.kafka.streams.kstream.Named
-import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.StreamsBuilder
 import org.apache.kafka.streams.scala.serialization.Serdes._
@@ -36,7 +35,7 @@ class KStreamSplitTest extends TestDriver {
     val sinkTopic = Array("default", "even", "three");
 
     val m = builder
-      .stream[Int, Int](sourceTopic)
+      .stream[Integer, Integer](sourceTopic)
       .split(Named.as("_"))
       .branch((_, v) => v % 2 == 0)
       .branch((_, v) => v % 3 == 0)
@@ -47,17 +46,14 @@ class KStreamSplitTest extends TestDriver {
     m("_2").to(sinkTopic(2))
 
     val testDriver = createTestDriver(builder)
-    val testInput = testDriver.createInput[Int, Int](sourceTopic)
-    val testOutput = sinkTopic.map(name => testDriver.createOutput[Int, Int](name))
-
-    testInput pipeKeyValueList List(
-      new KeyValue(1, 1),
-      new KeyValue(1, 2),
-      new KeyValue(1, 3),
-      new KeyValue(1, 4),
-      new KeyValue(1, 5)
-    ).asJava
-
+    val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+    val testOutput = sinkTopic.map(name => testDriver.createOutput[Integer, Integer](name))
+
+    testInput.pipeValueList(
+      List(1, 2, 3, 4, 5)
+        .map(Integer.valueOf)
+        .asJava
+    )
     assertEquals(List(1, 5), testOutput(0).readValuesToList().asScala)
     assertEquals(List(2, 4), testOutput(1).readValuesToList().asScala)
     assertEquals(List(3), testOutput(2).readValuesToList().asScala)
@@ -71,7 +67,7 @@ class KStreamSplitTest extends TestDriver {
     val sourceTopic = "source"
 
     val m = builder
-      .stream[Int, Int](sourceTopic)
+      .stream[Integer, Integer](sourceTopic)
       .split(Named.as("_"))
       .branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even"), "consumedEvens"))
       .branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x), "mapped"))
@@ -80,18 +76,15 @@ class KStreamSplitTest extends TestDriver {
     m("_mapped").to("mapped")
 
     val testDriver = createTestDriver(builder)
-    val testInput = testDriver.createInput[Int, Int](sourceTopic)
-    testInput pipeKeyValueList List(
-      new KeyValue(1, 1),
-      new KeyValue(1, 2),
-      new KeyValue(1, 3),
-      new KeyValue(1, 4),
-      new KeyValue(1, 5),
-      new KeyValue(1, 9)
-    ).asJava
-
-    val even = testDriver.createOutput[Int, Int]("even")
-    val mapped = testDriver.createOutput[Int, Int]("mapped")
+    val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+    testInput.pipeValueList(
+      List(1, 2, 3, 4, 5, 9)
+        .map(Integer.valueOf)
+        .asJava
+    )
+
+    val even = testDriver.createOutput[Integer, Integer]("even")
+    val mapped = testDriver.createOutput[Integer, Integer]("mapped")
 
     assertEquals(List(2, 4), even.readValuesToList().asScala)
     assertEquals(List(9, 81), mapped.readValuesToList().asScala)
@@ -105,7 +98,7 @@ class KStreamSplitTest extends TestDriver {
     val sourceTopic = "source"
 
     val m = builder
-      .stream[Int, Int](sourceTopic)
+      .stream[Integer, Integer](sourceTopic)
       .split(Named.as("_"))
       .branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even")))
       .branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x)))
@@ -114,23 +107,19 @@ class KStreamSplitTest extends TestDriver {
     m("_2").to("mapped")
 
     val testDriver = createTestDriver(builder)
-    val testInput = testDriver.createInput[Int, Int](sourceTopic)
-    testInput pipeKeyValueList List(
-      new KeyValue(1, 1),
-      new KeyValue(1, 2),
-      new KeyValue(1, 3),
-      new KeyValue(1, 4),
-      new KeyValue(1, 5),
-      new KeyValue(1, 9)
-    ).asJava
-
-    val even = testDriver.createOutput[Int, Int]("even")
-    val mapped = testDriver.createOutput[Int, Int]("mapped")
+    val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
+    testInput.pipeValueList(
+      List(1, 2, 3, 4, 5, 9)
+        .map(Integer.valueOf)
+        .asJava
+    )
+
+    val even = testDriver.createOutput[Integer, Integer]("even")
+    val mapped = testDriver.createOutput[Integer, Integer]("mapped")
 
     assertEquals(List(2, 4), even.readValuesToList().asScala)
     assertEquals(List(9, 81), mapped.readValuesToList().asScala)
 
     testDriver.close()
   }
-
 }
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
index 84b8c5ebd6..ac82c938d7 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
@@ -41,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -86,12 +85,12 @@ public class TestTopicsTest {
 
     @Test
     public void testValue() {
-        final TestInputTopic<Long, String> inputTopic =
-            testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+        final TestInputTopic<String, String> inputTopic =
+            testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<String, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
-        //Feed word "Hello" to inputTopic, timestamp and key irrelevant in this case
-        inputTopic.pipeInput(1L, "Hello");
+        //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
+        inputTopic.pipeInput("Hello");
         assertThat(outputTopic.readValue(), equalTo("Hello"));
         //No more output in topic
         assertThat(outputTopic.isEmpty(), is(true));
@@ -99,20 +98,16 @@ public class TestTopicsTest {
 
     @Test
     public void testValueList() {
-        final TestInputTopic<Long, String> inputTopic =
-            testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+        final TestInputTopic<String, String> inputTopic =
+            testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<String, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
-        final List<KeyValue<Long, String>> inputList = Arrays.asList(
-            new KeyValue<>(1L, "This"),
-            new KeyValue<>(2L, "is"),
-            new KeyValue<>(3L, "an"),
-            new KeyValue<>(4L, "example"));
-        //Feed list of words to inputTopic, key and timestamp are irrelevant in this case
-        inputTopic.pipeKeyValueList(inputList);
+        final List<String> inputList = Arrays.asList("This", "is", "an", "example");
+        //Feed list of words to inputTopic and no kafka key, timestamp is irrelevant in this case
+        inputTopic.pipeValueList(inputList);
         final List<String> output = outputTopic.readValuesToList();
         assertThat(output, hasItems("This", "is", "an", "example"));
-        assertThat(output, is(equalTo(inputList.stream().map(kv -> kv.value).collect(Collectors.toList()))));
+        assertThat(output, is(equalTo(inputList)));
     }
 
     @Test
@@ -229,8 +224,8 @@ public class TestTopicsTest {
             testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<Long, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
-        inputTopic.pipeInput(1L, "Hello", baseTime);
-        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", null, baseTime))));
+        inputTopic.pipeInput(null, "Hello", baseTime);
+        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", null, baseTime))));
 
         inputTopic.pipeInput(2L, "Kafka", ++baseTime);
         assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", null, baseTime))));
@@ -238,15 +233,13 @@ public class TestTopicsTest {
         inputTopic.pipeInput(2L, "Kafka", testBaseTime);
         assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime))));
 
-        final List<KeyValue<Long, String>> inputList = Arrays.asList(
-            new KeyValue<>(1L, "Advancing"),
-            new KeyValue<>(2L, "time"));
+        final List<String> inputList = Arrays.asList("Advancing", "time");
         //Feed list of words to inputTopic and no kafka key, timestamp advancing from testInstant
         final Duration advance = Duration.ofSeconds(15);
         final Instant recordInstant = testBaseTime.plus(Duration.ofDays(1));
-        inputTopic.pipeKeyValueList(inputList, recordInstant, advance);
-        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Advancing", recordInstant))));
-        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "time", null, recordInstant.plus(advance)))));
+        inputTopic.pipeValueList(inputList, recordInstant, advance);
+        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Advancing", recordInstant))));
+        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "time", null, recordInstant.plus(advance)))));
     }
 
     @Test
@@ -294,8 +287,8 @@ public class TestTopicsTest {
             testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer(), testBaseTime, advance);
         final TestOutputTopic<Long, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
-        inputTopic.pipeInput(1L, "Hello");
-        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", testBaseTime))));
+        inputTopic.pipeInput("Hello");
+        assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", testBaseTime))));
         inputTopic.pipeInput(2L, "Kafka");
         assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime.plus(advance)))));
     }
@@ -339,12 +332,12 @@ public class TestTopicsTest {
 
     @Test
     public void testEmptyTopic() {
-        final TestInputTopic<Long, String> inputTopic =
-            testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
+        final TestInputTopic<String, String> inputTopic =
+            testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer());
         final TestOutputTopic<String, String> outputTopic =
             testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
         //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
-        inputTopic.pipeInput(1L, "Hello");
+        inputTopic.pipeInput("Hello");
         assertThat(outputTopic.readValue(), equalTo("Hello"));
         //No more output in topic
         assertThrows(NoSuchElementException.class, outputTopic::readRecord, "Empty topic");