Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:49:52 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics

ableegoldman commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r461993688



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -198,7 +198,7 @@
                                 .define(METRICS_RECORDING_LEVEL_CONFIG,
                                         Type.STRING,
                                         Sensor.RecordingLevel.INFO.toString(),
-                                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+                                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),

Review comment:
       It's kind of a bummer that we can't just add the new TRACE level for Streams only; we have to add it to all the clients that Streams passes its configs down to. We could check for the new TRACE level and strip it off before passing the configs on to the clients, but that just seems like asking for trouble.
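   Just to spell out what I mean by "strip it off": the rejected alternative would look roughly like the sketch below. This is purely illustrative and not part of the PR; the helper class and method names are made up.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.metrics.Sensor;

// Hypothetical helper (not in this PR): downgrade the Streams-only TRACE recording level
// to DEBUG before handing the configs to an internal client, instead of teaching every
// client's config definition about TRACE as the diff above does.
final class RecordingLevelDowngrade {
    static Map<String, Object> forClients(final Map<String, Object> streamsConfigs) {
        final Map<String, Object> clientConfigs = new HashMap<>(streamsConfigs);
        final Object level = clientConfigs.get(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG);
        if (Sensor.RecordingLevel.TRACE.toString().equalsIgnoreCase(String.valueOf(level))) {
            // the clients don't know TRACE, so fall back to the finest level they do support
            clientConfigs.put(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG,
                              Sensor.RecordingLevel.DEBUG.toString());
        }
        return clientConfigs;
    }
}
```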

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -88,13 +92,6 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
-    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";

Review comment:
       Moved the common descriptions to StreamsMetricsImpl

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) {
         return byteEntries;
     }
 
+    private void maybeRecordE2ELatency() {
+        if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
       For KV stores, we just compare the current time with the current record's timestamp.
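   Concretely, the measurement boils down to something like the sketch below. The sensor checks follow the diff above, but the field names (`time`, `context`) and the exact body are my sketch rather than the PR's code.

```java
// Sketch of the KV-store measurement: e2e latency is simply "now" minus the timestamp
// carried by the record currently being processed.
private void maybeRecordE2ELatency() {
    // only fetch the wallclock time if TRACE recording is enabled and the sensor has metrics
    if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
        final long now = time.milliseconds();
        final long recordTimestamp = context.timestamp();
        e2eLatencySensor.record(now - recordTimestamp, now);
    }
}
```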

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -248,4 +253,12 @@ public void close() {
     private Bytes keyBytes(final K key) {
         return Bytes.wrap(serdes.rawKey(key));
     }
+
+    private void maybeRecordE2ELatency() {
+        if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
       For session and window stores, we also just compare the current time with the current record's timestamp when `put` is called. This can mean the e2e latency is measured several times on the same record, for example in a windowed aggregation.
   At first I thought that didn't make sense, but now I think it's actually exactly what we want. First of all, it means we can account for the latency between calls to `put` within a processor. For simple point inserts this might not add much on the scale of ms, but more complex processing may benefit from seeing this granularity of information. And if users don't want it, well, that's why we introduced `TRACE`.
   
   Second, while it might seem like we're over-weighting some records by measuring the e2e latency on them more often than on others, I'm starting to think this actually makes more sense than not: the big-picture benefit/use case for the e2e latency metric is less "how long for this record to get sent downstream" and more "how long for this record to be reflected in the state store/IQ results". Given that, each record should be weighted by its actual proportion of the state store. You aren't querying individual records (in a window store); you're querying the windows themselves.
   
   I toyed around with the idea of measuring the e2e latency relative to the window time, instead of the record timestamp, but ultimately couldn't find any sense in that. 
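   To make the windowed case concrete, here's a minimal sketch (topic and store names are made up): with hopping windows of size 10s advancing by 5s, each input record falls into two overlapping windows, so the metered window store's `put`, and therefore the e2e latency sensor, fires twice for that one record.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class HoppingWindowE2ELatencyExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Every record belongs to windowSize / advance = 2 windows, so the windowed count
        // calls put() on the window store twice per record, and the e2e latency is measured
        // each time relative to that record's own timestamp.
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Long()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(5)))
               .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("hopping-counts"));
    }
}
```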
   Thoughts?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##########
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24,
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
         checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics);
+        checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics);

Review comment:
       Ok, there's something I'm not understanding about this test and/or the built-in metrics version. For some reason, the KV-store metrics are 0 when `METRICS_0100_TO_24` is used, and 1 (as expected) when the latest version is used. I feel like this is wrong and it should always be 1, but I need some clarity on how this config is supposed to be used.
   What makes me pretty sure something is actually wrong here is that the window/session store metrics are always at 1. But I can't figure out why the KV store metrics would behave any differently from the others. Any ideas @cadonna?



