You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/30 05:38:08 UTC

[GitHub] [kafka] rodesai opened a new pull request #11149: KIP-761: add total blocked time metric to streams

rodesai opened a new pull request #11149:
URL: https://github.com/apache/kafka/pull/11149


   add the following producer metrics:
   flush-time-total: cumulative sum of time elapsed during in flush.
   txn-init-time-total: cumulative sum of time elapsed during in initTransactions.
   txn-begin-time-total: cumulative sum of time elapsed during in beginTransaction.
   txn-send-offsets-time-total: cumulative sum of time elapsed during in sendOffsetsToTransaction.
   txn-commit-time-total: cumulative sum of time elapsed during in commitTransaction.
   txn-abort-time-total: cumulative sum of time elapsed during in abortTransaction.
   
   add the following consumer metrics:
   commited-time-total: cumulative sum of time elapsed during in committed.
   commit-sync-time-total: cumulative sum of time elapsed during in commitSync.
   
   add a total-blocked-time metric to streams that is the sum of:
   consumer’s io-waittime-total
   consumer’s iotime-total
   consumer’s committed-time-total
   consumer’s commit-sync-time-total
   restore consumer’s io-waittime-total
   restore consumer’s iotime-total
   admin client’s io-waittime-total
   admin client’s iotime-total
   producer’s bufferpool-wait-time-total
   producer's flush-time-total
   producer's txn-init-time-total
   producer's txn-begin-time-total
   producer's txn-send-offsets-time-total
   producer's txn-commit-time-total
   producer's txn-abort-time-total
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697727733



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -269,6 +300,12 @@ public final void removeAllThreadLevelSensors(final String threadId) {
                 metrics.removeSensor(sensors.pop());
             }
         }
+        synchronized (threadLevelMetrics) {

Review comment:
       These metrics aren't tied to a sensor - they just report values that are either constant or are accumulated from other metrics.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r694522888



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -67,22 +69,35 @@
     private final Map<String, Object> eosV2ProducerConfigs;
     private final KafkaClientSupplier clientSupplier;
     private final StreamThread.ProcessingMode processingMode;
+    private final Time time;
 
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private double oldProducerTotalBlockedTime = 0;
 
     public StreamsProducer(final StreamsConfig config,
                            final String threadId,
                            final KafkaClientSupplier clientSupplier,
                            final TaskId taskId,
                            final UUID processId,
                            final LogContext logContext) {
+        this(config, threadId, clientSupplier, taskId, processId, logContext, Time.SYSTEM);
+    }

Review comment:
       it's only meant for testing. I'll make it package-private and annotate it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696290580



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        // use a consumer that will throw to ensure we return quickly
+        Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+        client.createPendingAuthenticationError(node, 0);
+        final KafkaConsumer<String, String> consumer
+            = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);

Review comment:
       refactored




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688236582



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -590,9 +593,11 @@ else if (acks != -1)
     public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();

Review comment:
       ah - I thought we were measuring in nanos. Not sure where I got that impression. I'll change to millis




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695341815



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
##########
@@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
                 metricGroupName,
                 "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."),
                 new Avg());
+
+        this.commitSyncSensor = metrics.sensor("commit-sync-time-total");
+        this.commitSyncSensor.add(
+            metrics.metricName("commit-sync-time-total", metricGroupName),
+            new CumulativeSum()
+        );
+
+        this.committedSensor = metrics.sensor("committed-time-total");
+        this.committedSensor.add(
+            metrics.metricName("committed-time-total", metricGroupName),
+            new CumulativeSum()
+        );

Review comment:
       ack - will defer to a follow-up PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688245686



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -590,9 +593,11 @@ else if (acks != -1)
     public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();

Review comment:
       Actually now I remember why - the bufferpool and selector total blocked times are all being measured in nanos and use the suffix `time-total`. So chose the naming convention and unit accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691098389



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(

Review comment:
       Could you please make this method private? 
   Out of curiosity, why did you define this method as `final`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId,
         return sensor;
     }
 
+    public static void addThreadStartTimeMetric(final String threadId,

Review comment:
       Could you please add tests in `ThreadMetricsTest`?
   A similar test is `ClientMetricsTest#shouldAddVersionMetric()`.  

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;

Review comment:
       Could you please make this member fields private?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId,
         return sensor;
     }
 
