Posted to jira@kafka.apache.org by "guozhangwang (via GitHub)" <gi...@apache.org> on 2023/02/21 23:42:05 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

guozhangwang commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1113677550


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -109,17 +136,41 @@ public void run() {
                 Thread.interrupted(); // Clear the interrupted flag.
                 removeAddedTasksFromInputQueue();
                 removeUpdatingAndPausedTasks();
+                updaterMetrics.clear();
                 shutdownGate.countDown();
                 log.info("State updater thread shutdown");
             }
         }
 
+        // In each iteration:
+        //   1) check if updating tasks need to be paused
+        //   2) check if paused tasks need to be resumed
+        //   3) restore those updating tasks
+        //   4) checkpoint those updating task states
+        //   5) idle waiting if there is no more tasks to be restored
+        //
+        //   Note that, 1-3) are measured as restoring time, while 4) and 5) measured separately
+        //   as checkpointing time and idle time
         private void runOnce() throws InterruptedException {
+            final long totalStartTimeMs = time.milliseconds();
             performActionsOnTasks();
+
             resumeTasks();
-            restoreTasks();
-            checkAllUpdatingTaskStates(time.milliseconds());
+            pauseTasks();

Review Comment:
   This is related to item 0) in the PR description.
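
As an illustration only, the timing breakdown described in the code comment above (steps 1-3 counted as restoring time, step 4 as checkpointing time, step 5 as idle time) could be sketched roughly as follows. This is not the code from the PR: the class `PhaseTimingSketch`, its fields, and the injected clock are hypothetical names chosen for the example.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: none of these names come from DefaultStateUpdater.
final class PhaseTimingSketch {
    long restoreMs;     // accumulated time for steps 1-3 (pause, resume, restore)
    long checkpointMs;  // accumulated time for step 4 (checkpoint)
    long idleMs;        // accumulated time for step 5 (idle waiting)

    private final LongSupplier clock; // returns the current time in milliseconds

    PhaseTimingSketch(final LongSupplier clock) {
        this.clock = clock;
    }

    // Runs one iteration and attributes the elapsed time to the three buckets.
    void runOnce(final Runnable pauseResumeRestore, final Runnable checkpoint, final Runnable idle) {
        final long start = clock.getAsLong();
        pauseResumeRestore.run();
        final long afterRestore = clock.getAsLong();
        checkpoint.run();
        final long afterCheckpoint = clock.getAsLong();
        idle.run();
        final long end = clock.getAsLong();

        restoreMs += afterRestore - start;
        checkpointMs += afterCheckpoint - afterRestore;
        idleMs += end - afterCheckpoint;
    }
}
```

Injecting the clock keeps the sketch testable with a fake time source; a real caller would presumably pass something like `System::currentTimeMillis`.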



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -423,49 +428,22 @@ public Set<TopicPartition> completedChangelogs() {
     // 2. if all changelogs have finished, return early;
     // 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
     @Override
-    public void restore(final Map<TaskId, Task> tasks) {
-
-        // If we are updating only standby tasks, and are not using a separate thread, we should
-        // use a non-blocking poll to unblock the processing as soon as possible.
-        final boolean useNonBlockingPoll = state == ChangelogReaderState.STANDBY_UPDATING && !stateUpdaterEnabled;
-
+    public long restore(final Map<TaskId, Task> tasks) {
         initializeChangelogs(tasks, registeredChangelogs());
 
         if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
             throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
         }
 
+        long totalRestored = 0L;
         if (allChangelogsCompleted()) {
             log.debug("Finished restoring all changelogs {}", changelogs.keySet());
-            return;
+            return totalRestored;
         }
 
         final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
         if (!restoringChangelogs.isEmpty()) {
-            final ConsumerRecords<byte[], byte[]> polledRecords;
-
-            try {
-                pauseResumePartitions(tasks, restoringChangelogs);
-
-                polledRecords = restoreConsumer.poll(useNonBlockingPoll ? Duration.ZERO : pollTime);
-
-                // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
-                // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?
-            } catch (final InvalidOffsetException e) {
-                log.warn("Encountered " + e.getClass().getName() +
-                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
-                    "the consumer's position has fallen out of the topic partition offset range because the topic was " +
-                    "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e);
-
-                final Set<TaskId> corruptedTasks = new HashSet<>();
-                e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
-                throw new TaskCorruptedException(corruptedTasks, e);
-            } catch (final InterruptException interruptException) {
-                throw interruptException;
-            } catch (final KafkaException e) {
-                throw new StreamsException("Restore consumer get unexpected error polling records.", e);
-            }
+            final ConsumerRecords<byte[], byte[]> polledRecords = pollRecordsFromRestoreConsumer(tasks, restoringChangelogs);

Review Comment:
   This extraction is to satisfy the checkstyle rule on function complexity; there is little logical change, except that `restoreChangelog` now returns the restored record count.
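
For readers unfamiliar with this kind of refactoring, here is a minimal, self-contained sketch of the general shape: the poll and its exception translation move into a helper, and the caller returns a count instead of `void`. It does not use the Kafka classes; `ExtractPollSketch`, `RestoreException`, and the `Supplier`-based consumer are stand-ins invented for the example.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in types; this is not StoreChangelogReader.
final class ExtractPollSketch {

    static final class RestoreException extends RuntimeException {
        RestoreException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Extracted helper: owns the try/catch so the caller stays simple.
    static List<String> pollRecords(final Supplier<List<String>> consumer) {
        try {
            return consumer.get();
        } catch (final IllegalStateException e) {
            // Translate a low-level failure into a domain-specific exception.
            throw new RestoreException("Unexpected error polling records.", e);
        }
    }

    // Caller: returns the number of restored records rather than void.
    static long restore(final Supplier<List<String>> consumer, final boolean allCompleted) {
        long totalRestored = 0L;
        if (allCompleted) {
            return totalRestored; // nothing left to restore
        }
        final List<String> polled = pollRecords(consumer);
        totalRestored += polled.size();
        return totalRestored;
    }
}
```

Moving the branches of the `try`/`catch` into the helper is what lowers the complexity measured for the calling method.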



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -650,7 +664,7 @@ private boolean restoreChangelog(final ChangelogMetadata changelogMetadata) {
 
             final Long currentOffset = storeMetadata.offset();
             log.trace("Restored {} records from changelog {} to store {}, end offset is {}, current offset is {}",
-                partition, storeName, numRecords, recordEndOffset(changelogMetadata.restoreEndOffset), currentOffset);
+                numRecords, partition, storeName, recordEndOffset(changelogMetadata.restoreEndOffset), currentOffset);

Review Comment:
   Some minor logging fixes piggy-backed here.
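
The fix in this hunk reorders the arguments so that they line up with the `{}` placeholders, which are filled positionally. The sketch below uses a small hand-written formatter as a stand-in for that positional substitution (it is not the SLF4J implementation, and `PlaceholderOrderSketch` is a hypothetical name) to show how a wrong argument order produces a misleading message.

```java
// Hypothetical stand-in for positional "{}" substitution; not the logging library itself.
final class PlaceholderOrderSketch {

    // Replaces each "{}" in the template with the next argument, in order.
    static String format(final String template, final Object... args) {
        final StringBuilder out = new StringBuilder();
        int from = 0;
        for (final Object arg : args) {
            final int at = template.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders: ignore the extras
            }
            out.append(template, from, at).append(arg);
            from = at + 2;
        }
        // Append whatever follows the last substituted placeholder.
        return out.append(template.substring(from)).toString();
    }
}
```

With the template `"Restored {} records from changelog {} to store {}"`, passing the partition first puts it where the record count belongs; passing the count first yields the intended message.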


