Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/01 18:54:30 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

cadonna commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r887131106


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java:
##########
@@ -111,7 +111,7 @@ public void shouldGetProcessAtSourceSensor() {
                 expectedParentSensor
         );
 
-        verifySensor(() -> ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
+        verifySensor(() -> ProcessorNodeMetrics.recordsProcessedAtSourceSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));

Review Comment:
   `recordsProcessedAtSourceSensor()` does not exist and causes a compilation error.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java:
##########
@@ -136,6 +153,82 @@ public static Sensor processAtSourceSensor(final String threadId,
         );
     }
 
+    public static Sensor bytesConsumedSensor(final String threadId,

Review Comment:
   Could you please add unit tests?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -40,12 +40,12 @@
     private final Processor<KIn, VIn, KOut, VOut> processor;
     private final FixedKeyProcessor<KIn, VIn, VOut> fixedKeyProcessor;
     private final String name;
-    private final Time time;
+    protected final Time time;
 
     public final Set<String> stateStores;
 
-    private InternalProcessorContext<KOut, VOut> internalProcessorContext;
-    private String threadId;
+    protected InternalProcessorContext<KOut, VOut> internalProcessorContext;

Review Comment:
   I do not think you need to do this. You can simply use the context that is passed into the `init()` method of `SinkNode`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -82,7 +107,12 @@ public void process(final Record<KIn, VIn> record) {
 
         final String topic = topicExtractor.extract(key, value, contextForExtraction);
 
-        collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+        final int bytesProduced =
+            collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+
+        // Just use the cached system time to avoid the clock lookup

Review Comment:
   You can also remove the comment.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -56,6 +61,26 @@ public void addChild(final ProcessorNode<Void, Void, ?, ?> child) {
 
     @Override
     public void init(final InternalProcessorContext<Void, Void> context) {
+        // It is important to first create the sensor before calling init on the
+        // parent object. Otherwise due to backwards compatibility an empty sensor
+        // without parent is created with the same name.
+        // Once the backwards compatibility is not needed anymore it might be possible to
+        // change this.

Review Comment:
   Nice, I just realized that you copied that comment from code that I wrote! 🙂 
   I guess the backwards compatibility was the one with the old metrics structure that we changed in KIP-444. We removed the old structure in 3.0, so I guess that this is an instance of a comment that started to lie.
   Moreover, where you copied the comment from (I guess it was `SourceNode`), the sensor `processAtSourceSensor` does indeed have a parent. I think the comment does not make sense in this class, and we need to verify whether it still makes sense in the other class. That does not need to be verified in this PR.
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -40,12 +40,12 @@
     private final Processor<KIn, VIn, KOut, VOut> processor;
     private final FixedKeyProcessor<KIn, VIn, VOut> fixedKeyProcessor;
     private final String name;
-    private final Time time;
+    protected final Time time;

Review Comment:
   Is this really needed? I could not find where it is used outside `ProcessorNode`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -56,6 +61,26 @@ public void addChild(final ProcessorNode<Void, Void, ?, ?> child) {
 
     @Override
     public void init(final InternalProcessorContext<Void, Void> context) {
+        // It is important to first create the sensor before calling init on the
+        // parent object. Otherwise due to backwards compatibility an empty sensor
+        // without parent is created with the same name.
+        // Once the backwards compatibility is not needed anymore it might be possible to
+        // change this.

Review Comment:
   I actually do not understand this comment. Backwards compatibility of what? What does "empty sensor without parent" mean? `recordsProducedSensor()` and `bytesProducedSensor()` do not create sensors with parents.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -40,12 +40,12 @@
     private final Processor<KIn, VIn, KOut, VOut> processor;
     private final FixedKeyProcessor<KIn, VIn, VOut> fixedKeyProcessor;
     private final String name;
-    private final Time time;
+    protected final Time time;
 
     public final Set<String> stateStores;
 
-    private InternalProcessorContext<KOut, VOut> internalProcessorContext;
-    private String threadId;
+    protected InternalProcessorContext<KOut, VOut> internalProcessorContext;
+    protected String threadId;

Review Comment:
   Also, this does not seem to be strictly needed, since in `SinkNode` you could use `Thread.currentThread().getName()` as in `init()` of `SourceNode`. I even think you need to use `Thread.currentThread().getName()` in `SinkNode`, since you use `threadId` before the variable is set in `super.init()`.
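
   To illustrate the ordering problem, here is a minimal, self-contained sketch. The classes `BaseNode`, `EagerSinkNode`, and `SafeSinkNode` are hypothetical stand-ins and not the actual Kafka Streams classes; they only assume that the parent assigns `threadId` inside its own `init()`.

```java
public class InitOrderSketch {
    // Hypothetical stand-in for the parent node: threadId is only assigned in init().
    static class BaseNode {
        protected String threadId;

        public void init() {
            threadId = Thread.currentThread().getName();
        }
    }

    // Reads the inherited field before super.init() has assigned it.
    static class EagerSinkNode extends BaseNode {
        String sensorThreadId;

        @Override
        public void init() {
            sensorThreadId = threadId; // the field has not been set yet, so this is null
            super.init();
        }
    }

    // Asks the current thread directly, so the position of super.init() does not matter.
    static class SafeSinkNode extends BaseNode {
        String sensorThreadId;

        @Override
        public void init() {
            sensorThreadId = Thread.currentThread().getName();
            super.init();
        }
    }

    static String eagerThreadId() {
        final EagerSinkNode node = new EagerSinkNode();
        node.init();
        return node.sensorThreadId;
    }

    static String safeThreadId() {
        final SafeSinkNode node = new SafeSinkNode();
        node.init();
        return node.sensorThreadId;
    }

    public static void main(final String[] args) {
        System.out.println("eager: " + eagerThreadId());
        System.out.println("safe: " + safeThreadId());
    }
}
```

   With the second variant the field can stay `private` in the parent class.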



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -71,6 +74,20 @@ public class RecordQueue {
             processorContext.taskId().toString(),
             processorContext.metrics()
         );
+        bytesConsumedSensor = ProcessorNodeMetrics.bytesConsumedSensor(
+            Thread.currentThread().getName(),

Review Comment:
   Could you store the thread name in a local variable and use that variable in all sensor retrieval methods?
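
   Roughly what I have in mind, as a sketch only: `sensorName()` is a hypothetical stand-in for the sensor retrieval methods, and the metric labels are placeholders, not the names used in this PR.

```java
import java.util.Arrays;
import java.util.List;

public class ThreadNameSketch {
    // Hypothetical stand-in for a sensor retrieval method; it only builds a name.
    static String sensorName(final String threadId, final String taskId, final String metric) {
        return threadId + "/" + taskId + "/" + metric;
    }

    static List<String> sensorNames(final String taskId) {
        // Look the thread name up once and reuse it for every retrieval call.
        final String threadId = Thread.currentThread().getName();
        return Arrays.asList(
            sensorName(threadId, taskId, "records-consumed"), // placeholder label
            sensorName(threadId, taskId, "bytes-consumed")    // placeholder label
        );
    }

    public static void main(final String[] args) {
        sensorNames("0_0").forEach(System.out::println);
    }
}
```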



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -82,7 +107,12 @@ public void process(final Record<KIn, VIn> record) {
 
         final String topic = topicExtractor.extract(key, value, contextForExtraction);
 
-        collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+        final int bytesProduced =
+            collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+
+        // Just use the cached system time to avoid the clock lookup

Review Comment:
   I think it is fine as you do it, since we do the same in other places, for example `processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());` in `SourceNode`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -199,6 +199,7 @@ public <K, V> void send(final String topic,
                 log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
             }
         });
+        return keyBytes.length + valBytes.length;

Review Comment:
   Should the metric record the bytes actually produced or the bytes that were passed to the producer for sending? Here we would record the latter, so the value of the metric might be higher than the bytes actually produced to the output topic.
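
   To make the difference concrete, here is a small sketch of both ways of counting. `FakeProducer` and `Counter` are hypothetical and do not mirror the `RecordCollectorImpl` or producer APIs; the sketch assumes a send callback that receives `null` on success and an exception on failure, and it counts a `null` key or value as zero bytes.

```java
import java.util.function.Consumer;

public class BytesProducedSketch {
    // Hypothetical producer: calls back with null on success, an exception on failure.
    interface FakeProducer {
        void send(byte[] key, byte[] value, Consumer<Exception> callback);
    }

    static class Counter {
        long bytesHandedOver;    // counted when the record is passed to the producer
        long bytesAcknowledged;  // counted only when the callback reports success

        void send(final FakeProducer producer, final byte[] key, final byte[] value) {
            final int size = sizeOf(key) + sizeOf(value);
            bytesHandedOver += size;
            producer.send(key, value, exception -> {
                if (exception == null) {
                    bytesAcknowledged += size;
                }
            });
        }

        // Assumption of this sketch: a null key or value contributes zero bytes.
        private static int sizeOf(final byte[] bytes) {
            return bytes == null ? 0 : bytes.length;
        }
    }

    // One successful send of 8 bytes and one failed send of 6 bytes.
    static long[] demo() {
        final Counter counter = new Counter();
        final FakeProducer succeeding = (k, v, cb) -> cb.accept(null);
        final FakeProducer failing = (k, v, cb) -> cb.accept(new RuntimeException("simulated send failure"));
        counter.send(succeeding, new byte[3], new byte[5]);
        counter.send(failing, new byte[2], new byte[4]);
        return new long[] {counter.bytesHandedOver, counter.bytesAcknowledged};
    }

    public static void main(final String[] args) {
        final long[] result = demo();
        System.out.println("handed over: " + result[0] + ", acknowledged: " + result[1]);
    }
}
```

   The two counters diverge as soon as a send fails, which is the gap I am asking about.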