+    public static void addThreadStartTimeMetric(final String threadId,
+                                                final StreamsMetricsImpl streamsMetrics,
+                                                final long startTime) {
+        streamsMetrics.addThreadLevelImmutableMetric(
+            THREAD_START_TIME,
+            THREAD_START_TIME_DESCRIPTION,
+            threadId,
+            startTime
+        );
+    }
+
+    public static void addThreadBlockedTimeMetric(final String threadId,

Review comment:
       Could you please add tests in `ThreadMetricsTest`?
   A similar test is `ClientMetricsTest#shouldAddStateMetric()`

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1125,60 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        eosBetaStreamsProducer.resetProducer();
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(eosBetaStreamsProducer.totalBlockedTime(), greaterThan(2 * expectedTotalBlocked));

Review comment:
       Do you use `greaterThan()` here because of `double`? Maybe you should use `Matchers.closeTo(double, double)`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 
 import java.util.Map;
+import org.apache.kafka.streams.processor.internals.StreamsThreadTotalBlockedTime;

Review comment:
       Could you please move this import after line 20? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -299,4 +299,12 @@ private LogContext getLogContext(final TaskId taskId) {
         return new LogContext(logPrefix);
     }
 
+    public double totalProducerBlockedTime() {

Review comment:
       Could you please write a unit test for this method?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1125,60 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        eosBetaStreamsProducer.resetProducer();

Review comment:
       Could you add `assertThat(eosBetaStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));` above this line?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(
+        final Map<MetricName, ? extends Metric> metrics,
+        final String name) {
+        return metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .findFirst()
+            .map(n -> (Double) metrics.get(n).metricValue())
+            .orElse(0.0);
+    }
+
+    public double getTotalBlockedTime() {

Review comment:
       We do usually not use the `get` prefix in AK. What about renaming the class to `StreamThreadTotalBlockedTime` (note that I removed the `s` in `StreamThread` to match with the `StreamThread` class)  and the method to `compute()`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r698854001



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+import java.util.Map;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
+
+    private final Map<String, String> tags;
+    private final Metrics metrics;
+    private final Sensor initTimeSensor;
+    private final Sensor beginTxnTimeSensor;
+    private final Sensor flushTimeSensor;
+    private final Sensor sendOffsetsSensor;
+    private final Sensor commitTxnSensor;
+    private final Sensor abortTxnSensor;
+
+    public KafkaProducerMetrics(Metrics metrics) {
+        this.metrics = metrics;
+        tags = this.metrics.config().tags();
+        flushTimeSensor = newLatencySensor(
+            FLUSH,
+            "Total time producer has spent in flush in nanoseconds."
+        );
+        initTimeSensor = newLatencySensor(
+            TXN_INIT,
+            "Total time producer has spent in initTransactions in nanoseconds."
+        );
+        beginTxnTimeSensor = newLatencySensor(
+            TXN_BEGIN,
+            "Total time producer has spent in beginTransaction in nanoseconds."
+        );
+        sendOffsetsSensor = newLatencySensor(
+            TXN_SEND_OFFSETS,
+            "Total time producer has spent in sendOffsetsToTransaction."

Review comment:
       nit: also add ` in nanoseconds`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -91,6 +94,10 @@ private ThreadMetrics() {}
         "The fraction of time the thread spent on polling records from consumer";
     private static final String COMMIT_RATIO_DESCRIPTION =
         "The fraction of time the thread spent on committing all tasks";
+    private static final String BLOCKED_TIME_DESCRIPTION =
+        "The total time the thread spent blocked on kafka";

Review comment:
       ` in nanoseconds`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang edited a comment on pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
guozhangwang edited a comment on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-904852557


   > The title of the PR should start with the Jira ID, i.e., KAFKA-1234.
   
   Just to explain the context here, we have a browser plugin for AK tickets which can re-direct from PR directly to the ticket URL, but that script relies on the PR title to follow the pattern of `KAFKA-1234: blah blah`. For this KIP probably we have not created a JIRA ticket yet?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688239443



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +180,39 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        final long start = Time.SYSTEM.nanoseconds();

Review comment:
       ah good call!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688268274



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public <T> void addThreadLevelImmutableMetric(final String name,
+        final String description,
+        final String threadId,
+        final T value) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+        synchronized (threadLevelMetrics) {

Review comment:
       Oh it's totally fine to use the thread id, it's just that for other thread-level metrics we would prefix the thread id with either `internal` or `external`, i.e. the "key" would not just be the thread id itself.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697393359



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2262,6 +2352,14 @@ public void testListOffsetShouldUpateSubscriptions() {
         return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
     }
 
+    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
+        return consumerWithPendingAuthenticationError(new MockTime());
+    }
+
+    private KafkaConsumer<String, String> consumerWithPendingError(final Time time) {
+        return consumerWithPendingAuthenticationError(time);
+    }

Review comment:
       Is this method needed?  Couldn't you directly call `consumerWithPendingAuthenticationError(final Time time)`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class KafkaConsumerMetricsTest {
+  private static final long METRIC_VALUE = 123L;
+  private static final String CONSUMER_GROUP_PREFIX = "consumer";
+  private static final String CONSUMER_METRIC_GROUP = "consumer-metrics";
+
+  private final Metrics metrics = new Metrics();
+  private final KafkaConsumerMetrics consumerMetrics
+      = new KafkaConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX);
+
+  @Test
+  public void shouldRecordCommitSyncTime() {
+    // When:
+    consumerMetrics.recordCommitSync(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("commit-sync-time-total");
+  }
+
+  @Test
+  public void shouldRecordCommittedTime() {
+    // When:
+    consumerMetrics.recordCommitted(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("committed-time-total");
+  }
+

Review comment:
       Thanks you for adding the tests!
   Could you also add a test for the `close()` method to be sure all metrics are removed from `metrics` on close.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class KafkaProducerMetricsTest {
+  private static final long METRIC_VALUE = 123L;
+
+  private final Metrics metrics = new Metrics();
+  private final KafkaProducerMetrics producerMetrics = new KafkaProducerMetrics(metrics);
+
+  @Test
+  public void shouldRecordFlushTime() {
+    // When:
+    producerMetrics.recordFlush(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("flush-time-total");
+  }
+
+  @Test
+  public void shouldRecordInitTime() {
+    // When:
+    producerMetrics.recordInit(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-init-time-total");
+  }
+
+  @Test
+  public void shouldRecordTxBeginTime() {
+    // When:
+    producerMetrics.recordBeginTxn(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-begin-time-total");
+  }
+
+  @Test
+  public void shouldRecordTxCommitTime() {
+    // When:
+    producerMetrics.recordCommitTxn(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-commit-time-total");
+  }
+
+  @Test
+  public void shouldRecordTxAbortTime() {
+    // When:
+    producerMetrics.recordAbortTxn(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-abort-time-total");
+  }
+
+  @Test
+  public void shouldRecordSendOffsetsTime() {
+    // When:
+    producerMetrics.recordSendOffsets(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-send-offsets-time-total");
+  }
+
+  private void assertMetricValue(final String name) {
+    assertEquals(
+        metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)).metricValue(),
+        (double) METRIC_VALUE
+    );
+  }
+}

Review comment:
       Could you please also add a test for `close()` to ensure that metrics are removed from `metrics` on close?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -99,6 +99,9 @@ void maybeCreateTasksFromNewTopologies() {
             activeTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies),
             standbyTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies)
         );
+

Review comment:
       Here a `}` is missing which leads to the compilation errors in the builds.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688142016



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                 this.userCallback.onCompletion(metadata, exception);
         }
     }
+
+    private static class KafkaProducerMetrics {
+        private static final String FLUSH = "flush";
+        private static final String TXN_INIT = "txn-init";
+        private static final String TXN_BEGIN = "txn-begin";
+        private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+        private static final String TXN_COMMIT = "txn-commit";
+        private static final String TXN_ABORT = "txn-abort";
+        private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+        final Map<String, String> tags;
+        final Metrics metrics;
+        final Sensor initTimeSensor;
+        final Sensor beginTimeSensor;
+        final Sensor flushTimeSensor;
+        final Sensor sendOffsetsSensor;
+        final Sensor commitSensor;
+        final Sensor abortSensor;
+
+        private KafkaProducerMetrics(Metrics metrics) {
+            this.metrics = metrics;
+            this.tags = this.metrics.config().tags();
+            this.flushTimeSensor = newLatencySensor(FLUSH);
+            this.initTimeSensor = newLatencySensor(TXN_INIT);
+            this.beginTimeSensor = newLatencySensor(TXN_BEGIN);
+            this.sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS);
+            this.commitSensor = newLatencySensor(TXN_COMMIT);
+            this.abortSensor = newLatencySensor(TXN_ABORT);
+        }
+
+        private Sensor newLatencySensor(String name) {
+            Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX);
+            sensor.add(
+                metrics.metricName(name + TOTAL_TIME_SUFFIX, ProducerMetrics.GROUP, tags),
+                new CumulativeSum()
+            );
+            return sensor;
+        }
+
+        private void recordFlush(long duration) {
+            flushTimeSensor.record(duration);
+        }
+
+        private void recordInit(long duration) {
+            initTimeSensor.record(duration);
+        }
+
+        private void recordBegin(long duration) {

Review comment:
       nit: better leave the full name as recordBeginTxn/AbortTxn/CommitTxn.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                 this.userCallback.onCompletion(metadata, exception);
         }
     }
+
+    private static class KafkaProducerMetrics {
+        private static final String FLUSH = "flush";
+        private static final String TXN_INIT = "txn-init";
+        private static final String TXN_BEGIN = "txn-begin";
+        private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+        private static final String TXN_COMMIT = "txn-commit";
+        private static final String TXN_ABORT = "txn-abort";
+        private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+        final Map<String, String> tags;
+        final Metrics metrics;
+        final Sensor initTimeSensor;
+        final Sensor beginTimeSensor;

Review comment:
       Ditto here; better rename it to `beginTxn` / `commitTxn` / `abortTxn`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +180,39 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        final long start = Time.SYSTEM.nanoseconds();
         producer.close();
+        final long closeTime = Time.SYSTEM.nanoseconds() - start;
+
+        oldProducerTotalBlockedTime += closeTime + totalBlockedTime(producer);
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
 
+    private static double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+                                         final String name) {
+        return metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .findFirst()

Review comment:
       Maybe worth checking there's only one element after the filtering? It should not be expected to have more than one right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime
+    ) {

Review comment:
       very nit: in AK repo we usually do not have a new line for `)` but just have it at the same line of the last param. Ditto elsewhere.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java
##########
@@ -26,6 +26,7 @@
 import org.apache.kafka.common.metrics.Metrics;
 
 public class ProducerMetrics {
+    public static final String GROUP = "producer-metrics";

Review comment:
       I think it's better be in `KafkaProducerMetrics` rather than here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -590,9 +593,11 @@ else if (acks != -1)
     public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();

Review comment:
       Do we want to measure them in millis or nanos? For most latency measures we are currently in ms, and if we do measure in ns we usually name the metric as e.g. "xyz-total-ns".

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -868,6 +904,36 @@ public void testAbortTransaction() {
         }
     }
 
+    @Test
+    public void testMeasureAbortTransactionDuration() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            producer.abortTransaction();
+            double first = getMetricValue(producer, "txn-abort-time-total");
+            assertTrue(first > 999999.0);

Review comment:
       Ditto here and elsewhere.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() {
         }
     }
 
+    private static Double getMetricValue(final KafkaProducer<?, ?> producer, final String name) {
+        Metrics metrics = producer.metrics;
+        Metric metric =  metrics.metric(metrics.metricName(name, "producer-metrics"));
+        return (Double) metric.metricValue();
+    }
+
+    @Test
+    public void testFlushMeasureLatency() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(
+            configs,
+            new StringSerializer(),
+            new StringSerializer(),
+            metadata,
+            client,
+            null,
+            time
+        )) {
+            producer.flush();
+            double first = getMetricValue(producer, "flush-time-total");
+            assertTrue(first > 999999.0);

Review comment:
       Why we want to assert this latency larger than `999999.0` nano seconds? Could this result in flakiness if it is time dependent? If all we want to measure is a not-null value, then just asserting that is > 0 is sufficient?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +180,39 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        final long start = Time.SYSTEM.nanoseconds();

Review comment:
       Ditto here: nano seconds seems unnecessary?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime
+    ) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(

Review comment:
       What about consolidating this function and the other in `StreamsProducer` as a static in `StreamsMetricsImpl#getMetricValue`, and we can also use this in unit test e.g. https://github.com/apache/kafka/pull/11149/files#diff-599de0f96fbd5ba6b3d919881426269fc72fe8bbe8e2436fab87d9abe84e8dbaR735 as well.

##########
File path: gradle.properties
##########
@@ -20,7 +20,7 @@ group=org.apache.kafka
 #  - tests/kafkatest/__init__.py
 #  - tests/kafkatest/version.py (variable DEV_VERSION)
 #  - kafka-merge-pr.py
-version=3.1.0-SNAPSHOT
+version=ROHAN-SNAPSHOT

Review comment:
       :)

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -590,9 +593,11 @@ else if (acks != -1)
     public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();

Review comment:
       Personally I think we can just measure in ms since it is cumulative sum as `total` anyways. 

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                 this.userCallback.onCompletion(metadata, exception);
         }
     }
+
+    private static class KafkaProducerMetrics {

Review comment:
       What about moving it to `org.apache.kafka.clients.producer.internals` as a separate class?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -508,6 +508,21 @@ public StreamThread(final Time time,
         ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
         ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
 
+        ThreadMetrics.addThreadStartTimeMetric(
+            threadId,
+            streamsMetrics,
+            time.nanoseconds()

Review comment:
       I think milli seconds would be sufficient?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public <T> void addThreadLevelImmutableMetric(final String name,
+        final String description,
+        final String threadId,
+        final T value) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+        synchronized (threadLevelMetrics) {

Review comment:
       Why we want to not prefix the thread id but directly use it as the key in the `threadLevelMetrics` map? Ditto below.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                 this.userCallback.onCompletion(metadata, exception);
         }
     }
+
+    private static class KafkaProducerMetrics {

Review comment:
       Also it seems we do not have logic to de-register the metrics from `metrics` registry when closing the producer? Maybe we can follow the same as consumer to declare it as `implements AutoCloseable` and then call its `close` when shutting down the producer.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +180,39 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        final long start = Time.SYSTEM.nanoseconds();

Review comment:
       Also just to follow my other comment, if we de-register all the metrics upon producer closure then we'd better read it out before closing and starting a new producer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688267856



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime
+    ) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(

Review comment:
       Sounds good then.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696280437



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        // use a consumer that will throw to ensure we return quickly
+        Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+        client.createPendingAuthenticationError(node, 0);
+        final KafkaConsumer<String, String> consumer
+            = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);

Review comment:
       it doesn't set a tick on the mock time.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r698327147



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +184,48 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        oldProducerTotalBlockedTime += totalBlockedTime(producer);
+        final long start = time.nanoseconds();
         producer.close();
+        final long closeTime = time.nanoseconds() - start;
+        oldProducerTotalBlockedTime += closeTime;
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
 
+    private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+                                  final String name) {
+        final List<MetricName> found = metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .collect(Collectors.toList());
+        if (found.isEmpty()) {
+            return 0.0;
+        }
+        if (found.size() > 1) {

Review comment:
       @rodesai I see your point here. However, the downside of not throwing is that we will also not notice the bad behavior in our tests like the soak tests. I personally prefer to improve tests instead of downgrading the reaction to bad behavior. Assume in future somebody makes a change that breaks the assumption of the non-shared metrics registry, we would find this bug immediately during development instead of during production.
   Another option that comes to my mind is to classify exceptions that originate from the metrics framework differently in the uncaught exception handler, but that would probably need some more work. 

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2262,6 +2352,14 @@ public void testListOffsetShouldUpateSubscriptions() {
         return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
     }
 
+    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
+        return consumerWithPendingAuthenticationError(new MockTime());
+    }
+
+    private KafkaConsumer<String, String> consumerWithPendingError(final Time time) {
+        return consumerWithPendingAuthenticationError(time);
+    }

Review comment:
       Fair enough!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691613659



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1125,60 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        eosBetaStreamsProducer.resetProducer();

Review comment:
       +1




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688240994



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +180,39 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        final long start = Time.SYSTEM.nanoseconds();
         producer.close();
+        final long closeTime = Time.SYSTEM.nanoseconds() - start;
+
+        oldProducerTotalBlockedTime += closeTime + totalBlockedTime(producer);
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
 
+    private static double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+                                         final String name) {
+        return metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .findFirst()

Review comment:
       yeah it should always be one




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696282952



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        // use a consumer that will throw to ensure we return quickly

Review comment:
       Yeah. There are not tests for that path, and I lost steam trying to pay that debt just to implement this metric.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695567992



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        // use a consumer that will throw to ensure we return quickly

Review comment:
       I think we also need to test the case without failure. Otherwise, we assume in the test that the measurement is in the `finally` clause which we should not assume but rather ensure with unit tests. Same applies to the test below.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.junit.jupiter.api.Test;

Review comment:
       ```suggestion
   import org.apache.kafka.common.metrics.Metrics;
   
   import org.junit.jupiter.api.Test;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   ```

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
         }
     }
 
+    private double getAndAssertDuration(KafkaProducer<?, ?> producer, String name, double floor) {
+        double value = getMetricValue(producer, name);
+        assertTrue(value > floor);
+        return value;
+    }
+
+    @Test
+    public void testMeasureTransactionDurations() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+            assertTrue(getMetricValue(producer, "txn-init-time-total") > 999999);

Review comment:
       I now saw that in the consumer tests you use `Duration.ofSeconds(1).toMillis()` and `Duration.ofMillis(999).toNanos()`. This makes it already clearer. I think a variable with a meaningful name for the lower bound would make it even clearer. 

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        // use a consumer that will throw to ensure we return quickly
+        Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+        client.createPendingAuthenticationError(node, 0);
+        final KafkaConsumer<String, String> consumer
+            = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);

