Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/08 19:19:25 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

guozhangwang commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485132348



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -706,13 +662,17 @@ void runOnce() {
                     totalProcessed += processed;
                 }
 
+                log.debug("TaskManager#process handled {} records; invoking TaskManager#punctuate", processed);

Review comment:
       Nit: I'd suggest we do not expose internal class names in log entries. For example, here we could say "Processed {} records with {} iterations, invoking punctuation now". Ditto below.
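To make the suggestion concrete, here is a minimal sketch of the reworded entry. It is only an illustration under stated assumptions: StreamThread logs through SLF4J, while this sketch uses java.util.logging so that it runs with the JDK alone, with FINE standing in for debug. The class `PunctuationLogSketch` and its helpers `CapturingHandler`, `newDebugLogger`, and `logProcessed` are hypothetical names that do not exist in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical stand-in: StreamThread itself logs through SLF4J, not java.util.logging.
final class PunctuationLogSketch {

    // Keeps each emitted message in memory so the wording can be inspected.
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            messages.add(record.getMessage());
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    // Builds a logger at FINE (roughly SLF4J debug) that writes only to the given handler.
    static Logger newDebugLogger(Handler handler) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false);
        log.setLevel(Level.FINE);
        log.addHandler(handler);
        return log;
    }

    // Describes the work that was done rather than the internal classes that did it.
    static void logProcessed(Logger log, long processed, int numIterations) {
        log.fine(() -> "Processed " + processed + " records with " + numIterations
            + " iterations, invoking punctuation now");
    }

    public static void main(String[] args) {
        CapturingHandler handler = new CapturingHandler();
        logProcessed(newDebugLogger(handler), 5, 2);
        handler.messages.forEach(System.out::println);
    }
}
```

The entry stays meaningful to a user reading the logs even if the internal classes are later renamed or refactored.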

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -689,6 +644,7 @@ void runOnce() {
              *  6. Otherwise, increment N.
              */
             do {
+                log.debug("Invoking TaskManager#process with {} iterations.", numIterations);

Review comment:
       What's the rationale for logging both the start and the end of a procedure? If it is for troubleshooting purposes only, maybe the starting log entry can be trace while the ending entry is debug?
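A rough sketch of the trace/debug split proposed above, again assuming java.util.logging as a JDK-only stand-in for SLF4J (FINER approximating trace, FINE approximating debug). The class `StartEndLogSketch`, its helpers, and the placeholder "work" inside `processWithLogging` are hypothetical and not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical stand-in: FINER approximates SLF4J trace, FINE approximates debug.
final class StartEndLogSketch {

    // Records "LEVEL: message" for every entry the logger lets through.
    static final class CapturingHandler extends Handler {
        final List<String> entries = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            entries.add(record.getLevel().getName() + ": " + record.getMessage());
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    // Builds a logger at the requested level that writes only to the given handler.
    static Logger newLogger(Handler handler, Level level) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false);
        log.setLevel(level);
        log.addHandler(handler);
        return log;
    }

    // Start entry at the trace-like level, end entry at the debug-like level.
    static int processWithLogging(Logger log, int numIterations) {
        log.finer(() -> "Starting processing with " + numIterations + " iterations");
        final int processed = numIterations * 10; // placeholder for the real work
        log.fine(() -> "Processed " + processed + " records with " + numIterations + " iterations");
        return processed;
    }

    public static void main(String[] args) {
        CapturingHandler handler = new CapturingHandler();
        processWithLogging(newLogger(handler, Level.FINE), 2);
        handler.entries.forEach(System.out::println);
    }
}
```

With the logger at the debug-like level only the ending entry is emitted, so the starting entry costs nothing unless someone opts into the finer level while troubleshooting.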

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITIONS_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", stateSnapshot);
+
+                // transit to restore active is idempotent so we can call it multiple times
+                changelogReader.enforceRestoreActive();
+
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+
+                    setState(State.RUNNING);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is {}", state);
+                }
+            }
+        }
+
+        log.debug("Invoking ChangeLogReader#restore");
+        // we can always let changelog reader try restoring in order to initialize the changelogs;
+        // if there's no active restoring or standby updating it would not try to fetch any data
+        changelogReader.restore();
+    }
+
+    private long pollPhase() {
+        final ConsumerRecords<byte[], byte[]> records;
+        log.debug("Invoking Consumer#poll");
+
+        if (state == State.PARTITIONS_ASSIGNED) {
+            // try to fetch some records with zero poll millis
+            // to unblock the restoration as soon as possible
+            records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch some records with zero poll millis to unblock
+            // other useful work while waiting for the join response
+            records = pollRequests(Duration.ZERO);
+        } else if (state == State.RUNNING || state == State.STARTING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else if (state == State.PENDING_SHUTDOWN) {
+            // we are only here because there's rebalance in progress,
+            // just poll with zero to complete it
+            records = pollRequests(Duration.ZERO);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
+        }
+
+        final long pollLatency = advanceNowAndComputeLatency();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Consumer#poll completed in {} ms and fetched {} records", pollLatency, records.count());
+        }
+        pollSensor.record(pollLatency, now);
+
+        if (!records.isEmpty()) {

Review comment:
       SG.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITIONS_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", stateSnapshot);
+
+                // transit to restore active is idempotent so we can call it multiple times
+                changelogReader.enforceRestoreActive();
+
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+
+                    setState(State.RUNNING);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is {}", state);
+                }
+            }
+        }
+
+        log.debug("Invoking ChangeLogReader#restore");

Review comment:
       Not sure what the purpose of this log entry is.



