Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/08 17:58:31 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

wcarlson5 commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485098403



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", stateSnapshot);
+
+                // transit to restore active is idempotent so we can call it multiple times
+                changelogReader.enforceRestoreActive();
+
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+
+                    setState(State.RUNNING);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is {}", state);
+                }
+            }
+        }
+
+        log.debug("Invoking ChangeLogReader#restore");

Review comment:
       Is this necessary given the logs inside restore()?
   
   Maybe it could include stateSnapshot, so we can see whether the state is STARTING or RUNNING? Currently we don't see the state unless the thread enters the initialization block. Not sure if this would be useful.
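
   To make the suggestion concrete, here is a rough sketch of what logging the snapshot might look like. It is not the PR's code: `RestoreLogSketch`, the trimmed `State` enum, the `needsInitOrRestore` flag, and the in-memory `debugLog` list are hypothetical stand-ins for `StreamThread`, its state, `taskManager.needsInitializationOrRestoration()`, and the real logger. The only point is that the snapshot is taken outside the inner block, so the final message can report it even when initialization is skipped.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreLogSketch {
    // Hypothetical, trimmed stand-in for StreamThread.State.
    public enum State { STARTING, PARTITIONS_ASSIGNED, RUNNING }

    // Hypothetical stand-in for the real logger: collects debug messages in memory.
    public static final List<String> debugLog = new ArrayList<>();

    public static volatile State state = State.STARTING;

    public static void initializeAndRestorePhase(boolean needsInitOrRestore) {
        // Take the snapshot before the conditional so it stays in scope for the last log line.
        final State stateSnapshot = state;
        if (stateSnapshot == State.PARTITIONS_ASSIGNED
            || (stateSnapshot == State.RUNNING && needsInitOrRestore)) {
            debugLog.add("State is " + stateSnapshot + "; initializing and restoring");
        }
        // The state is now visible even if the initialization branch was not taken.
        debugLog.add("Invoking ChangeLogReader#restore; state is " + stateSnapshot);
    }

    public static void main(String[] args) {
        initializeAndRestorePhase(false);
        debugLog.forEach(System.out::println);
    }
}
```

   With the state left at STARTING, only the final message is recorded, and it carries the state that would otherwise be invisible.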

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -706,13 +662,17 @@ void runOnce() {
                     totalProcessed += processed;
                 }
 
+                log.debug("TaskManager#process handled {} records; invoking TaskManager#punctuate", processed);
+
                 final int punctuated = taskManager.punctuate();
                 final long punctuateLatency = advanceNowAndComputeLatency();
                 totalPunctuateLatency += punctuateLatency;
                 if (punctuated > 0) {
                     punctuateSensor.record(punctuateLatency / (double) punctuated, now);
                 }
 
+                log.debug("TaskManager#punctuate executed: {}", punctuated);

Review comment:
       Should this be at trace level instead?
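
   As an illustration of the effect of moving this message to a finer level, here is a small dependency-free sketch. It assumes `java.util.logging` as a stand-in for the logger used in `StreamThread`, with `Level.FINE` playing the role of debug and `Level.FINEST` the role of trace; `PunctuateLogLevelSketch`, `newLogger`, `logPunctuate`, and the `emitted` list are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class PunctuateLogLevelSketch {
    // Collects every record that passes the logger's level check.
    public static final List<String> emitted = new ArrayList<>();

    // Builds an isolated logger whose threshold is the given level.
    public static Logger newLogger(Level threshold) {
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false);
        logger.setLevel(threshold);
        Handler handler = new Handler() {
            @Override public void publish(LogRecord record) {
                emitted.add(record.getLevel() + ": " + record.getMessage());
            }
            @Override public void flush() { }
            @Override public void close() { }
        };
        handler.setLevel(Level.ALL);
        logger.addHandler(handler);
        return logger;
    }

    // Logs the punctuate count at FINEST (the assumed analogue of trace).
    public static void logPunctuate(Logger logger, int punctuated) {
        logger.log(Level.FINEST, "TaskManager#punctuate executed: " + punctuated);
    }

    public static void main(String[] args) {
        logPunctuate(newLogger(Level.FINE), 3);   // suppressed at a debug-like threshold
        logPunctuate(newLogger(Level.FINEST), 3); // emitted at a trace-like threshold
        emitted.forEach(System.out::println);
    }
}
```

   Under these assumptions, a deployment running at the debug-like threshold would no longer see this message on each pass, while a trace-like threshold would still get it.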