Review comment:
       Could you not just use `final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();` as in the test above?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        // use a consumer that will throw to ensure we return quickly
+        Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+        client.createPendingAuthenticationError(node, 0);
+        final KafkaConsumer<String, String> consumer
+            = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp0, new OffsetAndMetadata(10L));
+
+        try {
+            consumer.commitSync(offsets);
+        } catch (final RuntimeException e) {
+        }
+
+        final Metric metric = consumer.metrics()
+            .get(consumer.metrics.metricName("commit-sync-time-total", "consumer-metrics"));
+        assertTrue((Double)metric.metricValue() >= Duration.ofMillis(999).toNanos());
+    }
+
+    @Test
+    public void testMeasureCommittedDuration() {
+        // use a consumer that will throw to ensure we return quickly
+        Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+        client.createPendingAuthenticationError(node, 0);
+        final KafkaConsumer<String, String> consumer
+            = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);

Review comment:
       Could you not just use final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(); as in the test above?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
##########
@@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
                 metricGroupName,
                 "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."),
                 new Avg());
+
+        this.commitSyncSensor = metrics.sensor("commit-sync-time-total");
+        this.commitSyncSensor.add(
+            metrics.metricName("commit-sync-time-total", metricGroupName),
+            new CumulativeSum()
+        );
+
+        this.committedSensor = metrics.sensor("committed-time-total");
+        this.committedSensor.add(
+            metrics.metricName("committed-time-total", metricGroupName),
+            new CumulativeSum()
+        );

