Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/16 20:14:50 UTC

[GitHub] [kafka] vvcephei opened a new pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

vvcephei opened a new pull request #8882:
URL: https://github.com/apache/kafka/pull/8882


   * Remove problematic Percentiles measurements until the implementation is fixed
   * Fix leaking e2e metrics when task is closed
   * Fix leaking metrics when tasks are recycled
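After this PR, the e2e latency sensor carries only min and max stats. The sketch below shows that shape. `E2ELatencyStats` is a hypothetical stand-in, not Kafka's `Sensor` API. It assumes e2e latency is the wall-clock time at recording minus the record's timestamp, which matches the removed test's "e2e latency = 100" setup (clock at 100, record timestamp 0).

```java
// Hypothetical stand-in for a sensor that carries only Min and Max stats
// (not Kafka's Sensor API); percentiles are intentionally absent.
final class E2ELatencyStats {
    private double min = Double.NaN;
    private double max = Double.NaN;

    // Records one observation, assuming e2e latency = now - record timestamp.
    void record(final long recordTimestampMs, final long nowMs) {
        final double latency = nowMs - recordTimestampMs;
        min = Double.isNaN(min) ? latency : Math.min(min, latency);
        max = Double.isNaN(max) ? latency : Math.max(max, latency);
    }

    double min() { return min; }

    double max() { return max; }
}
```

Both stats need only constant memory per sensor. The thread does not spell out what was wrong with the percentile implementation, so this sketch does not try to characterize it.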
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8882:
URL: https://github.com/apache/kafka/pull/8882


   


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441781498



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -133,6 +142,25 @@ public static Sensor activeBufferedRecordsSensor(final String threadId,
         return sensor;
     }
 
+    public static Sensor e2ELatencySensor(final String threadId,
+                                          final String taskId,
+                                          final String processorNodeId,
+                                          final RecordingLevel recordingLevel,
+                                          final StreamsMetricsImpl streamsMetrics) {
+        final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY;
+        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, recordingLevel);
+        final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
+        addMinAndMaxToSensor(
+            sensor,
+            PROCESSOR_NODE_LEVEL_GROUP,

Review comment:
       @vvcephei I'm not familiar enough with the metrics classification to know whether this will be an issue or just an oddity, but we now have nominally task-level metrics that carry processor-node-level tags and grouping. It's a "task metric in implementation, processor node metric in interface" situation: that might be confusing for us, but it should be fine for users, right?




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441169327



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -160,18 +162,21 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       I'd like to inline `close(boolean)`, but am resisting the urge... This is a compromise ;) 




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441782286



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -133,6 +142,25 @@ public static Sensor activeBufferedRecordsSensor(final String threadId,
         return sensor;
     }
 
+    public static Sensor e2ELatencySensor(final String threadId,
+                                          final String taskId,
+                                          final String processorNodeId,
+                                          final RecordingLevel recordingLevel,
+                                          final StreamsMetricsImpl streamsMetrics) {
+        final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY;
+        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, recordingLevel);
+        final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
+        addMinAndMaxToSensor(
+            sensor,
+            PROCESSOR_NODE_LEVEL_GROUP,

Review comment:
       We give it the task sensor prefix, which becomes part of the full sensor name, rather than the processor node prefix.
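The split described in this exchange can be sketched as follows. The sensor's full name is built from a task-level prefix, so the task owns it. The metric's tags still name the processor node, so users see a node-level metric. `E2ESensorNaming` is hypothetical. The prefix format and the tag keys are assumptions, because the thread does not show how `StreamsMetricsImpl` builds them. The `record-e2e-latency` suffix matches the metric name used in `StreamTaskTest`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the naming split; the prefix format and
// tag keys are assumptions, not StreamsMetricsImpl's actual code.
final class E2ESensorNaming {
    static final String RECORD_E2E_LATENCY = "record-e2e-latency";

    // Assumed task-level prefix + node-scoped sensor name: the task "owns" it.
    static String fullSensorName(final String threadId, final String taskId, final String nodeId) {
        final String taskPrefix = "internal." + threadId + ".task." + taskId;
        return taskPrefix + ".s." + nodeId + "-" + RECORD_E2E_LATENCY;
    }

    // The tags still identify the processor node, so the user-facing metric is node-level.
    static Map<String, String> nodeLevelTags(final String threadId, final String taskId, final String nodeId) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put("thread-id", threadId);
        tags.put("task-id", taskId);
        tags.put("processor-node-id", nodeId);
        return tags;
    }
}
```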




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441169062



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -235,7 +235,7 @@ public StateStore getStateStore(final String name) {
         setCurrentNode(child);
         child.process(key, value);
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());

Review comment:
       Ok, but right now, this is an INFO level metric, right?
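Recording levels matter here because of how they gate work. A sensor records only when its level is at or below the level configured via `metrics.recording.level`. An INFO-level sensor therefore records under every configuration, so any system-time lookup it needs runs on every terminal-node record. The enum below is a hypothetical model of that ordering, not Kafka's `Sensor.RecordingLevel` itself.

```java
// Hypothetical enum modeled on the idea behind Kafka's Sensor.RecordingLevel:
// levels are ordered, and a sensor records only if its level is at or
// below the configured level.
enum RecordingLevelSketch {
    INFO, DEBUG, TRACE;

    boolean shouldRecord(final RecordingLevelSketch configured) {
        return this.ordinal() <= configured.ordinal();
    }
}
```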




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441118241



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
##########
@@ -337,7 +307,7 @@ private static Sensor throughputAndLatencySensorWithParent(final String threadId
             descriptionOfCount,
             descriptionOfAvgLatency,
             descriptionOfMaxLatency,
-            RecordingLevel.DEBUG,
+            recordingLevel,

Review comment:
       We erroneously ignored the provided `recordingLevel` and hard-coded DEBUG. It didn't manifest because this method happens to always be called with a recording level of DEBUG anyway.
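Here is a reduced version of why this bug stayed hidden. When every caller passes the same value the helper hard-codes, the buggy and fixed versions behave identically. `LevelPassThrough` and its methods are hypothetical names for illustration.

```java
// Hypothetical reduction of the bug: the helper accepted a level but
// hard-coded DEBUG, which is invisible while every caller passes DEBUG.
final class LevelPassThrough {
    enum Level { INFO, DEBUG }

    // Before the fix: the argument is silently ignored.
    static Level buggy(final Level requested) {
        return Level.DEBUG;
    }

    // After the fix: the caller's level is honored.
    static Level fixed(final Level requested) {
        return requested;
    }
}
```

The difference surfaces only once some caller passes a different level, such as INFO.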

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -160,18 +162,21 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       Standby tasks don't currently register any sensors, but I'd personally rather be defensive and idempotently ensure we remove any sensors while closing.
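Calling removal unconditionally in `closeClean` is safe only if the removal is idempotent. The simplified registry below is a hypothetical stand-in for `StreamsMetricsImpl`. It shows that removing sensors for a task that registered none, or removing them twice, is a harmless no-op.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified registry (not StreamsMetricsImpl): sensors are
// tracked per (thread, task) key so they can be removed as a group.
final class TaskSensorRegistry {
    private final Map<String, List<String>> sensorsByTask = new HashMap<>();

    private static String key(final String threadId, final String taskId) {
        return threadId + "." + taskId;
    }

    void addTaskLevelSensor(final String threadId, final String taskId, final String sensorName) {
        sensorsByTask.computeIfAbsent(key(threadId, taskId), k -> new ArrayList<>()).add(sensorName);
    }

    // Idempotent: returns how many sensors were removed, 0 if none remained.
    int removeAllTaskLevelSensors(final String threadId, final String taskId) {
        final List<String> removed = sensorsByTask.remove(key(threadId, taskId));
        return removed == null ? 0 : removed.size();
    }

    boolean isEmpty() {
        return sensorsByTask.isEmpty();
    }
}
```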

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -235,7 +235,7 @@ public StateStore getStateStore(final String name) {
         setCurrentNode(child);
         child.process(key, value);
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());

Review comment:
       This previously relied on a lookup of the actual current system time. I thought we decided to use the cached system time. Can you set me straight, @ableegoldman?
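The difference between "actual" and "cached" system time can be sketched with a generic cached clock. The wall-clock lookup happens once per `update()`, for example once per processing loop, and every reader in that iteration sees the same value. `CachedClock` is hypothetical. The thread does not show how Kafka Streams implements `currentSystemTimeMs()`, so this is not that code.

```java
import java.util.function.LongSupplier;

// Hypothetical cached clock: the wall-clock source is read only in update()
// or actualMs(); cachedMs() reuses the last value without a lookup.
final class CachedClock {
    private final LongSupplier source;
    private long cachedMs;
    private int lookups;

    CachedClock(final LongSupplier source) {
        this.source = source;
        update();
    }

    // Refreshes the cached value (one lookup), e.g. once per loop iteration.
    void update() {
        cachedMs = source.getAsLong();
        lookups++;
    }

    // Cheap, but may be stale by up to one iteration.
    long cachedMs() {
        return cachedMs;
    }

    // Exact, but costs a lookup on every call.
    long actualMs() {
        lookups++;
        return source.getAsLong();
    }

    int lookups() {
        return lookups;
    }
}
```

The trade-off is staleness for cost: a latency measured against the cached time can be off by up to one loop iteration.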

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -400,20 +402,20 @@ public void shouldRecordBufferedRecords() {
 
         final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString(), StreamsConfig.METRICS_LATEST);
 
-        assertThat(metric.metricValue(), equalTo(0.0d));
+        assertThat(metric.metricValue(), equalTo(0.0));

Review comment:
       Just cleaning up some oddball literals.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -454,24 +456,7 @@ public void shouldRecordE2ELatencyOnProcessForSourceNodes() {
         task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
         task.process(100L);
 
-        assertThat(maxMetric.metricValue(), equalTo(100d));
-    }
-
-    @Test
-    public void shouldRecordE2ELatencyOnProcessForTerminalNodes() {
-        time = new MockTime(0L, 0L, 0L);
-        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
-
-        final String terminalNode = processorStreamTime.name();
-
-        final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNode, StreamsConfig.METRICS_LATEST);
-
-        // e2e latency = 100
-        time.setCurrentTimeMs(100L);
-        task.maybeRecordE2ELatency(0L, terminalNode);

Review comment:
       This test wasn't really testing the "terminal node" code path in ProcessorContextImpl, just that this overload actually fetches the current system time. Since I removed the overload, we don't need the test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -675,27 +670,6 @@ public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
                 tags),
             new Max()
         );
-
-        sensor.add(

Review comment:
       Dropped the percentiles metric.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -150,18 +151,18 @@ public StreamTask(final TaskId id,
         punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
         bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
 
-        for (final String terminalNode : topology.terminalNodes()) {
+        for (final String terminalNodeName : topology.terminalNodes()) {
             e2eLatencySensors.put(
-                terminalNode,
-                ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, terminalNode, RecordingLevel.INFO, streamsMetrics)
+                terminalNodeName,
+                TaskMetrics.e2ELatencySensor(threadId, taskId, terminalNodeName, RecordingLevel.INFO, streamsMetrics)

Review comment:
       Fixes the sensor leak by simply registering these as task-level sensors. Note the node name is still provided to scope the sensors themselves.
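The leak and its fix can be modeled as follows, under the assumption suggested by this comment: task cleanup removes only sensors filed under the task, so sensors filed under a node-level key survive it. `SensorScopes` is a hypothetical, simplified model, not the Streams metrics registry.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: task cleanup only clears task-level entries, so
// anything filed under a node-level key is left behind (leaked).
final class SensorScopes {
    final Map<String, Set<String>> taskLevel = new HashMap<>();
    final Map<String, Set<String>> nodeLevel = new HashMap<>();

    void registerUnderTask(final String taskId, final String sensorName) {
        taskLevel.computeIfAbsent(taskId, k -> new HashSet<>()).add(sensorName);
    }

    void registerUnderNode(final String taskId, final String nodeId, final String sensorName) {
        nodeLevel.computeIfAbsent(taskId + "." + nodeId, k -> new HashSet<>()).add(sensorName);
    }

    // Plays the role of removeAllTaskLevelSensors: node-level entries are untouched.
    void removeAllTaskLevelSensors(final String taskId) {
        taskLevel.remove(taskId);
    }
}
```

The node name stays in the sensor name itself, so each terminal node still gets its own sensor after the switch.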

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -456,12 +457,14 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       We previously relied on the task manager to remove these sensors before calling close, but forgot to do it before recycling. In retrospect, it's better to do it within the same class that creates the sensors to begin with.
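This sketch shows the ownership rule: the class that creates the sensors removes them on every exit path, including recycling. `OwningTask` and its string-set registry are hypothetical. The method names mirror `closeClean` and `closeAndRecycleState` from the diff only for readability.

```java
import java.util.Set;

// Hypothetical sketch: the creator of the sensors also removes them on
// every exit path instead of relying on a caller to do it.
final class OwningTask {
    private final String taskId;
    private final Set<String> registry; // stand-in for the metrics registry

    OwningTask(final String taskId, final Set<String> registry) {
        this.taskId = taskId;
        this.registry = registry;
        registry.add(taskId + ".sink-record-e2e-latency");
    }

    private void removeAllTaskLevelSensors() {
        registry.removeIf(name -> name.startsWith(taskId + "."));
    }

    void closeClean() {
        removeAllTaskLevelSensors();
    }

    // Previously, cleanup on this path was skipped, leaking the sensors.
    void closeAndRecycleState() {
        removeAllTaskLevelSensors();
    }
}
```

The usage mirrors the new assertions in `shouldRecycleTask`: sensors exist before recycling and are gone afterwards.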

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1806,11 +1793,19 @@ public void shouldRecycleTask() {
         task.initializeIfNeeded();
         task.completeRestoration();
 
+        assertThat(getTaskMetrics(), not(empty()));
+
         task.closeAndRecycleState();
 
+        assertThat(getTaskMetrics(), empty());

Review comment:
       Verified this fails on trunk.




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441170578



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -235,7 +235,7 @@ public StateStore getStateStore(final String name) {
         setCurrentNode(child);
         child.process(key, value);
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());

Review comment:
       This will probably need to get refactored when you do the second PR.




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#issuecomment-645414361


   cherry-picked to 2.6


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441175266



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
##########
@@ -366,6 +369,37 @@ public void shouldThrowOnCloseCleanCheckpointError() {
         EasyMock.replay(stateManager);
     }
 
+    @Test
+    public void shouldUnregisterMetricsInCloseClean() {
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
+        EasyMock.replay(stateManager);
+
+        task = createStandbyTask();
+        task.initializeIfNeeded();
+
+        task.suspend();
+        task.closeClean();
+        // Currently, there are no metrics registered for standby tasks.
+        // This is a regression test so that, if we add some, we will be sure to deregister them.
+        assertThat(getTaskMetrics(), empty());
+    }
+
+    @Test
+    public void shouldUnregisterMetricsInCloseDirty() {
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
+        EasyMock.replay(stateManager);
+
+        task = createStandbyTask();
+        task.initializeIfNeeded();
+
+        task.suspend();
+        task.closeDirty();
+
+        // Currently, there are no metrics registered for standby tasks.
+        // This is a regression test so that, if we add some, we will be sure to deregister them.

Review comment:
       🙏 




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#issuecomment-645126850


   Rebased on trunk.


----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441161523



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -675,27 +670,6 @@ public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
                 tags),
             new Max()
         );
-
-        sensor.add(

Review comment:
       GitHub won't let me comment on these lines, but we should also remove the two constants above that only the percentiles needed (`PERCENTILES_SIZE_IN_BYTES` and `MAXIMUM_E2E_LATENCY`).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -160,18 +162,21 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       Sounds good. But why do it both here and in `closeDirty` vs doing so in `close(clean)`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -235,7 +235,7 @@ public StateStore getStateStore(final String name) {
         setCurrentNode(child);
         child.process(key, value);
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());

Review comment:
       Oh, hm, I thought we decided to push the stateful-node-level metrics to TRACE so we could get the actual time at each node without a (potential) performance hit, whereas the INFO-level metrics would be OK since we're only updating them twice per process.
   But maybe I'm misremembering... I suppose ideally we could run some benchmarks for both cases and see whether it really makes a difference.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -456,12 +457,14 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       Agreed, we should clean up anything we created in the same class.




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441172228



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -160,18 +162,21 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       Fair enough. I thought the answer might be something like that...




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441171975



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -235,7 +235,7 @@ public StateStore getStateStore(final String name) {
         setCurrentNode(child);
         child.process(key, value);
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());

Review comment:
       Yeah. I'm just not 100% sure we all agreed it was alright to get the actual system time even for the task-level metrics, so we should probably stick with the cached time for now.




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#issuecomment-645405482


   All failures are unrelated (they were different in each build):
   ```
   org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore
   kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
   ```


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #8882: KAFKA-10165: Remove Percentiles from e2e metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441169628



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -675,27 +670,6 @@ public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
                 tags),
             new Max()
         );
-
-        sensor.add(

Review comment:
       Ah, missed those. Thanks!




----------------------------------------------------------------