Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/20 04:15:41 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8697: KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

ableegoldman opened a new pull request #8697:
URL: https://github.com/apache/kafka/pull/8697


   Kicking this off with just the min and max sensors for the INFO-level task metrics.
   
   There is an existing `Percentiles` sensor implementation, but I'm having trouble convincing myself that it's correct* so I decided to just get the trivial stuff out of the way while I look into this more deeply.
   
   *The `Percentiles` sensor actually isn't used anywhere in the project at the moment, nor was it at any point in the past AFAICT. I'm working on testing and fixing some minor bugs that I found already, hopefully all the bugs are "minor"...


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429457310



##########
File path: checkstyle/suppressions.xml
##########
@@ -183,6 +183,8 @@
               files="StreamsPartitionAssignor.java"/>
     <suppress checks="JavaNCSS"
               files="EosBetaUpgradeIntegrationTest.java"/>
+    <suppress checks="StaticVariableName"

Review comment:
       Checkstyle won't allow you to have a letter and number next to each other, but `P90` and `E2E_LATENCY` seem preferable to `P_90` and `E_2_E_LATENCY`







[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429505441



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       I was debating this...my thinking here was that a negative value probably means you're processing some records with "future" timestamps, for whatever reason, in which case the e2e latency isn't meaningful and they shouldn't affect the statistics. 
   Or, your clocks are out of sync. I suppose we could add a separate metric that counts the number of records with negative e2e latency?
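
The idea of skipping out-of-range latencies while counting the negative ones separately could look roughly like the sketch below. `LatencyGuard`, its fields, and its bounds are hypothetical names chosen for illustration; this is not the code in the PR and it does not use the Kafka `Sensor` API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the PR's code: skips latencies outside [min, max]
// and keeps a separate count of negative values (future timestamps or clock skew).
final class LatencyGuard {
    private final long minLatencyMs;
    private final long maxLatencyMs;
    private final AtomicLong negativeCount = new AtomicLong();
    private final AtomicLong recordedCount = new AtomicLong();

    LatencyGuard(final long minLatencyMs, final long maxLatencyMs) {
        this.minLatencyMs = minLatencyMs;
        this.maxLatencyMs = maxLatencyMs;
    }

    // Returns true if the latency was accepted for the statistics.
    boolean maybeRecord(final long recordTimestampMs, final long nowMs) {
        final long latency = nowMs - recordTimestampMs;
        if (latency < 0) {
            // Counted even when the value is skipped below.
            negativeCount.incrementAndGet();
        }
        if (latency < minLatencyMs || latency > maxLatencyMs) {
            return false;
        }
        recordedCount.incrementAndGet();
        return true;
    }

    long negativeCount() {
        return negativeCount.get();
    }

    long recordedCount() {
        return recordedCount.get();
    }
}
```

A separate counter like this would keep the latency statistics free of meaningless values while still surfacing how often they occur.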







[GitHub] [kafka] ableegoldman commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-632971229


   spotBugs is currently failing with a mysterious and uninformative exception, so there's no use kicking off the tests again until I figure out what's going on. Might be a spotBugs bug ... in local testing I have seen it fail nondeterministically... 😒





[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429461550



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
##########
@@ -492,6 +493,52 @@ public void testPercentiles() {
         assertEquals(75, (Double) p75.metricValue(), 1.0);
     }
 
+    @Test
+    public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+        long seed = new Random().nextLong();
+        int sizeInBytes = 1000 * 1000;   // 1MB
+        long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+        try {
+            Random prng = new Random(seed);
+            int numberOfValues = 5000 + prng.nextInt(10_000);  // range is [5000, 15000)
+
+            Percentiles percs = new Percentiles(sizeInBytes,
+                                                maximumValue,
+                                                BucketSizing.LINEAR,
+                                                new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+                                                new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+            MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+            Sensor sensor = metrics.sensor("test", config);
+            sensor.add(percs);
+            Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+            Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+            final List<Long> values = new ArrayList<>(numberOfValues);
+            // record two windows worth of sequential values
+            for (int i = 0; i < numberOfValues; ++i) {
+                long value = Math.abs(prng.nextLong()) % maximumValue;
+                values.add(value);
+                sensor.record(value);
+            }
+
+            Collections.sort(values);
+
+            int p90Index = (int)Math.ceil(((double)(90 * numberOfValues)) / 100);
+            int p99Index = (int)Math.ceil(((double)(99 * numberOfValues)) / 100);
+
+            double expectedP90 = values.get(p90Index - 1);
+            double expectedP99 = values.get(p99Index - 1);
+
+            assertEquals(expectedP90, (Double) p90.metricValue(), expectedP90 / 10);

Review comment:
       Trying to build confidence in the `Percentiles` implementation and gauge the accuracy with a more complicated test.
   I found it was accurate to within 5% roughly 2/3 to 3/4 of the time, but it seems reasonable to expect it to be accurate to within 10%.
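
For reference, the exact value the test compares against is a nearest-rank percentile over the sorted inputs. The sketch below restates that calculation on its own; `ExactPercentile` and `withinTolerance` are hypothetical helper names, not part of `MetricsTest` or the Kafka metrics API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: computes the exact percentile that an approximate
// estimator can be checked against.
final class ExactPercentile {

    // Nearest-rank percentile: the value at 1-based rank ceil(p / 100 * n) in sorted order.
    static long nearestRank(final List<Long> values, final int percentile) {
        if (values.isEmpty() || percentile < 1 || percentile > 100) {
            throw new IllegalArgumentException("need a non-empty list and a percentile in [1, 100]");
        }
        final List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        final int rank = (int) Math.ceil(percentile * (double) sorted.size() / 100);
        return sorted.get(rank - 1);
    }

    // True if the estimate is within the given relative tolerance of the exact value.
    static boolean withinTolerance(final double expected, final double actual, final double relativeTolerance) {
        return Math.abs(expected - actual) <= Math.abs(expected) * relativeTolerance;
    }
}
```

A 10% check then amounts to `withinTolerance(exact, estimate, 0.10)`, which mirrors the `expectedP90 / 10` delta passed to `assertEquals` in the test.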







[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429457310



##########
File path: checkstyle/suppressions.xml
##########
@@ -183,6 +183,8 @@
               files="StreamsPartitionAssignor.java"/>
     <suppress checks="JavaNCSS"
               files="EosBetaUpgradeIntegrationTest.java"/>
+    <suppress checks="StaticVariableName"

Review comment:
       Checkstyle won't allow you to have a letter and number next to each other, but in the case of `E2E_LATENCY` I feel that's a lot easier to understand than `E_2_E_LATENCY`










[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429396524



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -137,6 +138,7 @@ public StreamTask(final TaskId id,
         }
         processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
         processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
+        recordE2ELatencySensor = TaskMetrics.recordE2ELatencySensor(threadId, taskId, streamsMetrics);

Review comment:
       I modified the proposal slightly to make these all processor-node level (will push the changes in a minute) but this question is still relevant, so here's the answer:
   We can't record the e2e latency in the sink node because not all topologies _have_ a sink node. For that reason we also can't record at the record collector. We need to figure out the terminal nodes when processing the topology, then record this metric after `child.process` in `ProcessorContext#forward`
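
To illustrate what "figuring out the terminal nodes" might involve, here is a minimal sketch. `TerminalNodes` and its `Node` type are hypothetical stand-ins and not the Streams `ProcessorNode` class; the only rule taken from the discussion is that a node with no children counts as terminal, whether or not it is a sink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: finds the nodes of a topology that have no children.
final class TerminalNodes {

    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }

        // Returns this node so that several children can be added in a chain.
        Node addChild(final Node child) {
            children.add(child);
            return this;
        }
    }

    // Collects the names of all childless nodes reachable from root, in DFS order.
    static List<String> terminalNames(final Node root) {
        final List<String> result = new ArrayList<>();
        collect(root, result);
        return result;
    }

    private static void collect(final Node node, final List<String> result) {
        if (node.children.isEmpty()) {
            // A node reachable through several parents is only listed once.
            if (!result.contains(node.name)) {
                result.add(node.name);
            }
            return;
        }
        for (final Node child : node.children) {
            collect(child, result);
        }
    }
}
```

In a topology that ends in something like a `foreach` rather than a sink, the terminal set found this way would still be non-empty, which is the property the sink-node approach lacks.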







[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430707581



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -223,6 +223,9 @@ public StateStore getStateStore(final String name) {
                                 final V value) {
         setCurrentNode(child);
         child.process(key, value);
+        if (child.children().isEmpty()) {

Review comment:
       ack

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
##########
@@ -492,6 +493,52 @@ public void testPercentiles() {
         assertEquals(75, (Double) p75.metricValue(), 1.0);
     }
 
+    @Test
+    public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+        long seed = new Random().nextLong();
+        int sizeInBytes = 1000 * 1000;   // 1MB
+        long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+        try {
+            Random prng = new Random(seed);
+            int numberOfValues = 5000 + prng.nextInt(10_000);  // range is [5000, 15000)
+
+            Percentiles percs = new Percentiles(sizeInBytes,
+                                                maximumValue,
+                                                BucketSizing.LINEAR,
+                                                new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+                                                new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+            MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+            Sensor sensor = metrics.sensor("test", config);
+            sensor.add(percs);
+            Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+            Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+            final List<Long> values = new ArrayList<>(numberOfValues);
+            // record two windows worth of sequential values
+            for (int i = 0; i < numberOfValues; ++i) {
+                long value = Math.abs(prng.nextLong()) % maximumValue;

Review comment:
       I wasn't going for a uniform distribution, just any non-pigeonholed distribution (see existing `testPercentiles` for comparison). I was just trying to verify the basic validity and get a rough estimate on the accuracy here in case it turned out to be 5000% off.
   
   Good point about the overflow though. Pretty annoying that you can't give a bound for `nextLong` like you can with `nextInt` :/ 
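
One possible way around the overflow, sketched here under the assumption that a seeded `java.util.Random` must be kept for reproducibility: `Math.abs(Long.MIN_VALUE)` is still negative, so `Math.floorMod` is used instead of `Math.abs(...) % bound`. `BoundedRandom` is a hypothetical helper name, and like the modulo in the test the result is not perfectly uniform.

```java
import java.util.Random;

// Hypothetical helper: draws a long in [0, bound) from a seeded Random.
final class BoundedRandom {

    static long nextLong(final Random prng, final long bound) {
        if (bound <= 0) {
            throw new IllegalArgumentException("bound must be positive");
        }
        // floorMod takes the sign of the divisor, so the result is never negative,
        // even when nextLong() returns Long.MIN_VALUE.
        return Math.floorMod(prng.nextLong(), bound);
    }
}
```

`ThreadLocalRandom.current().nextLong(bound)` also provides a bounded long, but it cannot be seeded, which would make a failing run harder to reproduce.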

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
       We discussed this offline, but in case anyone else was wondering:
   
   Yes. We can't record the latency _after_ processing for source nodes due to our recursive DFS approach to processing, as the source node's `#process` actually doesn't complete until the record has been processed by every other node in the subtopology. And anyways, the intent of the source node metric is to gauge the e2e latency when the record arrives at the subtopology, which is what we are recording here.
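
The ordering described above can be seen in a toy model of recursive processing. `DfsOrder` and its `Node` type are hypothetical and far simpler than the Streams classes; the sketch only shows that a parent's `process` call returns after all of its descendants have finished.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of recursive depth-first processing.
final class DfsOrder {

    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }
    }

    // Logs when each node's processing starts and when it returns.
    static void process(final Node node, final List<String> events) {
        events.add("enter " + node.name);
        for (final Node child : node.children) {
            // "Forwarding" recurses into the child before this call returns.
            process(child, events);
        }
        events.add("exit " + node.name);
    }
}
```

For a chain source -> map -> sink, the source is the first node entered and the last one exited, so a measurement taken after its `process` call would include the whole subtopology.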

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       @cadonna @vvcephei @mjsax We have several related discussions going on across this PR so I'm just going to try and summarize here: let me know if I miss anything you still feel is important
   
   The plan is to pin large/small values in the percentiles to the min/max for now and just log a warning. Since we're the only users of the `Percentiles` class, we can just modify it directly and avoid restricting the values for the min/max metrics as John mentioned above. If a user is experiencing small negative e2e latencies it's likely due to clock drift, and approximating as 0 seems reasonable. If they're experiencing large negative e2e latencies, there's clearly something odd going on and the e2e latency percentiles aren't meaningful. But it will still show up in the min metric and alert them to this. Presumably users may be interested to know.
   
   I'd like to avoid introducing a config in particular because the maximum isn't an inherent mathematical property of percentiles (obviously), it's just an artifact of the existing percentiles algorithm. We can improve this and presumably remove the requirement to set a static max, but I felt the algorithm was "good enough" for now (and didn't want to make large scale changes and/or rewrite it entirely right before the 2.6 deadline).
   
   In sum I'd say the guiding principle for this PR and the initial metrics was to be useful without being misleadingly wrong. I think pinning the percentiles to the bounds but reporting the min/max as is achieves this, and allows us flexibility in improving the situation later
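
As a rough sketch of that principle, the value handed to the percentile estimator can be pinned to the bounds while the min and max are tracked from the raw value. `PinnedLatency` is a hypothetical class written for illustration; it is not the modified `Percentiles` class and it leaves out the warning log.

```java
// Hypothetical sketch: raw values drive min/max, pinned values drive percentiles.
final class PinnedLatency {
    private final long lowerBoundMs;
    private final long upperBoundMs;
    private long rawMin = Long.MAX_VALUE;
    private long rawMax = Long.MIN_VALUE;

    PinnedLatency(final long lowerBoundMs, final long upperBoundMs) {
        if (lowerBoundMs > upperBoundMs) {
            throw new IllegalArgumentException("lower bound exceeds upper bound");
        }
        this.lowerBoundMs = lowerBoundMs;
        this.upperBoundMs = upperBoundMs;
    }

    // Updates the raw min/max and returns the pinned value for the percentile estimator.
    long record(final long latencyMs) {
        rawMin = Math.min(rawMin, latencyMs);
        rawMax = Math.max(rawMax, latencyMs);
        return Math.max(lowerBoundMs, Math.min(upperBoundMs, latencyMs));
    }

    long rawMin() {
        return rawMin;
    }

    long rawMax() {
        return rawMax;
    }
}
```

With bounds of 0 and 10 days, a latency of -5 ms would enter the percentiles as 0 but still show up as -5 in the min metric.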







[GitHub] [kafka] vvcephei commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634823280


   Unbelievable. The java 8 build started running right after I mentioned that it's not running.





[GitHub] [kafka] mjsax commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429499839



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {

Review comment:
       Not sure about this... Why do we need/want to have a limit?
   
   Nit: double space







[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429505029



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       10 days was just rounding up from the 7 day default retention limit. The maximum is due to the percentiles calculation, which is based on incrementally sized buckets. It's a tradeoff with accuracy.
   
   For example, if I increase it by a factor of 1000, the `StreamTask` percentiles test is off by almost 20% (p99 is 82.9 instead of 99). This test uses values between 0 and 100, which is probably considerably lower than most e2e latencies will be. If you look at the `MetricsTest` percentiles test I added, this uses random values up to the max value and can maintain the 10% accuracy up to a higher max value.
   
   Of course we don't know what the distribution will be, but it seems likely to be somewhere in the middle (not in the 100s of ms, not in the 10s or 1000s of days) so for reasonable accuracy we need to pick a reasonable maximum. We can definitely go higher than 10 days, but I reasoned that if you have records older than 10 days you're probably processing historical data and in that case the e2e latency isn't that meaningful.
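
The tradeoff can be made concrete with some back-of-the-envelope arithmetic. The sketch below makes two assumptions that are not stated in this thread: that each bucket costs 4 bytes, and that the range is split into equal-width buckets. The real `Percentiles` bucketing sizes its buckets differently, so the numbers are only indicative. `BucketWidth` is a hypothetical helper.

```java
// Hypothetical arithmetic helper; the 4-bytes-per-bucket figure and the
// equal-width split are simplifying assumptions, not taken from the PR.
final class BucketWidth {

    // Number of buckets that fit in the given memory budget.
    static int bucketCount(final int sizeInBytes) {
        return sizeInBytes / 4;
    }

    // Width of each bucket if [0, maxValue] were split into equal-width buckets.
    static double equalWidth(final double maxValue, final int buckets) {
        return maxValue / buckets;
    }
}
```

Under these assumptions, 1 MB gives 250,000 buckets, and a 10-day maximum (864,000,000 ms) yields buckets 3,456 ms wide. Raising the maximum by a factor of 1000 widens each bucket by the same factor, which is the kind of resolution loss described above.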







[GitHub] [kafka] ableegoldman commented on pull request #8697: KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-631225868


   call for review @guozhangwang @mjsax @vvcephei 





[GitHub] [kafka] ableegoldman commented on pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-632468421


   Actually this needs to be reworked to account for the case where there is no sink node. I also got the `Percentiles` working, so I'll add them back to this PR and call for review again when ready.





[GitHub] [kafka] mjsax commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430753853



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       Thanks for the details. Avoiding a config for now sounds good to me. This leaves the path open to add a config later or, as John suggested, to maybe change the algorithm (which might not need a max). I am fine with a hard-coded max of 10 days.
   
   Also +1 to John's suggestion to split percentiles and min/max to avoid applying the hard-coded limit to the min/max metrics.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       I would not record a negative latency. That seems to be kinda weird. I am fine with skipping and warning, too. Just wanted to clarify.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
       > o whether we measure these nodes before or "after" their processing logic should make no practical difference at all.
   
   I think it makes a big difference, and only recording _before_ processing is what we want (according to what the KIP says). Otherwise, the latency includes the processing time for one or more processors (in the worst case even all processors).







[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429470290



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static double MAXIMUM_E2E_LATENCY = 100 * 24 * 60 * 60 * 1000d; // maximum latency is 1000 days

Review comment:
       Not sure if we might want to make it configurable though? Or just pick a number and see if anyone complains?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r428933011



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -86,6 +87,14 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+    static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =
+        "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+            + "system time when it has been fully processed by the task";

Review comment:
       Oh right, I put the `record` in the wrong place but this description is correct. It should record at the `RecordCollector` for the task-level metrics







[GitHub] [kafka] mjsax commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-632960947


   Retest this please.





[GitHub] [kafka] vvcephei commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430846939



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
       Sorry for my ambiguity. Please let me clarify my terms. Currently, if you wait until the end of the "process" method, you wind up including the call to forward, which recursively calls process on all descendants of the source node. This is _not_ what I was talking about. I meant only the time spent _just_ in processing the SourceNode, excluding the time in "forward". What shall we call this? Maybe "actual", or "proper", or "internal" processing time?
   
   So, my comment was that, given that we know the implementation of SourceNode, we know that its "actual", "proper", "internal" processing time is going to be very small, probably far less than a single millisecond. So it doesn't make any practical difference whether we measure before the call for just the special case of source nodes, or magically solve the problem of measuring the e2e latency after internal processing, but not including the calls to "forward".
   
   This is why I think it's fine to measure SourceNodes _before_ the call to process, even though the KIP technically specifies that processors' end-to-end latencies should include processing latency. We're making a simplifying assumption that for source nodes specifically, the processing latency would be `<< 1`, so we can ignore it.
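
To make the distinction above concrete, here is a toy model. It is not Kafka Streams code: `ToyNode` and `SourceLatencyDemo` are hypothetical names, and time is simulated with plain longs. It assumes each node has a fixed "internal" cost and that `process` forwards to all children, so timing the whole call at the source also counts the descendants' work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a processor node; not the real ProcessorNode/SourceNode.
final class ToyNode {
    private final long internalCostMs;                 // simulated time spent in this node only
    private final List<ToyNode> children = new ArrayList<>();

    ToyNode(final long internalCostMs) {
        this.internalCostMs = internalCostMs;
    }

    ToyNode addChild(final ToyNode child) {
        children.add(child);
        return this;
    }

    // Returns the simulated clock after this node and, via "forward", all descendants ran.
    long process(final long now) {
        long clock = now + internalCostMs;
        for (final ToyNode child : children) {
            clock = child.process(clock);              // the recursive "forward" part
        }
        return clock;
    }
}

final class SourceLatencyDemo {
    // Measuring before process(): ignores the source node's (tiny) internal cost.
    static long latencyBeforeProcess(final long recordTimestamp, final long now) {
        return now - recordTimestamp;
    }

    // Measuring after process() returns: also includes every descendant's cost.
    static long latencyAfterProcess(final ToyNode source, final long recordTimestamp, final long now) {
        return source.process(now) - recordTimestamp;
    }
}
```

With a source whose internal cost is 0 ms and two chained downstream nodes costing 5 ms and 7 ms, the "after" measurement comes out 12 ms larger than the "before" one, even though the source itself did no measurable work.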




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429499887



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       For this case, should we record "zero" instead?
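
One way to read this suggestion, as a sketch only: clamp a negative difference to zero rather than skipping it. The class and method names are hypothetical, and the sketch assumes `MINIMUM_E2E_LATENCY` is 0. Whether clamping or skipping is the right behavior is exactly what is under discussion here.

```java
// Hypothetical helper; not part of the PR.
final class E2ELatencyClamp {
    static final long MINIMUM_E2E_LATENCY = 0L;   // assumed value

    // Returns the latency to record; a record timestamp in the future is clamped to the minimum.
    static long latencyToRecord(final long recordTimestamp, final long now) {
        final long e2eLatency = now - recordTimestamp;
        return Math.max(MINIMUM_E2E_LATENCY, e2eLatency);
    }
}
```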




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429469651



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static double MAXIMUM_E2E_LATENCY = 100 * 24 * 60 * 60 * 1000d; // maximum latency is 1000 days

Review comment:
       Want to call attention to these...do they seem reasonable? The size is the bytes per percentile sensor, so 2 per source or terminal node. The minimum has to be 0 for the linear bucketing (which I found significantly more accurate than constant bucketing in my tests).
   On the other hand, the maximum is obviously not representative of the maximum difference between the current and record timestamp. If someone's processing historical data, it can exceed this. But I figure if you're processing historical data then the e2e latency isn't really going to be useful anyway, so we may as well set it to something reasonable.
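
As a quick check of the arithmetic (a sketch, not code from the PR): the quoted constant multiplies by 100, which corresponds to 100 days in milliseconds, while its trailing comment says 1000 days. `LatencyBounds` is a hypothetical name; `TimeUnit.DAYS.toMillis` is the standard `java.util.concurrent` conversion.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class used only to check the unit conversion.
final class LatencyBounds {
    // Same expression as in the quoted diff: the trailing 'd' makes the product a double.
    static final double QUOTED_MAXIMUM = 100 * 24 * 60 * 60 * 1000d;

    static final long HUNDRED_DAYS_MS = TimeUnit.DAYS.toMillis(100);
    static final long THOUSAND_DAYS_MS = TimeUnit.DAYS.toMillis(1000);

    // True when the quoted expression equals 100 days expressed in milliseconds.
    static boolean quotedMaximumIsHundredDays() {
        return QUOTED_MAXIMUM == (double) HUNDRED_DAYS_MS;
    }
}
```

Spelling the bound with `TimeUnit` also avoids relying on a trailing `d` or `L` to keep the multiplication out of `int` range.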




----------------------------------------------------------------



[GitHub] [kafka] cadonna commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429904772



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -642,6 +651,55 @@ public static void addAvgAndMaxToSensor(final Sensor sensor,
         );
     }
 
+    public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,

Review comment:
       req: Please add unit tests in `StreamsMetricsImplTest`.




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634329391






----------------------------------------------------------------



[GitHub] [kafka] cadonna commented on a change in pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429158357



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -642,6 +642,30 @@ public static void addAvgAndMaxToSensor(final Sensor sensor,
         );
     }
 
+    public static void addMaxAndMinToSensor(final Sensor sensor,

Review comment:
       prop (super-nit): Could you call the method `addMinAndMaxToSensor()`, since we already have a method called `addAvgAndMinAndMaxToSensor()`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##########
@@ -356,4 +354,27 @@ public void shouldGetDroppedRecordsSensorOrLateRecordDropSensor() {
             shouldGetDroppedRecordsSensor();
         }
     }
+
+    @Test
+    public void shouldGetRecordE2ELatencySensor() {
+        final String operation = "record-e2e-latency";
+        expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.INFO))
+            .andReturn(expectedSensor);
+        expect(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).andReturn(tagMap);
+        StreamsMetricsImpl.addMaxAndMinToSensor(
+            expectedSensor,
+            TASK_LEVEL_GROUP,
+            tagMap,
+            operation,
+            RECORD_E2E_LATENCY_MAX_DESCRIPTION,

Review comment:
       req: Please do not use the constant here. The point of this test is also to check the correctness of the description, i.e., the content of that constant.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##########
@@ -14,15 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals.metrics;
+package org.apache.kafka.streams.processor.internals.metrics;

Review comment:
       Yes, very nice!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -86,6 +87,14 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+    static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =
+        "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+            + "system time when it has been fully processed by the task";
+    static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION =

Review comment:
       See my comment above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -86,6 +87,14 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+    static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =

Review comment:
       req: Please define this constant as private as all the others. I left a request in `TaskMetricsTest` which makes this request clearer.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -137,6 +138,7 @@ public StreamTask(final TaskId id,
         }
         processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
         processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
+        recordE2ELatencySensor = TaskMetrics.recordE2ELatencySensor(threadId, taskId, streamsMetrics);

Review comment:
       Q: Is there a specific reason to init the sensor here and not in `SinkNode`? You can init and store it there. That was one motivation to make `*Metrics` classes (e.g. `TaskMetrics`) static, so that you do not need any code in the processor context to get specific sensors. If there is no specific reason, you could get rid of the changes in the `*Context*` classes.




----------------------------------------------------------------



[GitHub] [kafka] cadonna commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429814548



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
##########
@@ -260,6 +261,43 @@ public void shouldGetProcessAtSourceSensorOrForwardSensor() {
         }
     }
 
+    @Test
+    public void shouldGetRecordE2ELatencySensor() {
+        final String operation = "record-e2e-latency";
+        final String recordE2ELatencyMinDescription =
+            "The minimum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+                + "system time when it has been fully processed by the node";
+        final String recordE2ELatencyMaxDescription =
+            "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+                + "system time when it has been fully processed by the node";
+        final String recordE2ELatencyP99Description =
+            "The 99th percentile end-to-end latency of a record, measuring by comparing the record timestamp with the "
+                + "system time when it has been fully processed by the node";
+        final String recordE2ELatencyP90Description =
+            "The 90th percentile end-to-end latency of a record, measuring by comparing the record timestamp with the "
+                + "system time when it has been fully processed by the node";
+        expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, operation, RecordingLevel.INFO))
+            .andReturn(expectedSensor);
+        expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
+        StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor(
+            expectedSensor,
+            PROCESSOR_NODE_LEVEL_GROUP,
+            tagMap,
+            operation,
+            recordE2ELatencyMinDescription,
+            recordE2ELatencyMaxDescription,
+            recordE2ELatencyP99Description,
+            recordE2ELatencyP90Description
+        );
+
+        replay(StreamsMetricsImpl.class, streamsMetrics);
+
+        final Sensor sensor = ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics);
+
+        verify(StreamsMetricsImpl.class, streamsMetrics);
+        assertThat(sensor, is(expectedSensor));

Review comment:
       req:
   ```suggestion
           verifySensor(() -> ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics));
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -50,6 +51,13 @@ public ProcessorTopology(final List<ProcessorNode<?, ?>> processorNodes,
         this.globalStateStores = Collections.unmodifiableList(globalStateStores);
         this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
         this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
+
+        this.terminalNodes = new HashSet<>();
+        for (final ProcessorNode<?, ?> node : processorNodes) {
+            if (node.children().isEmpty()) {

Review comment:
       See my comment in `ProcessorContextImpl`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {

Review comment:
       Q: Wouldn't it be better to count measurements beyond the maximum latency towards the highest bucket, as the `Percentiles` metric does?
   Admittedly, the measured value would be quite wrong if many measurements exceed the maximum latency. However, since the bucket sizes increase linearly, the reported values would be quite wrong anyway due to the increased approximation error. Furthermore, I guess users would put an alert on substantially smaller values.
   OTOH, not counting measurements beyond the maximum latency would distort the metric a bit, because they would not count towards the remaining 1% or 10% (for p99 and p90, respectively). Additionally, the max metric would also be distorted by not counting those measurements.
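
The trade-off described above can be illustrated with exact nearest-rank percentiles on a small sample. This is a sketch under assumptions: it does not use the bucketed `Percentiles` implementation, and `ClampVsSkip`, its methods, and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration; exact percentiles, not the approximate bucketed metric.
final class ClampVsSkip {
    // Nearest-rank percentile of the given values (values must be non-empty).
    static long percentile(final List<Long> values, final int pct) {
        final List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        final int rank = (int) Math.ceil(((double) (pct * sorted.size())) / 100);
        return sorted.get(Math.max(rank, 1) - 1);
    }

    // Drops measurements above max, as the quoted code does when it only logs a warning.
    static List<Long> skipAbove(final List<Long> values, final long max) {
        final List<Long> kept = new ArrayList<>();
        for (final long v : values) {
            if (v <= max) {
                kept.add(v);
            }
        }
        return kept;
    }

    // Counts measurements above max as max, i.e. towards the highest bucket.
    static List<Long> clampAbove(final List<Long> values, final long max) {
        final List<Long> clamped = new ArrayList<>();
        for (final long v : values) {
            clamped.add(Math.min(v, max));
        }
        return clamped;
    }
}
```

With 98 in-range measurements (1 to 98) plus two above a maximum of 1000, skipping yields a p99 of 98 while clamping yields the maximum itself, which shows how skipped outliers stop counting towards the top 1%.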

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -223,6 +223,9 @@ public StateStore getStateStore(final String name) {
                                 final V value) {
         setCurrentNode(child);
         child.process(key, value);
+        if (child.children().isEmpty()) {

Review comment:
       prop: For the sake of readability, could you extract this check to a method named `isTerminalNode()`? Even better would be to add a method named `isTerminalNode()` to `ProcessorNode` and use it here and in `ProcessorTopology`.
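
A minimal sketch of the proposed extraction, using a stand-in class rather than the real `ProcessorNode`. The `SketchNode` type and its fields are hypothetical; only the `isTerminalNode()` name and the `children().isEmpty()` check come from the comment and the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProcessorNode, reduced to what the check needs.
final class SketchNode {
    private final String name;
    private final List<SketchNode> children = new ArrayList<>();

    SketchNode(final String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    List<SketchNode> children() {
        return children;
    }

    void addChild(final SketchNode child) {
        children.add(child);
    }

    // A node is terminal when nothing is downstream of it.
    boolean isTerminalNode() {
        return children.isEmpty();
    }
}
```

Call sites would then read `if (child.isTerminalNode())` instead of repeating the `children().isEmpty()` check.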

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -547,6 +624,28 @@ private KafkaMetric getMetric(final String operation,
         ));
     }
 
+    private KafkaMetric getProcessorMetric(final String operation,
+                                           final String nameFormat,
+                                           final String taskId,
+                                           final String processorNodeId,
+                                           final String builtInMetricsVersion) {
+        final String descriptionIsNotVerified = "";
+        return metrics.metrics().get(metrics.metricName(
+            String.format(nameFormat, operation),
+            "stream-processor-node-metrics",
+            descriptionIsNotVerified,
+            mkMap(
+                mkEntry("task-id", taskId),
+                mkEntry("processor-node-id", processorNodeId),
+                mkEntry(
+                    StreamsConfig.METRICS_LATEST.equals(builtInMetricsVersion) ? THREAD_ID_TAG
+                        : THREAD_ID_TAG_0100_TO_24,
+                    Thread.currentThread().getName()
+                )
+            )
+        ));
+    }
+

Review comment:
       prop: Take a look into `StreamsTestUtils` and see what you can re-use there to retrieve specific metrics.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       I understand that we need a maximum due to the way the percentiles are approximated. Since the e2e latency depends on user requirements, it would make sense to consider a config for the max latency. I see two reasons for such a config. 
   
   1. We always think about near-realtime use cases, but there could also be use cases that tolerate a much higher latency as long as it stays within a certain limit. For example, one where producers are not always online. Admittedly, 10 days is already quite high.
   
   2. OTOH, decreasing the max latency would also make the metric more accurate, AFAIU. That would also be a reason for a config that users can tweak.
   
   For both cases, we could leave it like it is for now and see if there is really the need for such a config. WDYT? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       @ableegoldman I agree with your thinking here. IMO, we should just log the warning for now. If we see that there is a demand for such a metric, we can add it later on.  




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-632961017


   Retest this please.


----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-632758838


   Build failed with checkstyle:
   ```
   [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java:23:8: Unused import - org.apache.kafka.common.metrics.Measurable. [UnusedImports]
   ```


----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429499547



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
##########
@@ -492,6 +493,52 @@ public void testPercentiles() {
         assertEquals(75, (Double) p75.metricValue(), 1.0);
     }
 
+    @Test
+    public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+        long seed = new Random().nextLong();
+        int sizeInBytes = 1000 * 1000;   // 1MB
+        long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+        try {
+            Random prng = new Random(seed);
+            int numberOfValues = 5000 + prng.nextInt(10_000);  // ranges is [5000, 15000]
+
+            Percentiles percs = new Percentiles(sizeInBytes,
+                                                maximumValue,
+                                                BucketSizing.LINEAR,
+                                                new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+                                                new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+            MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+            Sensor sensor = metrics.sensor("test", config);
+            sensor.add(percs);
+            Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+            Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+            final List<Long> values = new ArrayList<>(numberOfValues);
+            // record two windows worth of sequential values
+            for (int i = 0; i < numberOfValues; ++i) {
+                long value = Math.abs(prng.nextLong()) % maximumValue;
+                values.add(value);
+                sensor.record(value);
+            }
+
+            Collections.sort(values);
+
+            int p90Index = (int)Math.ceil(((double)(90 * numberOfValues)) / 100);
+            int p99Index = (int)Math.ceil(((double)(99 * numberOfValues)) / 100);
+
+            double expectedP90 = values.get(p90Index - 1);
+            double expectedP99 = values.get(p99Index - 1);
+
+            assertEquals(expectedP90, (Double) p90.metricValue(), expectedP90 / 10);

Review comment:
       Let's see...
   
   But I am wondering if the condition is right? Should it be:
   ```
   assertTrue("Expected +/- 10% accuracy for exact value of " + expectedP90, expectedP90 * 0.9 <= (Double) p90.metricValue() && (Double) p90.metricValue() <= expectedP90 * 1.1);
   ```
   
   Also possible to split into two asserts of course, but using `assertEquals` should not work?
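
For comparison, a sketch of the two conditions side by side. It assumes JUnit 4 semantics for `assertEquals(expected, actual, delta)`, which passes when `|expected - actual| <= delta`; the class and method names are hypothetical. For a positive expected value and a delta of `expected / 10`, the two checks accept the same range, up to floating-point rounding at the boundaries.

```java
// Hypothetical helper comparing the two tolerance checks; no JUnit dependency.
final class ToleranceChecks {
    // Mirrors the absolute-delta rule assumed for assertEquals(expected, actual, delta).
    static boolean withinDelta(final double expected, final double actual, final double delta) {
        return Math.abs(expected - actual) <= delta;
    }

    // The explicit +/- 10% bounds from the suggested assertTrue condition.
    static boolean withinTenPercent(final double expected, final double actual) {
        return expected * 0.9 <= actual && actual <= expected * 1.1;
    }
}
```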




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429500010



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       Not sure about this. Why do we need a maximum to begin with? And why pick 10 days? Rather arbitrary?




----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on a change in pull request #8697: KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r428930276



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -420,6 +422,50 @@ public void shouldRecordProcessRatio() {
         assertThat(metric.metricValue(), equalTo(1.0d));
     }
 
+    @Test
+    public void shouldRecordE2ELatency() {
+        time = new MockTime(0L, 0L, 0L);
+        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
+
+        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+
+        final KafkaMetric maxMetric = getMetric("record-e2e-latency", "%s-max", task.id().toString(), StreamsConfig.METRICS_LATEST);
+        final KafkaMetric minMetric = getMetric("record-e2e-latency", "%s-min", task.id().toString(), StreamsConfig.METRICS_LATEST);
+
+        assertThat(maxMetric.metricValue(), equalTo(Double.NaN));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 0L),
+            getConsumerRecord(partition1, 10L),
+            getConsumerRecord(partition1, 5L),
+            getConsumerRecord(partition1, 20L)

Review comment:
       Can we increase this ts to 35? This would allow us to test min in the last step better.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##########
@@ -14,15 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals.metrics;
+package org.apache.kafka.streams.processor.internals.metrics;

Review comment:
       Nice one!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -86,6 +87,14 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+    static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =
+        "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+            + "system time when it has been fully processed by the task";

Review comment:
       Assuming that a task might have a cache, is this correct (i.e., `has been fully processed by the task`)?




----------------------------------------------------------------



[GitHub] [kafka] vvcephei merged pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8697:
URL: https://github.com/apache/kafka/pull/8697


   


----------------------------------------------------------------



[GitHub] [kafka] mjsax commented on pull request #8697: KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-632360387


   Retest this please.





[GitHub] [kafka] vvcephei commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634821709


   The only failing tests were unrelated:
       org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
   Note that the java 8 build is not actually running.





[GitHub] [kafka] ableegoldman commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634827871


   > The java 8 build started running right after I mentioned that it's not running
   
   I hope you'll choose to use these new powers for good





[GitHub] [kafka] vvcephei commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430575822



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       This necessity makes me think that our Percentiles metric algorithm needs to be improved. Admittedly, I haven't looked at the math, but it seems like it should be possible to be more adaptive.
   
   I'm in favor of not adding a config and just leaving it alone for now, so that we can take the option in the future to fix the problem by fixing the algorithm. 

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
##########
@@ -492,6 +493,52 @@ public void testPercentiles() {
         assertEquals(75, (Double) p75.metricValue(), 1.0);
     }
 
+    @Test
+    public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+        long seed = new Random().nextLong();
+        int sizeInBytes = 1000 * 1000;   // 1MB
+        long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+        try {
+            Random prng = new Random(seed);
+            int numberOfValues = 5000 + prng.nextInt(10_000);  // ranges is [5000, 15000]
+
+            Percentiles percs = new Percentiles(sizeInBytes,
+                                                maximumValue,
+                                                BucketSizing.LINEAR,
+                                                new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+                                                new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+            MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+            Sensor sensor = metrics.sensor("test", config);
+            sensor.add(percs);
+            Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+            Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+            final List<Long> values = new ArrayList<>(numberOfValues);
+            // record two windows worth of sequential values
+            for (int i = 0; i < numberOfValues; ++i) {
+                long value = Math.abs(prng.nextLong()) % maximumValue;

Review comment:
       Not sure if it really matters, but this is not a uniform distribution (because MAX_VALUE and MIN_VALUE are not integer multiples of 1000 days). If you wanted a uniform distribution, it looks like you can use the bounded `nextInt` and cast to `long`.
   
   Also, FYI, `Math.abs(Long.MIN_VALUE) == Long.MIN_VALUE` (which is a negative number), due to overflow.
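
A minimal sketch of the two points in this comment, assuming the test keeps its seeded `java.util.Random`. The class `UniformLongSketch` and the helper `nextLongBelow` are hypothetical names and are not part of the PR. Because the 1000-day bound in the test (86,400,000,000 ms) does not fit in an `int`, the sketch draws a `long` and uses rejection sampling to avoid modulo bias, rather than the bounded `nextInt`.

```java
import java.util.Random;

final class UniformLongSketch {

    // Hypothetical helper (not from the PR): draws a long uniformly from [0, bound)
    // by rejection sampling, so a seeded Random can still be used.
    static long nextLongBelow(final Random prng, final long bound) {
        if (bound <= 0) {
            throw new IllegalArgumentException("bound must be positive");
        }
        while (true) {
            final long bits = prng.nextLong() >>> 1;   // non-negative, uniform in [0, 2^63)
            final long value = bits % bound;
            // Draws from the incomplete final block of size < bound are rejected;
            // for exactly those draws the sum below overflows to a negative number.
            if (bits - value + (bound - 1) >= 0) {
                return value;
            }
        }
    }

    public static void main(final String[] args) {
        // -Long.MIN_VALUE is not representable as a long, so abs() returns the input unchanged.
        System.out.println(Math.abs(Long.MIN_VALUE) == Long.MIN_VALUE); // prints true

        final long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // same bound as in the test
        final Random prng = new Random(42L);
        for (int i = 0; i < 5; ++i) {
            System.out.println(nextLongBelow(prng, maximumValue));
        }
    }
}
```

With a bound this small relative to 2^63 the modulo bias is tiny, so whether the extra loop is worth it in a test is a judgment call; the overflow case is the one that can actually produce a negative value.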

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {

Review comment:
       Meta-review procedural question: In the future, can we try to avoid making the same comment in multiple places in the PR, since it leads to split discussions like this one?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
       If I understand this right, we are recording sink latencies after processing, but source latencies before processing. This nicely avoids the problem with recording non-sink latencies after processing, but is it accurate?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       I'm fine with this as well, although I think it makes more sense either to pin to zero and warn or to just record the negative latency and warn. It feels like we're overthinking it. If the clocks are drifting a little and we report small negative numbers, the e2e latency is still _low_, which is still meaningful information. I really don't see a problem with just naively reporting it and not even bothering with a warning.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       However, I *do not* think we should restrict the max value for metrics other than the percentiles one. E.g., there's no reason to restrict the value we record for the max and min metrics. You should be able to update the Percentiles implementation to apply the maximum bound in the metric record method. Otherwise, I'd recommend recording two sensors separately: one for the bounded metrics, and one for the unbounded ones.
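
A rough sketch of the separation described above, written without the Kafka metrics classes. Everything here is an assumption for illustration: `BoundedPercentileSketch` is a hypothetical class, the linear histogram only stands in for `Percentiles`, and "apply the maximum bound in the record method" is read as clamping (skipping out-of-range values is the other possible reading). Only the 10-day constant comes from the diff.

```java
final class BoundedPercentileSketch {

    static final long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // 10 days, as in the diff

    private final int[] buckets;          // linear histogram, a stand-in for Percentiles
    private long count = 0;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    BoundedPercentileSketch(final int numBuckets) {
        this.buckets = new int[numBuckets];
    }

    void record(final long latencyMs) {
        // min and max see the raw value; no bound is applied to them
        min = Math.min(min, latencyMs);
        max = Math.max(max, latencyMs);

        // only the histogram input is clamped into [0, MAXIMUM_E2E_LATENCY]
        final long clamped = Math.max(0L, Math.min(latencyMs, MAXIMUM_E2E_LATENCY));
        final long rawIndex = clamped * buckets.length / MAXIMUM_E2E_LATENCY;
        final int index = (int) Math.min(buckets.length - 1, rawIndex);
        buckets[index]++;
        count++;
    }

    long min() {
        return min;
    }

    long max() {
        return max;
    }

    // Upper edge of the bucket that contains the requested percentile.
    long percentileUpperBound(final double percentile) {
        final long target = (long) Math.ceil(percentile / 100.0 * count);
        long seen = 0;
        for (int i = 0; i < buckets.length; ++i) {
            seen += buckets[i];
            if (seen >= target) {
                return (i + 1) * MAXIMUM_E2E_LATENCY / buckets.length;
            }
        }
        return MAXIMUM_E2E_LATENCY;
    }

    public static void main(final String[] args) {
        final BoundedPercentileSketch sketch = new BoundedPercentileSketch(1000);
        sketch.record(5L);
        sketch.record(2 * MAXIMUM_E2E_LATENCY);   // 20 days: beyond the percentile bound
        System.out.println("max = " + sketch.max());
        System.out.println("p100 upper bound = " + sketch.percentileUpperBound(100));
    }
}
```

Under these assumptions the 20-day value is still visible in `max`, while the percentile estimate saturates at the 10-day bound instead of the record being dropped.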

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
       Thanks, @ableegoldman, I think this is a fine tradeoff. Also helping is the fact that we know all "source nodes" are actually instances of SourceNode, which specifically do nothing except forward every record, so whether we measure these nodes before or "after" their processing logic should make no practical difference at all.







[GitHub] [kafka] vvcephei commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634694919


   Test this please





[GitHub] [kafka] ableegoldman commented on pull request #8697: KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-632404608


   Alright I fixed the latency measurement to record the right thing @mjsax 





[GitHub] [kafka] mjsax commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429499547



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
##########
@@ -492,6 +493,52 @@ public void testPercentiles() {
         assertEquals(75, (Double) p75.metricValue(), 1.0);
     }
 
+    @Test
+    public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+        long seed = new Random().nextLong();
+        int sizeInBytes = 1000 * 1000;   // 1MB
+        long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+        try {
+            Random prng = new Random(seed);
+            int numberOfValues = 5000 + prng.nextInt(10_000);  // ranges is [5000, 15000]
+
+            Percentiles percs = new Percentiles(sizeInBytes,
+                                                maximumValue,
+                                                BucketSizing.LINEAR,
+                                                new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+                                                new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+            MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+            Sensor sensor = metrics.sensor("test", config);
+            sensor.add(percs);
+            Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+            Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+            final List<Long> values = new ArrayList<>(numberOfValues);
+            // record two windows worth of sequential values
+            for (int i = 0; i < numberOfValues; ++i) {
+                long value = Math.abs(prng.nextLong()) % maximumValue;
+                values.add(value);
+                sensor.record(value);
+            }
+
+            Collections.sort(values);
+
+            int p90Index = (int)Math.ceil(((double)(90 * numberOfValues)) / 100);
+            int p99Index = (int)Math.ceil(((double)(99 * numberOfValues)) / 100);
+
+            double expectedP90 = values.get(p90Index - 1);
+            double expectedP99 = values.get(p99Index - 1);
+
+            assertEquals(expectedP90, (Double) p90.metricValue(), expectedP90 / 10);

Review comment:
       Let's see...
   
   But I am wondering if the condition is right? Should it be:
   ```
   assertTrue("Expected +/- 10% accuracy for exact value of " + expectedP90, expectedP90 * 0.9 <= (Double) p90.metricValue() && (Double) p90.metricValue() <= expectedP90 * 1.1);
   ```
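
For comparison, a small sketch of the two forms of the check, assuming JUnit's `assertEquals(expected, actual, delta)` passes when `|expected - actual| <= delta`. `RelativeToleranceSketch` and its methods are hypothetical names used only for illustration. For a non-negative expected value the two forms accept the same range, up to floating-point rounding exactly at the boundaries.

```java
final class RelativeToleranceSketch {

    // Absolute-delta form: what assertEquals(expected, actual, expected / 10) checks.
    static boolean withinDelta(final double expected, final double actual) {
        return Math.abs(expected - actual) <= expected / 10;
    }

    // Explicit range form: the condition proposed in the comment above.
    static boolean withinRange(final double expected, final double actual) {
        return expected * 0.9 <= actual && actual <= expected * 1.1;
    }

    public static void main(final String[] args) {
        final double expected = 1000.0;
        final double[] candidates = {850.0, 950.0, 1050.0, 1200.0};
        for (final double actual : candidates) {
            System.out.println(actual + ": delta form = " + withinDelta(expected, actual)
                + ", range form = " + withinRange(expected, actual));
        }
    }
}
```

The practical difference is therefore mostly in the failure message, which the `assertTrue` variant spells out explicitly.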







[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429505517



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {

Review comment:
       Ack (limit explanation on comment below)



