Posted to jira@kafka.apache.org by "lucasbru (via GitHub)" <gi...@apache.org> on 2023/02/22 14:02:49 UTC

[GitHub] [kafka] lucasbru commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

lucasbru commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1114354933


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -151,9 +202,18 @@ private void resumeTasks() {
             }
         }
 
-        private void restoreTasks() {
+        private void pauseTasks() {
+            for (final Task task : updatingTasks.values()) {

Review Comment:
   Not sure, but is there a performance concern with running this loop on every single iteration of `runOnce`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -598,6 +685,12 @@ public Set<StandbyTask> getUpdatingStandbyTasks() {
             : Collections.emptySet();
     }
 
+    public Set<StreamTask> getUpdatingActiveTasks() {
+        return stateUpdaterThread != null

Review Comment:
   As I understand it, this function will be called quite frequently to export metrics, and we only need the size of the collection. It could make sense to avoid the allocations here and just implement a `getNumberOfUpdatingActiveTasks` as a non-essential but free optimization. The same applies to `getPausedStandbyTasks` and so on.
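
   A rough sketch of the idea, using a hypothetical stand-in class rather than the real `DefaultStateUpdater`. The class name `UpdatingTaskCounter`, the `add` method, and the id-to-flag map are assumptions made for illustration; only the two getter names come from the discussion above. The counting variant walks the same collection but does not build an intermediate set.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the state updater; NOT the actual Kafka class.
public class UpdatingTaskCounter {
    // Assumption: maps a task id to true for an active task, false for a standby task.
    private final Map<String, Boolean> updatingTasks = new ConcurrentHashMap<>();

    public void add(final String taskId, final boolean active) {
        updatingTasks.put(taskId, active);
    }

    // Allocating variant: builds a new set on every call, even if the
    // caller only wants its size.
    public Set<String> getUpdatingActiveTasks() {
        final Set<String> result = new HashSet<>();
        for (final Map.Entry<String, Boolean> entry : updatingTasks.entrySet()) {
            if (entry.getValue()) {
                result.add(entry.getKey());
            }
        }
        return Collections.unmodifiableSet(result);
    }

    // Counting variant: same traversal, but no intermediate collection.
    public int getNumberOfUpdatingActiveTasks() {
        int count = 0;
        for (final boolean active : updatingTasks.values()) {
            if (active) {
                count++;
            }
        }
        return count;
    }
}
```

   A metrics gauge that only reports a count could then call `getNumberOfUpdatingActiveTasks()` directly instead of `getUpdatingActiveTasks().size()`.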



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -399,31 +459,56 @@ private void addToRestoredTasks(final StreamTask task) {
             }
         }
 
-        private void checkAllUpdatingTaskStates(final long now) {
+        private void maybeCheckpointTasks(final long now) {
             final long elapsedMsSinceLastCommit = now - lastCommitMs;
             if (elapsedMsSinceLastCommit > commitIntervalMs) {
                 if (log.isDebugEnabled()) {
                     log.debug("Checking all restoring task states since {}ms has elapsed (commit interval is {}ms)",

Review Comment:
   Please update the log message as well. This function isn't really checking task states anymore.


