You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/21 21:38:49 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #8697: KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r428930276



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -420,6 +422,50 @@ public void shouldRecordProcessRatio() {
         assertThat(metric.metricValue(), equalTo(1.0d));
     }
 
+    @Test
+    public void shouldRecordE2ELatency() {
+        time = new MockTime(0L, 0L, 0L);
+        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
+
+        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+
+        final KafkaMetric maxMetric = getMetric("record-e2e-latency", "%s-max", task.id().toString(), StreamsConfig.METRICS_LATEST);
+        final KafkaMetric minMetric = getMetric("record-e2e-latency", "%s-min", task.id().toString(), StreamsConfig.METRICS_LATEST);
+
+        assertThat(maxMetric.metricValue(), equalTo(Double.NaN));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 0L),
+            getConsumerRecord(partition1, 10L),
+            getConsumerRecord(partition1, 5L),
+            getConsumerRecord(partition1, 20L)

Review comment:
       We we increase this ts to 35? This would allow to test min in the last step better

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##########
@@ -14,15 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals.metrics;
+package org.apache.kafka.streams.processor.internals.metrics;

Review comment:
       Nice one!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -86,6 +87,14 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+    static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =
+        "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+            + "system time when it has been fully processed by the task";

Review comment:
       Assuming that a task might have a cache, is this correct, ie, `has been fully processed by the task`)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org