Posted to jira@kafka.apache.org by "guozhangwang (via GitHub)" <gi...@apache.org> on 2023/02/24 19:32:17 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #13300: KAFKA-10199: Add task updater metrics, part 2

guozhangwang opened a new pull request, #13300:
URL: https://github.com/apache/kafka/pull/13300

   1. Added task-level metrics: 
     * restore-rate, restore-total (active)
     * update-rate, update-total (standby)
     * restore-remaining-records-total (active)
   
   2. Fixed some naming confusion in the XXXMetrics classes, especially in distinguishing sensor-name constructs from metric-name constructs.
   
   3. Added related unit tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1117540370


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java:
##########
@@ -863,45 +864,15 @@ public static void addTotalCountAndSumMetricsToSensor(final Sensor sensor,
         );
     }
 
-    public static void maybeMeasureLatency(final Runnable actionToMeasure,

Review Comment:
   Moved these util functions to `AbstractTask` since we need them for both standby tasks and active tasks now.
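   For context, a helper like the `maybeMeasureLatency` being moved here typically runs an action and records its elapsed time only when the sensor will actually record. Below is a minimal, JDK-only sketch under that assumption. `LatencySensor`, `ListSensor`, and `LatencyUtils` are hypothetical stand-ins, not Kafka's `Sensor` or `Time` APIs, and the real signature in `AbstractTask` may differ.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.LongSupplier;

   // Hypothetical stand-in for a metrics sensor (not Kafka's Sensor API).
   interface LatencySensor {
       boolean shouldRecord();      // e.g. false when the recording level is too low
       void record(long latencyNs); // records one latency observation
   }

   final class LatencyUtils {
       private LatencyUtils() {}

       // Runs the action; reads the clock and records latency only when the
       // sensor will actually record. The finally block records even if the action throws.
       static void maybeMeasureLatency(final Runnable action,
                                       final LongSupplier nanoClock,
                                       final LatencySensor sensor) {
           if (sensor.shouldRecord()) {
               final long start = nanoClock.getAsLong();
               try {
                   action.run();
               } finally {
                   sensor.record(nanoClock.getAsLong() - start);
               }
           } else {
               action.run();
           }
       }
   }

   // Simple in-memory sensor, used only to demonstrate the helper.
   final class ListSensor implements LatencySensor {
       final List<Long> recorded = new ArrayList<>();
       private final boolean enabled;

       ListSensor(final boolean enabled) { this.enabled = enabled; }

       @Override public boolean shouldRecord() { return enabled; }
       @Override public void record(final long latencyNs) { recorded.add(latencyNs); }
   }
   ```

   Keeping the clock read inside the `shouldRecord()` branch is what makes such a helper cheap when the metric is disabled, which matters once both active and standby tasks call it on hot paths.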



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,6 +987,11 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                final StreamTask task = (StreamTask) tasks.get(taskId);
+                final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
+                task.recordRestoreRemaining(time, recordsToRestore);

Review Comment:
   The logic for measuring remaining records is a bit complex: we first aggregate the total number of records to restore across all changelog partitions when the changelogs are initialized, and then during restoration we keep decrementing that total by the number of records restored.
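   The bookkeeping described above can be sketched roughly as follows: a per-task counter starts at the sum of `restoreEndOffset - startOffset` over the changelog partitions (clamped at zero, mirroring the `Math.max(..., 0L)` in the diff) and is then decremented after each restored batch. This is a JDK-only illustration under those assumptions; `RemainingRecordsTracker`, `initRestoreRemaining`, and `onBatchRestored` are hypothetical names, not the PR's actual code, and task ids are plain strings for simplicity.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical tracker for a "restore-remaining-records-total"-style value.
   final class RemainingRecordsTracker {
       private final Map<String, Long> remainingByTask = new HashMap<>();

       // Called once per changelog partition when it is initialized for restoration;
       // adds that partition's records-to-restore to the task's running total.
       void initRestoreRemaining(final String taskId, final long startOffset, final long restoreEndOffset) {
           // Clamp at zero so a partition never contributes a negative amount.
           final long recordsToRestore = Math.max(restoreEndOffset - startOffset, 0L);
           remainingByTask.merge(taskId, recordsToRestore, Long::sum);
       }

       // Called after each restored batch; decrements the task's remaining count.
       void onBatchRestored(final String taskId, final long numRestored) {
           remainingByTask.merge(taskId, -numRestored, Long::sum);
       }

       long remaining(final String taskId) {
           return remainingByTask.getOrDefault(taskId, 0L);
       }
   }
   ```

   For example, two partitions with 100 and 30 records to restore give an initial value of 130, which drops to 100 after a 30-record batch.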



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1128785406


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,6 +987,11 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                final StreamTask task = (StreamTask) tasks.get(taskId);
+                final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);

Review Comment:
   No very compelling reason; I just want to make sure we never start with a negative number, although I cannot think of a case where it could be negative.



[GitHub] [kafka] mjsax merged pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13300:
URL: https://github.com/apache/kafka/pull/13300


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1128784726


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -457,15 +457,9 @@ public long restore(final Map<TaskId, Task> tasks) {
                 //       small batches; this can be optimized in the future, e.g. wait longer for larger batches.
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                 try {
+                    final Task task = tasks.get(taskId);
                     final ChangelogMetadata changelogMetadata = changelogs.get(partition);
-                    final int restored = restoreChangelog(changelogMetadata);
-                    if (restored > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) {
-                        final Task task = tasks.get(taskId);
-                        if (task != null) {

Review Comment:
   I went over the code and I don't think it can ever be `null`, but please correct me if I'm wrong.



[GitHub] [kafka] mjsax commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1157869349


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -457,15 +457,9 @@ public long restore(final Map<TaskId, Task> tasks) {
                 //       small batches; this can be optimized in the future, e.g. wait longer for larger batches.
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                 try {
+                    final Task task = tasks.get(taskId);
                     final ChangelogMetadata changelogMetadata = changelogs.get(partition);
-                    final int restored = restoreChangelog(changelogMetadata);
-                    if (restored > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) {
-                        final Task task = tasks.get(taskId);
-                        if (task != null) {

Review Comment:
   Was just wondering. The change seemed unrelated to this PR, so I assumed there must be a reason for having the `null` check.



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1157835624


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,6 +987,11 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                final StreamTask task = (StreamTask) tasks.get(taskId);
+                final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);

Review Comment:
   I've added a comment to cover the very edge case when it could become negative.



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1128784269


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,6 +987,11 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                final StreamTask task = (StreamTask) tasks.get(taskId);
+                final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
+                task.recordRestoreRemaining(time, recordsToRestore);

Review Comment:
   Sounds good.



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1157835233


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java:
##########
@@ -52,6 +55,21 @@ private TaskMetrics() {}
     private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION + PUNCTUATE_DESCRIPTION;
     private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION + PUNCTUATE_DESCRIPTION;
 
+    private static final String RESTORE = "restore";

Review Comment:
   The string is used not only as part of the metric name but also as part of the sensor name (more specifically, as its suffix). In our KIP the metrics are defined as, e.g., `restore-rate` / `restore-total`, so `restore` / `update` is the correct value here.
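   As a rough illustration of why one constant can serve both purposes, the sketch below derives a sensor name (with the operation as its suffix) and the KIP's `-rate` / `-total` metric names from the same string. The sensor-name layout `<threadId>.task.<taskId>.<operation>` and the class `TaskMetricNames` are assumptions for illustration only; Kafka's actual internal sensor naming is not reproduced here.

   ```java
   import java.util.List;

   final class TaskMetricNames {
       private TaskMetricNames() {}

       static final String RESTORE = "restore";
       static final String UPDATE = "update";

       // Hypothetical sensor-name layout; the operation string is used as the suffix.
       static String sensorName(final String threadId, final String taskId, final String operation) {
           return threadId + ".task." + taskId + "." + operation;
       }

       // Metric names derived from the same operation string, e.g. restore-rate and restore-total.
       static List<String> rateAndTotalMetricNames(final String operation) {
           return List.of(operation + "-rate", operation + "-total");
       }
   }
   ```

   Under this scheme, using `restored` as the constant would produce `restored-rate`, which would not match the metric names listed in the KIP.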



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -457,15 +457,9 @@ public long restore(final Map<TaskId, Task> tasks) {
                 //       small batches; this can be optimized in the future, e.g. wait longer for larger batches.
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                 try {
+                    final Task task = tasks.get(taskId);
                     final ChangelogMetadata changelogMetadata = changelogs.get(partition);
-                    final int restored = restoreChangelog(changelogMetadata);
-                    if (restored > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) {
-                        final Task task = tasks.get(taskId);
-                        if (task != null) {

Review Comment:
   @mjsax LMK what do you think? I may lack some background here.



[GitHub] [kafka] mjsax commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1123947731


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,6 +987,11 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                final StreamTask task = (StreamTask) tasks.get(taskId);
+                final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);

Review Comment:
   Why do we need to apply `max()` ?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -457,15 +457,9 @@ public long restore(final Map<TaskId, Task> tasks) {
                 //       small batches; this can be optimized in the future, e.g. wait longer for larger batches.
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                 try {
+                    final Task task = tasks.get(taskId);
                     final ChangelogMetadata changelogMetadata = changelogs.get(partition);
-                    final int restored = restoreChangelog(changelogMetadata);
-                    if (restored > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) {
-                        final Task task = tasks.get(taskId);
-                        if (task != null) {

Review Comment:
   Why do we remove this `null` check?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,6 +987,11 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                final StreamTask task = (StreamTask) tasks.get(taskId);
+                final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
+                task.recordRestoreRemaining(time, recordsToRestore);

Review Comment:
   Would it be helpful to rename this method to `initRestoreRemaining` to make its purpose clear (and/or add a JavaDoc to the method in `StreamTask`)?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java:
##########
@@ -52,6 +55,21 @@ private TaskMetrics() {}
     private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION + PUNCTUATE_DESCRIPTION;
     private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION + PUNCTUATE_DESCRIPTION;
 
+    private static final String RESTORE = "restore";
+    private static final String RESTORE_DESCRIPTION = "records restored";
+    private static final String RESTORE_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + RESTORE_DESCRIPTION;
+    private static final String RESTORE_RATE_DESCRIPTION =
+        RATE_DESCRIPTION_PREFIX + RESTORE_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+
+    private static final String UPDATE = "update";

Review Comment:
   `update` or `updated`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java:
##########
@@ -52,6 +55,21 @@ private TaskMetrics() {}
     private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION + PUNCTUATE_DESCRIPTION;
     private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION + PUNCTUATE_DESCRIPTION;
 
+    private static final String RESTORE = "restore";

Review Comment:
   `restore` or `restored` ?



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1117803154


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java:
##########
@@ -767,20 +768,6 @@ public static void addInvocationRateToSensor(final Sensor sensor,
         );
     }
 
-    public static void addInvocationRateAndCountToSensor(final Sensor sensor,

Review Comment:
   This function is not used in prod code, hence cleaning it up.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java:
##########
@@ -213,7 +272,7 @@ public static Sensor recordLatenessSensor(final String threadId,
     public static Sensor droppedRecordsSensor(final String threadId,
                                               final String taskId,
                                               final StreamsMetricsImpl streamsMetrics) {
-        return invocationRateAndCountSensor(
+        return invocationRateAndTotalSensor(

Review Comment:
   This is a piggy-backed metric fix: we should use a cumulative sum rather than a cumulative count for dropped records. Today most callers use `sensor.record()`, which increments by 1, so the two are effectively the same; but the count would be wrong if we ever recorded a value other than 1 in the future.
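   The difference can be shown with two minimal, JDK-only stand-ins that mimic the semantics of Kafka's `CumulativeCount` (counts `record()` calls) and `CumulativeSum` (adds up the recorded values). `CountStat` and `SumStat` are simplified illustrations, not Kafka's implementations.

   ```java
   // Mimics cumulative-count semantics: each record() call adds 1, the value is ignored.
   final class CountStat {
       private double total = 0.0;
       void record(final double value) { total += 1.0; }
       double measure() { return total; }
   }

   // Mimics cumulative-sum semantics: each record() call adds the recorded value.
   final class SumStat {
       private double total = 0.0;
       void record(final double value) { total += value; }
       double measure() { return total; }
   }
   ```

   With three single-record drops (value 1 each) both report 3; if a caller then records a batch of 5 dropped records in one call, the count reports 4 while the sum correctly reports 8.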



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java:
##########
@@ -146,13 +144,6 @@ private StateStoreMetrics() {}
     private static final String SUPPRESSION_BUFFER_SIZE_MAX_DESCRIPTION =
         MAX_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_SIZE_DESCRIPTION;
 
-    private static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop";

Review Comment:
   This metric was removed as part of KIP-743 and is now only used in tests (which I also cleaned up as a piggy-back).


