You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/11 05:50:40 UTC

[GitHub] [kafka] ableegoldman commented on a diff in pull request #12836: KAFKA-14282: stop tracking Produced sensors by processor node id

ableegoldman commented on code in PR #12836:
URL: https://github.com/apache/kafka/pull/12836#discussion_r1019854586


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -219,26 +219,20 @@ public <K, V> void send(final String topic,
                 }
 
                 if (!topic.endsWith("-changelog")) {
-                    final Map<String, Sensor> producedSensorByTopic = sinkNodeToProducedSensorByTopic.get(processorNodeId);
-                    if (producedSensorByTopic == null) {
-                        log.error("Unable to records bytes produced to topic {} by sink node {} as the node is not recognized.\n"
-                                      + "Known sink nodes are {}.", topic, processorNodeId, sinkNodeToProducedSensorByTopic.keySet());
-                    } else {
-                        // we may not have created a sensor during initialization if the node uses dynamic topic routing,
-                        // as all topics are not known up front, so create the sensor for that topic if absent
-                        final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
+                    // we may not have created a sensor during initialization if the node uses dynamic topic routing,
+                    // as all topics are not known up front, so create the sensor for this topic if absent
+                    final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
+                        topic,
+                        t -> TopicMetrics.producedSensor(
+                            Thread.currentThread().getName(),
+                            taskId.toString(),
+                            processorNodeId,
                             topic,
-                            t -> TopicMetrics.producedSensor(
-                                Thread.currentThread().getName(),
-                                taskId.toString(),
-                                processorNodeId,
-                                topic,
-                                context.metrics()
-                            )
-                        );
-                        final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
-                        topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());
-                    }
+                            context.metrics()
+                        )
+                    );
+                    final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
+                    topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());

Review Comment:
   Ok I went through the code to double check whether it's possible for the context to actually be `null` in the non-testing code, and I'm pretty sure it should always have been set before we try to send any records -- probably we were just being lazy in the tests (although some of them do pass in a non-null context, so I'm not sure why others don't)
   
   Anyways, I fixed the tests -- lmk if there's anything else, otherwise I'll merge this once the PR build finishes running



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org