Review comment:
       Could you please already open the follow-up PR with scaffolding and link it here? I think otherwise we risk to forget about it. 

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##########
@@ -1337,7 +1337,7 @@ public boolean isOpen() {
         public TestDriverProducer(final StreamsConfig config,
                                   final KafkaClientSupplier clientSupplier,
                                   final LogContext logContext) {
-            super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext);
+            super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext, Time.SYSTEM);

Review comment:
       I think this should be:
   ```suggestion
               super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext, mockWallClockTime);
   ```

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
         }
     }
 
+    private double getAndAssertDuration(KafkaProducer<?, ?> producer, String name, double floor) {
+        double value = getMetricValue(producer, name);
+        assertTrue(value > floor);
+        return value;
+    }
+
+    @Test
+    public void testMeasureTransactionDurations() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+            assertTrue(getMetricValue(producer, "txn-init-time-total") > 999999);

Review comment:
       Got it!
   Could you please put the `999999` in a variable and give it a meaningful name such as `lowerBoundNs`. Maybe put also the `1` that you use in the `MockTime` constructor in a variable such as `tickMs` so that the difference in units becomes clear. 
   
   BTW: you could also use `getAndAssertDuration()` here, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r696290766



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -1925,6 +1926,56 @@ public void testCommittedAuthenticationFailure() {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
+    @Test
+    public void testMeasureCommitSyncDuration() {
+        // use a consumer that will throw to ensure we return quickly

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695342570



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
         }
     }
 
+    private double getAndAssertDuration(KafkaProducer<?, ?> producer, String name, double floor) {
+        double value = getMetricValue(producer, name);
+        assertTrue(value > floor);
+        return value;
+    }
+
+    @Test
+    public void testMeasureTransactionDurations() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+            assertTrue(getMetricValue(producer, "txn-init-time-total") > 999999);

Review comment:
       I'm verifying that something was measured and that it's at least 1 tick of the clock. The clock is shared between multiple threads (e.g. the io threads) so the number of ticks depends  on what threads get scheduled while we're in `initTransactions`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697743994



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2262,6 +2352,14 @@ public void testListOffsetShouldUpateSubscriptions() {
         return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
     }
 
