Posted to jira@kafka.apache.org by "guozhangwang (via GitHub)" <gi...@apache.org> on 2023/02/10 02:18:58 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #13228: [DRAFT] KAFKA-10199: Add task updater metrics, part 1

guozhangwang opened a new pull request, #13228:
URL: https://github.com/apache/kafka/pull/13228

   Should only be reviewed after #13025
   
   1. Added thread-level restoration metrics.
   2. Added related unit tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[GitHub] [kafka] guozhangwang commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1114660189


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -598,6 +685,12 @@ public Set<StandbyTask> getUpdatingStandbyTasks() {
             : Collections.emptySet();
     }
 
+    public Set<StreamTask> getUpdatingActiveTasks() {
+        return stateUpdaterThread != null

Review Comment:
   That's a good point, will update.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -151,9 +202,18 @@ private void resumeTasks() {
             }
         }
 
-        private void restoreTasks() {
+        private void pauseTasks() {
+            for (final Task task : updatingTasks.values()) {

Review Comment:
   I think the perf impact should be small since pause/resume are not commonly used, and if the named topologies are not paused, checking the status is just a few CPU cycles.





[GitHub] [kafka] guozhangwang merged pull request #13228: KAFKA-10199: Add task updater metrics, part 1

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang merged PR #13228:
URL: https://github.com/apache/kafka/pull/13228




[GitHub] [kafka] guozhangwang commented on pull request #13228: [DRAFT] KAFKA-10199: Add task updater metrics, part 1

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13228:
URL: https://github.com/apache/kafka/pull/13228#issuecomment-1425085767

   cc @lucasbru, I will update after #13025 is done and the pausing logic is extracted out of the `checkAllUpdatingTaskStates` function.




[GitHub] [kafka] lucasbru commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

Posted by "lucasbru (via GitHub)" <gi...@apache.org>.
lucasbru commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1114354933


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -151,9 +202,18 @@ private void resumeTasks() {
             }
         }
 
-        private void restoreTasks() {
+        private void pauseTasks() {
+            for (final Task task : updatingTasks.values()) {

Review Comment:
   Not sure, but is there any performance concern around running this loop in every single iteration of `runOnce`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -598,6 +685,12 @@ public Set<StandbyTask> getUpdatingStandbyTasks() {
             : Collections.emptySet();
     }
 
+    public Set<StreamTask> getUpdatingActiveTasks() {
+        return stateUpdaterThread != null

Review Comment:
   As I understand it, this function will be called quite frequently to export metrics, and we only need the size of the collection. It could make sense to avoid the allocations here and just implement a `getNumberOfUpdatingActiveTasks` as a non-essential but free optimization. Similarly for `getPausedStandbyTasks` etc.
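
   For illustration, a minimal sketch of what such a size-only accessor could look like (the method name and the thread-side counter are assumptions, not the PR's final API):

   ```java
   // Hypothetical sketch: expose only the count of updating active tasks so a
   // frequently polled metrics gauge does not need to allocate a new Set.
   public int getNumberOfUpdatingActiveTasks() {
       // numUpdatingActiveTasks() is an assumed thread-side helper
       return stateUpdaterThread != null
           ? stateUpdaterThread.numUpdatingActiveTasks()
           : 0;
   }
   ```

   A gauge could then read this count directly on each metrics poll without creating garbage.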



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -399,31 +459,56 @@ private void addToRestoredTasks(final StreamTask task) {
             }
         }
 
-        private void checkAllUpdatingTaskStates(final long now) {
+        private void maybeCheckpointTasks(final long now) {
             final long elapsedMsSinceLastCommit = now - lastCommitMs;
             if (elapsedMsSinceLastCommit > commitIntervalMs) {
                 if (log.isDebugEnabled()) {
                     log.debug("Checking all restoring task states since {}ms has elapsed (commit interval is {}ms)",

Review Comment:
   Update the log message as well; this function isn't really checking task states anymore.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1114660400


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -151,9 +202,18 @@ private void resumeTasks() {
             }
         }
 
-        private void restoreTasks() {
+        private void pauseTasks() {
+            for (final Task task : updatingTasks.values()) {

Review Comment:
   I think the perf impact should be small since pause/resume are not commonly used, and if the named topologies are not paused, checking the status is just a few CPU cycles.
   
   Another motivation: if we remove named topologies, then pausing/resuming would always impact all tasks, in which case we could use an even simpler and cheaper check (e.g. a single flag). So it's probably better to keep the code laid out this way.
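
   As a rough illustration of the loop in question (a sketch only; the helper and accessor names below are assumptions for this discussion, not necessarily the PR's exact code):

   ```java
   // Per-iteration pause check: when no named topology is paused, the
   // isPaused() lookup returns false immediately, so this is just a cheap
   // scan over the currently updating tasks.
   private void pauseTasks() {
       for (final Task task : updatingTasks.values()) {
           if (topologyMetadata.isPaused(task.id().topologyName())) {
               pauseTask(task);
           }
       }
   }
   ```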





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1113677550


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -109,17 +136,41 @@ public void run() {
                 Thread.interrupted(); // Clear the interrupted flag.
                 removeAddedTasksFromInputQueue();
                 removeUpdatingAndPausedTasks();
+                updaterMetrics.clear();
                 shutdownGate.countDown();
                 log.info("State updater thread shutdown");
             }
         }
 
+        // In each iteration:
+        //   1) check if updating tasks need to be paused
+        //   2) check if paused tasks need to be resumed
+        //   3) restore those updating tasks
+        //   4) checkpoint those updating task states
+        //   5) idle waiting if there is no more tasks to be restored
+        //
+        //   Note that, 1-3) are measured as restoring time, while 4) and 5) measured separately
+        //   as checkpointing time and idle time
         private void runOnce() throws InterruptedException {
+            final long totalStartTimeMs = time.milliseconds();
             performActionsOnTasks();
+
             resumeTasks();
-            restoreTasks();
-            checkAllUpdatingTaskStates(time.milliseconds());
+            pauseTasks();

Review Comment:
   This is related to 0) in the PR description.
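
   For concreteness, a minimal sketch of the phase split described in the new comment block above; all sensor and helper names below are assumed for illustration only:

   ```java
   // Steps 1)-3) (pause/resume checks plus restoration) count as restore time,
   // step 4) as checkpoint time, and step 5) (blocking wait) as idle time.
   private void runOnce() throws InterruptedException {
       final long startMs = time.milliseconds();
       performActionsOnTasks();
       resumeTasks();
       pauseTasks();
       restoreTasks();                                    // step 3)
       final long restoreEndMs = time.milliseconds();

       maybeCheckpointTasks(restoreEndMs);                // step 4)
       final long checkpointEndMs = time.milliseconds();

       waitIfNoTaskToRestore();                           // step 5), assumed helper
       final long idleEndMs = time.milliseconds();

       // restoreTimeSensor / checkpointTimeSensor / idleTimeSensor are assumed names
       restoreTimeSensor.record(restoreEndMs - startMs);
       checkpointTimeSensor.record(checkpointEndMs - restoreEndMs);
       idleTimeSensor.record(idleEndMs - checkpointEndMs);
   }
   ```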



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -423,49 +428,22 @@ public Set<TopicPartition> completedChangelogs() {
     // 2. if all changelogs have finished, return early;
     // 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
     @Override
-    public void restore(final Map<TaskId, Task> tasks) {
-
-        // If we are updating only standby tasks, and are not using a separate thread, we should
-        // use a non-blocking poll to unblock the processing as soon as possible.
-        final boolean useNonBlockingPoll = state == ChangelogReaderState.STANDBY_UPDATING && !stateUpdaterEnabled;
-
+    public long restore(final Map<TaskId, Task> tasks) {
         initializeChangelogs(tasks, registeredChangelogs());
 
         if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
             throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
         }
 
+        long totalRestored = 0L;
         if (allChangelogsCompleted()) {
             log.debug("Finished restoring all changelogs {}", changelogs.keySet());
-            return;
+            return totalRestored;
         }
 
         final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
         if (!restoringChangelogs.isEmpty()) {
-            final ConsumerRecords<byte[], byte[]> polledRecords;
-
-            try {
-                pauseResumePartitions(tasks, restoringChangelogs);
-
-                polledRecords = restoreConsumer.poll(useNonBlockingPoll ? Duration.ZERO : pollTime);
-
-                // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
-                // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?
-            } catch (final InvalidOffsetException e) {
-                log.warn("Encountered " + e.getClass().getName() +
-                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
-                    "the consumer's position has fallen out of the topic partition offset range because the topic was " +
-                    "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e);
-
-                final Set<TaskId> corruptedTasks = new HashSet<>();
-                e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
-                throw new TaskCorruptedException(corruptedTasks, e);
-            } catch (final InterruptException interruptException) {
-                throw interruptException;
-            } catch (final KafkaException e) {
-                throw new StreamsException("Restore consumer get unexpected error polling records.", e);
-            }
+            final ConsumerRecords<byte[], byte[]> polledRecords = pollRecordsFromRestoreConsumer(tasks, restoringChangelogs);

Review Comment:
   This is to avoid the checkstyle rule on function complexity, without much logical change (except letting `restoreChangelog` return the restored record count).
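
   For context, a rough sketch of the resulting shape (signatures here are assumptions for illustration; the key point is that `restore` now aggregates and returns a per-call restored-record count):

   ```java
   // restore() returns how many records were restored in this call by summing
   // per-changelog counts, so the caller can derive a restoration rate metric.
   @Override
   public long restore(final Map<TaskId, Task> tasks) {
       long totalRestored = 0L;
       // ... initialize changelogs and poll the restore consumer via the
       // extracted pollRecordsFromRestoreConsumer(...) helper ...
       for (final TopicPartition partition : restoringChangelogs()) {
           // restoreChangelog(...) is assumed to return the number of records
           // applied to the state store for this changelog partition
           totalRestored += restoreChangelog(changelogs.get(partition));
       }
       return totalRestored;
   }
   ```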



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -650,7 +664,7 @@ private boolean restoreChangelog(final ChangelogMetadata changelogMetadata) {
 
             final Long currentOffset = storeMetadata.offset();
             log.trace("Restored {} records from changelog {} to store {}, end offset is {}, current offset is {}",
-                partition, storeName, numRecords, recordEndOffset(changelogMetadata.restoreEndOffset), currentOffset);
+                numRecords, partition, storeName, recordEndOffset(changelogMetadata.restoreEndOffset), currentOffset);

Review Comment:
   Some minor logging fixes piggy-backed here.


