Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/16 20:25:36 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

vvcephei commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441118241



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
##########
@@ -337,7 +307,7 @@ private static Sensor throughputAndLatencySensorWithParent(final String threadId
             descriptionOfCount,
             descriptionOfAvgLatency,
             descriptionOfMaxLatency,
-            RecordingLevel.DEBUG,
+            recordingLevel,

Review comment:
       We erroneously ignored the provided recordingLevel and always used DEBUG. The bug never manifested because this method happens to always be called with a recordingLevel of DEBUG anyway.
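
       A minimal sketch of why this kind of bug stays hidden, using hypothetical stand-ins (`Level`, `FakeSensor`, and both factory methods are illustrative names, not Kafka's API). The two factories only diverge once a caller passes something other than DEBUG.

```java
final class RecordingLevelSketch {
    // Hypothetical stand-in for a recording level; not Kafka's Sensor.RecordingLevel.
    enum Level { INFO, DEBUG }

    // Hypothetical stand-in for a sensor; it only remembers the level it was built with.
    static final class FakeSensor {
        final Level level;

        FakeSensor(final Level level) {
            this.level = level;
        }
    }

    // Buggy shape: the parameter is accepted, but a constant is used instead.
    static FakeSensor sensorIgnoringLevel(final Level recordingLevel) {
        return new FakeSensor(Level.DEBUG);
    }

    // Fixed shape: the caller's level is passed through.
    static FakeSensor sensorHonoringLevel(final Level recordingLevel) {
        return new FakeSensor(recordingLevel);
    }

    public static void main(final String[] args) {
        // Called with DEBUG, the two shapes are indistinguishable.
        System.out.println(sensorIgnoringLevel(Level.DEBUG).level == sensorHonoringLevel(Level.DEBUG).level);
        // Called with INFO, they diverge.
        System.out.println(sensorIgnoringLevel(Level.INFO).level == sensorHonoringLevel(Level.INFO).level);
    }
}
```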

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -160,18 +162,21 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       Standby tasks don't currently register any sensors, but I'd rather be defensive and idempotently remove any sensors while closing.
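
       To illustrate what "idempotently" means here, a rough sketch under assumptions: `IdempotentRemovalSketch` is a hypothetical registry, not StreamsMetricsImpl, and its int return value exists only so the behavior is observable. Removing sensors for a task that never registered any, or removing twice, is a no-op.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class IdempotentRemovalSketch {
    // Sensor names keyed by "threadId.taskId"; a hypothetical stand-in for a metrics registry.
    private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();

    private static String key(final String threadId, final String taskId) {
        return threadId + "." + taskId;
    }

    void addTaskLevelSensor(final String threadId, final String taskId, final String sensorName) {
        taskLevelSensors.computeIfAbsent(key(threadId, taskId), k -> new ArrayDeque<>()).push(sensorName);
    }

    // Returns how many sensors were removed. Safe to call when nothing is
    // registered for the task, and safe to call more than once.
    int removeAllTaskLevelSensors(final String threadId, final String taskId) {
        final Deque<String> removed = taskLevelSensors.remove(key(threadId, taskId));
        return removed == null ? 0 : removed.size();
    }

    public static void main(final String[] args) {
        final IdempotentRemovalSketch registry = new IdempotentRemovalSketch();
        // A task that never registered sensors: removal is a no-op.
        System.out.println(registry.removeAllTaskLevelSensors("thread-1", "0_1"));
        registry.addTaskLevelSensor("thread-1", "0_0", "active-buffer");
        // The first call removes the sensor, the second finds nothing left.
        System.out.println(registry.removeAllTaskLevelSensors("thread-1", "0_0"));
        System.out.println(registry.removeAllTaskLevelSensors("thread-1", "0_0"));
    }
}
```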

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -235,7 +235,7 @@ public StateStore getStateStore(final String name) {
         setCurrentNode(child);
         child.process(key, value);
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());

Review comment:
       This previously relied on a lookup of the actual current system time. I thought we decided to use the cached system time. Can you set me straight, @ableegoldman?
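
       For context on the trade-off, a small sketch contrasting the two options. Everything here is hypothetical (`CachedTimeSketch`, `refreshCachedTime`, and the injected clock are illustrative and say nothing about which option Streams settled on). A cached timestamp avoids a clock lookup per record, but the computed latency lags by however stale the cache is.

```java
import java.util.function.LongSupplier;

final class CachedTimeSketch {
    private final LongSupplier wallClock; // injected so the example is deterministic
    private long cachedNowMs;

    CachedTimeSketch(final LongSupplier wallClock) {
        this.wallClock = wallClock;
        this.cachedNowMs = wallClock.getAsLong();
    }

    // Hypothetical hook: refresh the cached time, e.g. once per processing loop iteration.
    void refreshCachedTime() {
        cachedNowMs = wallClock.getAsLong();
    }

    // Latency against the cached time: no clock lookup, but possibly stale.
    long latencyFromCachedTime(final long recordTimestampMs) {
        return cachedNowMs - recordTimestampMs;
    }

    // Latency against the actual time: one clock lookup per call.
    long latencyFromActualTime(final long recordTimestampMs) {
        return wallClock.getAsLong() - recordTimestampMs;
    }

    public static void main(final String[] args) {
        final long[] nowMs = {100L};
        final CachedTimeSketch sketch = new CachedTimeSketch(() -> nowMs[0]);
        nowMs[0] = 130L; // the clock advances after the cache was filled
        System.out.println(sketch.latencyFromCachedTime(0L)); // based on the stale cached value
        System.out.println(sketch.latencyFromActualTime(0L)); // based on the current clock value
    }
}
```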

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -400,20 +402,20 @@ public void shouldRecordBufferedRecords() {
 
         final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString(), StreamsConfig.METRICS_LATEST);
 
-        assertThat(metric.metricValue(), equalTo(0.0d));
+        assertThat(metric.metricValue(), equalTo(0.0));

Review comment:
       Just cleaning up some oddball literals.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -454,24 +456,7 @@ public void shouldRecordE2ELatencyOnProcessForSourceNodes() {
         task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
         task.process(100L);
 
-        assertThat(maxMetric.metricValue(), equalTo(100d));
-    }
-
-    @Test
-    public void shouldRecordE2ELatencyOnProcessForTerminalNodes() {
-        time = new MockTime(0L, 0L, 0L);
-        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
-
-        final String terminalNode = processorStreamTime.name();
-
-        final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNode, StreamsConfig.METRICS_LATEST);
-
-        // e2e latency = 100
-        time.setCurrentTimeMs(100L);
-        task.maybeRecordE2ELatency(0L, terminalNode);

Review comment:
       This test wasn't really testing the "terminal node" code path in ProcessorContextImpl, just that this overload actually fetches the current system time. Since I removed the overload, we don't need the test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -675,27 +670,6 @@ public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
                 tags),
             new Max()
         );
-
-        sensor.add(

Review comment:
       Dropped the percentiles metric.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -150,18 +151,18 @@ public StreamTask(final TaskId id,
         punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
         bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
 
-        for (final String terminalNode : topology.terminalNodes()) {
+        for (final String terminalNodeName : topology.terminalNodes()) {
             e2eLatencySensors.put(
-                terminalNode,
-                ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, terminalNode, RecordingLevel.INFO, streamsMetrics)
+                terminalNodeName,
+                TaskMetrics.e2ELatencySensor(threadId, taskId, terminalNodeName, RecordingLevel.INFO, streamsMetrics)

Review comment:
       Fixes the sensor leak by simply registering these as task-level sensors. Note the node name is still provided to scope the sensors themselves.
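
       A simplified sketch of the leak and the fix, assuming a hypothetical registry (`SensorScopeSketch` and its methods are illustrative, not the StreamsMetricsImpl API) in which task-level cleanup only clears sensors registered at task level. A sensor registered at node level survives that cleanup, while a task-level sensor whose name embeds the node name is removed along with the rest.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class SensorScopeSketch {
    private final Map<String, Set<String>> taskLevel = new HashMap<>();
    private final Map<String, Set<String>> nodeLevel = new HashMap<>();

    void addNodeLevelSensor(final String taskId, final String nodeName, final String sensorName) {
        nodeLevel.computeIfAbsent(taskId + "." + nodeName, k -> new HashSet<>()).add(sensorName);
    }

    void addTaskLevelSensor(final String taskId, final String sensorName) {
        taskLevel.computeIfAbsent(taskId, k -> new HashSet<>()).add(sensorName);
    }

    // Only clears the task-level map; node-level registrations are untouched.
    void removeAllTaskLevelSensors(final String taskId) {
        taskLevel.remove(taskId);
    }

    int registeredSensorCount() {
        int count = 0;
        for (final Set<String> sensors : taskLevel.values()) {
            count += sensors.size();
        }
        for (final Set<String> sensors : nodeLevel.values()) {
            count += sensors.size();
        }
        return count;
    }

    public static void main(final String[] args) {
        final SensorScopeSketch leaky = new SensorScopeSketch();
        leaky.addNodeLevelSensor("0_0", "sink-1", "record-e2e-latency");
        leaky.removeAllTaskLevelSensors("0_0");
        System.out.println(leaky.registeredSensorCount()); // the node-level sensor is still registered

        final SensorScopeSketch fixed = new SensorScopeSketch();
        // Task-level registration; the node name only scopes the sensor name.
        fixed.addTaskLevelSensor("0_0", "sink-1.record-e2e-latency");
        fixed.removeAllTaskLevelSensors("0_0");
        System.out.println(fixed.registeredSensorCount()); // nothing left behind
    }
}
```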

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -456,12 +457,14 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       We previously relied on the task manager to remove these sensors before calling close, but forgot to do it before recycling. In retrospect, it's better to do it within the same class that creates the sensors to begin with.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1806,11 +1793,19 @@ public void shouldRecycleTask() {
         task.initializeIfNeeded();
         task.completeRestoration();
 
+        assertThat(getTaskMetrics(), not(empty()));
+
         task.closeAndRecycleState();
 
+        assertThat(getTaskMetrics(), empty());

Review comment:
       Verified this fails on trunk.