+    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
+        return consumerWithPendingAuthenticationError(new MockTime());
+    }
+
+    private KafkaConsumer<String, String> consumerWithPendingError(final Time time) {
+        return consumerWithPendingAuthenticationError(time);
+    }

Review comment:
       I like the name better - the caller just cares that the consumer throws on the following call.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691767731



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public <T> void addThreadLevelImmutableMetric(final String name,
+        final String description,
+        final String threadId,
+        final T value) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+        synchronized (threadLevelMetrics) {

Review comment:
       Ah I wasn't aware of the external vs internal sensor names. Now that I read through this again it seems to be that some external caller with a reference to `StreamsMetrics` can add their own sensors, which don't get cleaned up when the thread goes away. In this case we don't have external callers adding any thread-level metrics to the map, so we don't really need the prefix. Happy to include it to keep things consistent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688255651



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() {
         }
     }
 
+    private static Double getMetricValue(final KafkaProducer<?, ?> producer, final String name) {
+        Metrics metrics = producer.metrics;
+        Metric metric =  metrics.metric(metrics.metricName(name, "producer-metrics"));
+        return (Double) metric.metricValue();
+    }
+
+    @Test
+    public void testFlushMeasureLatency() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(
+            configs,
+            new StringSerializer(),
+            new StringSerializer(),
+            metadata,
+            client,
+            null,
+            time
+        )) {
+            producer.flush();
+            double first = getMetricValue(producer, "flush-time-total");
+            assertTrue(first > 999999.0);

Review comment:
       Ah actually this doesn't work because the mock time is passed to and used from the other client threads - so the value is not predictable. So the best we can do is assert that at least one tick (1000000 nanoseconds has passed). I'll update the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-896970248


   @cadonna @ableegoldman @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-904548251


   The title of the PR should start with the Jira ID, i.e., KAFKA-1234.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691775378



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1125,60 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        eosBetaStreamsProducer.resetProducer();
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(eosBetaStreamsProducer.totalBlockedTime(), greaterThan(2 * expectedTotalBlocked));

Review comment:
       ah somehow I thought we couldn't use the hamcrest matchers. thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691768579



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(

Review comment:
       typo - i'll fix




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r693539139



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public <T> void addThreadLevelImmutableMetric(final String name,
+        final String description,
+        final String threadId,
+        final T value) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+        synchronized (threadLevelMetrics) {

Review comment:
       Yes, I'm only suggesting this as to keep things consistent, not really a correctness issue.
   
   As for `external` metrics, when users create their own metrics I think they are responsible for e.g. de-registering them upon closure, not Streams itself.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11149: KAFKA-1234: KIP-761, add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-904852557


   > The title of the PR should start with the Jira ID, i.e., KAFKA-1234.
   
   Just to explain the context here, we have a browser plugin for AK tickets which can re-direct from PR directly to the ticket URL, but that script relies on the PR title to follow the pattern of `KAFKA-1234: blah blah`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697740825



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +184,48 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        oldProducerTotalBlockedTime += totalBlockedTime(producer);
+        final long start = time.nanoseconds();
         producer.close();
+        final long closeTime = time.nanoseconds() - start;
+        oldProducerTotalBlockedTime += closeTime;
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
 
+    private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+                                  final String name) {
+        final List<MetricName> found = metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .collect(Collectors.toList());
+        if (found.isEmpty()) {
+            return 0.0;
+        }
+        if (found.size() > 1) {

Review comment:
       I agree it should never happen. I'm a little hesitant here since if this scenario occurs and we just log the only downside is a bad metric value, but if we throw we may cause a query to go down. I was thinking we can observe this log value after deploying and convert to an exception as long as we never see it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688240676



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() {
         }
     }
 
+    private static Double getMetricValue(final KafkaProducer<?, ?> producer, final String name) {
+        Metrics metrics = producer.metrics;
+        Metric metric =  metrics.metric(metrics.metricName(name, "producer-metrics"));
+        return (Double) metric.metricValue();
+    }
+
+    @Test
+    public void testFlushMeasureLatency() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(
+            configs,
+            new StringSerializer(),
+            new StringSerializer(),
+            metadata,
+            client,
+            null,
+            time
+        )) {
+            producer.flush();
+            double first = getMetricValue(producer, "flush-time-total");
+            assertTrue(first > 999999.0);

Review comment:
       It's using mock time, so the value here is well-known (should be 1 second). I'm using > rather than equalTo because I don't want the test to fail spuriously on floating point rounding errors. It would probably be better to use [isCloseTo](http://hamcrest.org/JavaHamcrest/javadoc/1.3/org/hamcrest/number/IsCloseTo.html)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697605349



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+import java.util.Map;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-total";

Review comment:
       Just following my other comment here: maybe we can declare it as TOTAL_NS_TIME_SUFFIX = "-time-ns-total";

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
##########
@@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
                 metricGroupName,
                 "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."),
                 new Avg());
+
+        this.commitSyncSensor = metrics.sensor("commit-sync-time-total");
+        this.commitSyncSensor.add(
+            metrics.metricName("commit-sync-time-total", metricGroupName),

Review comment:
       Could we add the description in the metricName as well indicating this is measured in nanos not millis? Ditto elsewhere.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +184,48 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        oldProducerTotalBlockedTime += totalBlockedTime(producer);
+        final long start = time.nanoseconds();
         producer.close();
+        final long closeTime = time.nanoseconds() - start;
+        oldProducerTotalBlockedTime += closeTime;
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
 
+    private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+                                  final String name) {
+        final List<MetricName> found = metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .collect(Collectors.toList());
+        if (found.isEmpty()) {
+            return 0.0;
+        }
+        if (found.size() > 1) {

Review comment:
       Since the `metrics` registry object is not shared among all embedded producer/consumer instances, we should never get this situation. How about treating it as an illegal state exception which would be fatal and kill the node?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -45,6 +46,8 @@ private ThreadMetrics() {}
     private static final String CREATE_TASK = "task-created";
     private static final String CLOSE_TASK = "task-closed";
     private static final String SKIP_RECORD = "skipped-records";
+    private static final String BLOCKED_TIME = "blocked-time-total";

Review comment:
       Just follow my other comment: how about naming it as `blocked-time-ns-total`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+import java.util.Map;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+    private final Map<String, String> tags;
+    private final Metrics metrics;
+    private final Sensor initTimeSensor;
+    private final Sensor beginTxnTimeSensor;
+    private final Sensor flushTimeSensor;
+    private final Sensor sendOffsetsSensor;
+    private final Sensor commitTxnSensor;
+    private final Sensor abortTxnSensor;
+
+    public KafkaProducerMetrics(Metrics metrics) {
+        this.metrics = metrics;
+        tags = this.metrics.config().tags();
+        flushTimeSensor = newLatencySensor(FLUSH);
+        initTimeSensor = newLatencySensor(TXN_INIT);
+        beginTxnTimeSensor = newLatencySensor(TXN_BEGIN);
+        sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS);
+        commitTxnSensor = newLatencySensor(TXN_COMMIT);
+        abortTxnSensor = newLatencySensor(TXN_ABORT);
+    }
+
+    @Override
+    public void close() {
+        removeMetric(FLUSH);
+        removeMetric(TXN_INIT);
+        removeMetric(TXN_BEGIN);
+        removeMetric(TXN_SEND_OFFSETS);
+        removeMetric(TXN_COMMIT);
+        removeMetric(TXN_ABORT);
+    }
+
+    public void recordFlush(long duration) {
+        flushTimeSensor.record(duration);
+    }
+
+    public void recordInit(long duration) {
+        initTimeSensor.record(duration);
+    }
+
+    public void recordBeginTxn(long duration) {
+        beginTxnTimeSensor.record(duration);
+    }
+
+    public void recordSendOffsets(long duration) {
+        sendOffsetsSensor.record(duration);
+    }
+
+    public void recordCommitTxn(long duration) {
+        commitTxnSensor.record(duration);
+    }
+
+    public void recordAbortTxn(long duration) {
+        abortTxnSensor.record(duration);
+    }
+
+    private Sensor newLatencySensor(String name) {
+        Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX);
+        sensor.add(metricName(name), new CumulativeSum());
+        return sensor;
+    }
+
+    private MetricName metricName(final String name) {
+        return metrics.metricName(name + TOTAL_TIME_SUFFIX, GROUP, tags);
+    }
+
+    private void removeMetric(final String name) {
+        metrics.removeSensor(name + TOTAL_TIME_SUFFIX);

Review comment:
       `removeSensor()` would remove its associated metrics as well, I think we do not need the second call below.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -590,9 +593,11 @@ else if (acks != -1)
     public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();

Review comment:
       I took a look at these metrics and I think they should be fixed with the `-ns` suffix (I was wrong though thinking that they should be `-total-ns`, instead they should be `-ns-total`). Filed https://issues.apache.org/jira/browse/KAFKA-13243
   
   For the newly added metrics, I'd suggest we add the -ns suffix before the "type" (-total) suffix, in which we would update the wiki page and then just send an update on the KIP thread saying we are making a small change to it. We do not need to ask for re-voting since this is a small change.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamThreadTotalBlockedTime {
+    private final Consumer<?, ?> consumer;
+    private final Consumer<?, ?> restoreConsumer;
+    private final Supplier<Double> producerTotalBlockedTime;
+
+    StreamThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    private double metricValue(
+        final Map<MetricName, ? extends Metric> metrics,
+        final String name) {
+        return metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .findFirst()
+            .map(n -> (Double) metrics.get(n).metricValue())
+            .orElse(0.0);
+    }
+
+    public double compute() {
+        return metricValue(consumer.metrics(), "io-waittime-total")
+            + metricValue(consumer.metrics(), "iotime-total")

Review comment:
       Not introduced in this PR at all, but I just realized we mistakenly had a bug that caused the avg and total metric names to be different: `iotime` for total and `io-time` for average. I will include that in the filed JIRA ticket as well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -269,6 +300,12 @@ public final void removeAllThreadLevelSensors(final String threadId) {
                 metrics.removeSensor(sensors.pop());
             }
         }
+        synchronized (threadLevelMetrics) {

Review comment:
       We should not need this block as well since now the names are consistent, the above block should be sufficient. Note that `metrics.removeSensor()` would also trigger `metrics.removeMetric` for its children metrics.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r698854001



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+import java.util.Map;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
+
+    private final Map<String, String> tags;
+    private final Metrics metrics;
+    private final Sensor initTimeSensor;
+    private final Sensor beginTxnTimeSensor;
+    private final Sensor flushTimeSensor;
+    private final Sensor sendOffsetsSensor;
+    private final Sensor commitTxnSensor;
+    private final Sensor abortTxnSensor;
+
+    public KafkaProducerMetrics(Metrics metrics) {
+        this.metrics = metrics;
+        tags = this.metrics.config().tags();
+        flushTimeSensor = newLatencySensor(
+            FLUSH,
+            "Total time producer has spent in flush in nanoseconds."
+        );
+        initTimeSensor = newLatencySensor(
+            TXN_INIT,
+            "Total time producer has spent in initTransactions in nanoseconds."
+        );
+        beginTxnTimeSensor = newLatencySensor(
+            TXN_BEGIN,
+            "Total time producer has spent in beginTransaction in nanoseconds."
+        );
+        sendOffsetsSensor = newLatencySensor(
+            TXN_SEND_OFFSETS,
+            "Total time producer has spent in sendOffsetsToTransaction."

Review comment:
       nit: also add ` in nanoseconds`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -91,6 +94,10 @@ private ThreadMetrics() {}
         "The fraction of time the thread spent on polling records from consumer";
     private static final String COMMIT_RATIO_DESCRIPTION =
         "The fraction of time the thread spent on committing all tasks";
+    private static final String BLOCKED_TIME_DESCRIPTION =
+        "The total time the thread spent blocked on kafka";

Review comment:
       ` in nanoseconds`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688267294



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -590,9 +593,11 @@ else if (acks != -1)
     public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();

Review comment:
       hmm, I know that in selector we use nano seconds, but that should be `time-total-ns`, not sure when that's get changed..




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r691098389



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(

Review comment:
       Could you please make this method private? 
   Out of curiosity, why did you define this method as `final`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId,
         return sensor;
     }
 
+    public static void addThreadStartTimeMetric(final String threadId,

Review comment:
       Could you please add tests in `ThreadMetricsTest`?
   A similar test is `ClientMetricsTest#shouldAddVersionMetric()`.  

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;

Review comment:
       Could you please make this member fields private?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId,
         return sensor;
     }
 
+    public static void addThreadStartTimeMetric(final String threadId,
+                                                final StreamsMetricsImpl streamsMetrics,
+                                                final long startTime) {
+        streamsMetrics.addThreadLevelImmutableMetric(
+            THREAD_START_TIME,
+            THREAD_START_TIME_DESCRIPTION,
+            threadId,
+            startTime
+        );
+    }
+
+    public static void addThreadBlockedTimeMetric(final String threadId,

Review comment:
       Could you please add tests in `ThreadMetricsTest`?
   A similar test is `ClientMetricsTest#shouldAddStateMetric()`

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1125,60 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        eosBetaStreamsProducer.resetProducer();
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(eosBetaStreamsProducer.totalBlockedTime(), greaterThan(2 * expectedTotalBlocked));

Review comment:
       Do you use `greaterThan()` here because of `double`? Maybe you should use `Matchers.closeTo(double, double)`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##########
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 
 import java.util.Map;
+import org.apache.kafka.streams.processor.internals.StreamsThreadTotalBlockedTime;

Review comment:
       Could you please move this import after line 20? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -299,4 +299,12 @@ private LogContext getLogContext(final TaskId taskId) {
         return new LogContext(logPrefix);
     }
 
+    public double totalProducerBlockedTime() {

Review comment:
       Could you please write a unit test for this method?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1125,60 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        eosBetaStreamsProducer.resetProducer();

Review comment:
       Could you add `assertThat(eosBetaStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));` above this line?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(
+        final Map<MetricName, ? extends Metric> metrics,
+        final String name) {
+        return metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .findFirst()
+            .map(n -> (Double) metrics.get(n).metricValue())
+            .orElse(0.0);
+    }
+
+    public double getTotalBlockedTime() {

Review comment:
       We do usually not use the `get` prefix in AK. What about renaming the class to `StreamThreadTotalBlockedTime` (note that I removed the `s` in `StreamThread` to match with the `StreamThread` class)  and the method to `compute()`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #11149:
URL: https://github.com/apache/kafka/pull/11149


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688244445



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer<?, ?> consumer;
+    final Consumer<?, ?> restoreConsumer;
+    final Supplier<Double> producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer<?, ?> consumer,
+        final Consumer<?, ?> restoreConsumer,
+        final Supplier<Double> producerTotalBlockedTime
+    ) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(

Review comment:
       I tried doing it this way at first, but found it hard to loop over the producers in `TaskManager/Tasks/ActiveTaskCreator` without breaking those abstractions by adding methods to return the producers so we could get the metrics out. So then I went the route of having the total blocked time metric implementation ask `TaskManager` for it's total blocked time component.
   
   > we can also use this in unit test e.g. https://github.com/apache/kafka/pull/11149/files#diff-599de0f96fbd5ba6b3d919881426269fc72fe8bbe8e2436fab87d9abe84e8dbaR735
   What do you mean here? This is the producer's unit test, and this method is computing total blocked time for a streams app.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r693747330



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
##########
@@ -387,4 +393,51 @@ public void shouldGetCloseTaskSensor() {
 
         assertThat(sensor, is(expectedSensor));
     }
+
+    @Test
+    public void shouldAddThreadStartTimeMetric() {
+        // When:
+        ThreadMetrics.addThreadStartTimeMetric(
+            "bongo",
+            streamsMetrics,
+            123L
+        );
+
+        // Then:
+        verify(streamsMetrics).addThreadLevelImmutableMetric(
+            "thread-start-time",
+            "The time that the thread was started",
+            "bongo",
+            123L
+        );
+    }
+
+    @Test
+    public void shouldAddTotalBlockedTimeMetric() {
+        // Given:
+        final StreamThreadTotalBlockedTime blockedTime = mock(StreamThreadTotalBlockedTime.class);
+        when(blockedTime.compute()).thenReturn(123.45);

Review comment:
       Could you please use a variable with a meaningful name like `startTime` instead of a literal and re-use the variable in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
##########
@@ -387,4 +393,51 @@ public void shouldGetCloseTaskSensor() {
 
         assertThat(sensor, is(expectedSensor));
     }
+
+    @Test
+    public void shouldAddThreadStartTimeMetric() {
+        // When:
+        ThreadMetrics.addThreadStartTimeMetric(
+            "bongo",
+            streamsMetrics,
+            123L

Review comment:
       Could you please use a variable with a meaningful name like `startTime` instead of a literal and re-use the variable in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -1209,4 +1211,46 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() {
 
         verify(sensor);
     }
+
+    @Test
+    public void shouldAddThreadLevelMutableMetric() {
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+        streamsMetrics.addThreadLevelMutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            (c, t) -> 123
+        );
+
+        final MetricName name = metrics.metricName(
+            "foobar",
+            THREAD_LEVEL_GROUP,
+            Collections.singletonMap("thread-id", "t1")
+        );
+        assertThat(metrics.metric(name), notNullValue());
+        assertThat(metrics.metric(name).metricValue(), equalTo(123));
+    }
+
+    @Test
+    public void shouldCleanupThreadLevelMutableMetric() {
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+        streamsMetrics.addThreadLevelMutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            (c, t) -> 123

Review comment:
       Could you please use variables with a meaningful name instead of a literal and re-use the variable in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
##########
@@ -117,6 +120,15 @@ public void shouldNoOpCloseTaskProducerIfEosDisabled() {
         assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
     }
 
+    @Test
+    public void shouldReturnBlockedTimeWhenThreadProducer() {
+        createTasks();
+        final MockProducer<?, ?> producer = mockClientSupplier.producers.get(0);
+        addMetric(producer, "flush-time-total", 123.0);

Review comment:
       Could you please use a variable with a meaningful name like `blockedTime` instead of a literal and re-use the variable in the verification?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -67,22 +69,35 @@
     private final Map<String, Object> eosV2ProducerConfigs;
     private final KafkaClientSupplier clientSupplier;
     private final StreamThread.ProcessingMode processingMode;
+    private final Time time;
 
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private double oldProducerTotalBlockedTime = 0;
 
     public StreamsProducer(final StreamsConfig config,
                            final String threadId,
                            final KafkaClientSupplier clientSupplier,
                            final TaskId taskId,
                            final UUID processId,
                            final LogContext logContext) {
+        this(config, threadId, clientSupplier, taskId, processId, logContext, Time.SYSTEM);
+    }

Review comment:
       I think, we should not keep this constructor. It seems to me that we risk to have different time objects for thread/tasks and their producers which has the potential to lead to inconsistent time between these components. If the removal of the constructor makes this PR too large (and I suspect it will), I recommend to make a separate refactoring for this constructor change and get that merged before this PR. 

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -1209,4 +1211,46 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() {
 
         verify(sensor);
     }
+
+    @Test
+    public void shouldAddThreadLevelMutableMetric() {
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+        streamsMetrics.addThreadLevelMutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            (c, t) -> 123

Review comment:
       Could you please use variables with a meaningful name instead of a literal and re-use the variable in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1130,65 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);

Review comment:
       Could you please use variables with a meaningful names instead of literals and re-use the variables in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1130,65 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), closeTo(expectedTotalBlocked, 0.01));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;

Review comment:
       Could you please use variables with a meaningful names instead of literals and re-use the variables in the verification?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r694676118



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
         acquireAndEnsureOpen();
         try {
             maybeThrowInvalidGroupIdException();
-            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+            final Map<TopicPartition, OffsetAndMetadata> offsets;
+            long start = time.nanoseconds();
+            try {
+                offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+            } finally {
+                kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
+            }

Review comment:
       Could you please add unit tests for this change?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
         acquireAndEnsureOpen();
         try {
             maybeThrowInvalidGroupIdException();

Review comment:
       Why do you exclude this check in the measured time here but include it above? Similar applies to `offsets.forEach(this::updateLastSeenEpochIfNewer)`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;

Review comment:
       ```suggestion
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.CumulativeSum;
   
   import java.util.Map;
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -699,7 +706,9 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
         throwIfProducerClosed();
         TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
         sender.wakeup();

Review comment:
       Why are those lines not included in the measurement?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
         }
     }
 
+    private double getAndAssertDuration(KafkaProducer<?, ?> producer, String name, double floor) {
+        double value = getMetricValue(producer, name);
+        assertTrue(value > floor);
+        return value;
+    }
+
+    @Test
+    public void testMeasureTransactionDurations() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+            assertTrue(getMetricValue(producer, "txn-init-time-total") > 999999);

Review comment:
       I am not sure I understand this verification. Could you elaborate?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -67,22 +69,35 @@
     private final Map<String, Object> eosV2ProducerConfigs;
     private final KafkaClientSupplier clientSupplier;
     private final StreamThread.ProcessingMode processingMode;
+    private final Time time;
 
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private double oldProducerTotalBlockedTime = 0;
 
     public StreamsProducer(final StreamsConfig config,
                            final String threadId,
                            final KafkaClientSupplier clientSupplier,
                            final TaskId taskId,
                            final UUID processId,
                            final LogContext logContext) {
+        this(config, threadId, clientSupplier, taskId, processId, logContext, Time.SYSTEM);
+    }

Review comment:
       That is not my point. My point is that the objects that call the constructor, i.e. tasks and threads, have a time object that they use for the their metrics (and probably for other purposes). Now that we also have metrics in the `StreamsProducer` that needs a time object, it is inconsistent to create a new time object in the constructor instead of passing along the time object from tasks and threads into the `StreamProducer`.  

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1493,6 +1494,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, fin
                         "committing offsets " + offsets);
             }
         } finally {
+            kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);

Review comment:
       Could you please add unit tests for this change?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+public class KafkaProducerMetrics implements AutoCloseable {

Review comment:
       Could you please add unit tests for this class?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+    final Map<String, String> tags;
+    final Metrics metrics;
+    final Sensor initTimeSensor;
+    final Sensor beginTxnTimeSensor;
+    final Sensor flushTimeSensor;
+    final Sensor sendOffsetsSensor;
+    final Sensor commitTxnSensor;
+    final Sensor abortTxnSensor;

Review comment:
       Could you please specify this member fields as `private`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+    final Map<String, String> tags;
+    final Metrics metrics;
+    final Sensor initTimeSensor;
+    final Sensor beginTxnTimeSensor;
+    final Sensor flushTimeSensor;
+    final Sensor sendOffsetsSensor;
+    final Sensor commitTxnSensor;
+    final Sensor abortTxnSensor;
+
+    public KafkaProducerMetrics(Metrics metrics) {
+        this.metrics = metrics;
+        this.tags = this.metrics.config().tags();
+        this.flushTimeSensor = newLatencySensor(FLUSH);
+        this.initTimeSensor = newLatencySensor(TXN_INIT);
+        this.beginTxnTimeSensor = newLatencySensor(TXN_BEGIN);
+        this.sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS);
+        this.commitTxnSensor = newLatencySensor(TXN_COMMIT);
+        this.abortTxnSensor = newLatencySensor(TXN_ABORT);

Review comment:
       ```suggestion
           this.tags = this.metrics.config().tags();
           this.flushTimeSensor = newLatencySensor(FLUSH);
           initTimeSensor = newLatencySensor(TXN_INIT);
           beginTxnTimeSensor = newLatencySensor(TXN_BEGIN);
           sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS);
           commitTxnSensor = newLatencySensor(TXN_COMMIT);
           abortTxnSensor = newLatencySensor(TXN_ABORT);
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1124,12 +1137,16 @@ private void ensureValidRecordSize(int size) {
     @Override
     public void flush() {
         log.trace("Flushing accumulated records in producer.");
+
         this.accumulator.beginFlush();
         this.sender.wakeup();

Review comment:
       Why are those lines not included in the measurement? They do contribute to the flush time, right?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
##########
@@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
                 metricGroupName,
                 "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."),
                 new Avg());
+
+        this.commitSyncSensor = metrics.sensor("commit-sync-time-total");
+        this.commitSyncSensor.add(
+            metrics.metricName("commit-sync-time-total", metricGroupName),
+            new CumulativeSum()
+        );
+
+        this.committedSensor = metrics.sensor("committed-time-total");
+        this.committedSensor.add(
+            metrics.metricName("committed-time-total", metricGroupName),
+            new CumulativeSum()
+        );

Review comment:
       I think you forgot to remove the sensors in `close()`. I know that there do not exist unit tests for this class, but maybe you should add them. Maybe in a separate PR to not make this PR larger as needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-907375597


   The compilation current failed and @cadonna suggested where we should fix it. We should see one green build before we can merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688242729



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public <T> void addThreadLevelImmutableMetric(final String name,
+        final String description,
+        final String threadId,
+        final T value) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+        synchronized (threadLevelMetrics) {

Review comment:
       not sure I follow the question here - we are using `threadLevelMetrics` to track the metrics for each thread so they can be cleaned up later on when the thread exits. What's wrong with using the thread id for that?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #11149:
URL: https://github.com/apache/kafka/pull/11149


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697736171



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -269,6 +300,12 @@ public final void removeAllThreadLevelSensors(final String threadId) {
                 metrics.removeSensor(sensors.pop());
             }
         }
+        synchronized (threadLevelMetrics) {

Review comment:
       Ah right, the start time / total blocked time are registered as metrics directly.
   
   In that case could you separate them and put this as in `removeAllThreadLevelMetrics` just like the client-level registration/removal above?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org