Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/08 17:13:35 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485062295



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -612,63 +612,18 @@ void runOnce() {
         final long startMs = time.milliseconds();
         now = startMs;
 
-        if (state == State.PARTITIONS_ASSIGNED) {
-            // try to fetch some records with zero poll millis
-            // to unblock the restoration as soon as possible
-            records = pollRequests(Duration.ZERO);
-        } else if (state == State.PARTITIONS_REVOKED) {
-            // try to fetch som records with zero poll millis to unblock
-            // other useful work while waiting for the join response
-            records = pollRequests(Duration.ZERO);
-        } else if (state == State.RUNNING || state == State.STARTING) {
-            // try to fetch some records with normal poll time
-            // in order to get long polling
-            records = pollRequests(pollTime);
-        } else if (state == State.PENDING_SHUTDOWN) {
-            // we are only here because there's rebalance in progress,
-            // just poll with zero to complete it
-            records = pollRequests(Duration.ZERO);
-        } else {
-            // any other state should not happen
-            log.error("Unexpected state {} during normal iteration", state);
-            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
-        }
-
-        final long pollLatency = advanceNowAndComputeLatency();
-
-        pollSensor.record(pollLatency, now);
-        if (records != null && !records.isEmpty()) {
-            pollRecordsSensor.record(records.count(), now);
-            taskManager.addRecordsToTasks(records);
-        }
+        final long pollLatency = pollPhase();

Review comment:
       `runOnce` was too long according to Checkstyle, so I factored some of its execution phases out into separate methods.
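A minimal sketch of the shape of this refactoring, assuming a drastically simplified thread: `PhasedLoop`, its `phases` list, and the `running` flag are hypothetical stand-ins, and only the method names `runOnce`, `pollPhase`, and `initializeAndRestorePhase` come from the diff. Each phase becomes a small private method, so `runOnce` itself reads as a short sequence of steps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified loop; only the shape of the refactoring mirrors the diff.
class PhasedLoop {
    // Records which phases ran, in order, so the control flow is observable.
    final List<String> phases = new ArrayList<>();
    volatile boolean running = true;

    void runOnce() {
        final long pollLatency = pollPhase();
        // Mirrors the isRunning() check that follows the poll in the diff.
        if (!running) {
            return;
        }
        initializeAndRestorePhase();
        phases.add("process (poll took " + pollLatency + " ms)");
    }

    // Extracted phase: polls and returns its own latency.
    private long pollPhase() {
        final long startMs = System.currentTimeMillis();
        phases.add("poll");
        return System.currentTimeMillis() - startMs;
    }

    // Extracted phase: task initialization and changelog restoration.
    private void initializeAndRestorePhase() {
        phases.add("initializeAndRestore");
    }
}
```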

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -612,63 +612,18 @@ void runOnce() {
         final long startMs = time.milliseconds();
         now = startMs;
 
-        if (state == State.PARTITIONS_ASSIGNED) {
-            // try to fetch some records with zero poll millis
-            // to unblock the restoration as soon as possible
-            records = pollRequests(Duration.ZERO);
-        } else if (state == State.PARTITIONS_REVOKED) {
-            // try to fetch som records with zero poll millis to unblock
-            // other useful work while waiting for the join response
-            records = pollRequests(Duration.ZERO);
-        } else if (state == State.RUNNING || state == State.STARTING) {
-            // try to fetch some records with normal poll time
-            // in order to get long polling
-            records = pollRequests(pollTime);
-        } else if (state == State.PENDING_SHUTDOWN) {
-            // we are only here because there's rebalance in progress,
-            // just poll with zero to complete it
-            records = pollRequests(Duration.ZERO);
-        } else {
-            // any other state should not happen
-            log.error("Unexpected state {} during normal iteration", state);
-            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
-        }
-
-        final long pollLatency = advanceNowAndComputeLatency();
-
-        pollSensor.record(pollLatency, now);
-        if (records != null && !records.isEmpty()) {
-            pollRecordsSensor.record(records.count(), now);
-            taskManager.addRecordsToTasks(records);
-        }
+        final long pollLatency = pollPhase();
 
         // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
         // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
         // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
         // could affect the task manager state beyond this point within #runOnce().
         if (!isRunning()) {
-            log.debug("State already transits to {}, skipping the run once call after poll request", state);
+            log.debug("Thread state is already {}, skipping the run once call after poll request", state);

Review comment:
       Just a slight rewording that I thought was clearer.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", stateSnapshot);
+
+                // transit to restore active is idempotent so we can call it multiple times
+                changelogReader.enforceRestoreActive();
+
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+
+                    setState(State.RUNNING);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is {}", state);
+                }

Review comment:
       This time we really do need to read the volatile `state` field, since `setState` may have just changed it, so I'm guarding the call with `isDebugEnabled()` for the common case in which debug logging is off.
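To illustrate, here is a sketch using `java.util.logging` as a dependency-free stand-in for the project's logger (`isLoggable(Level.FINE)` plays the role of `isDebugEnabled()`); `GuardedDebugLog`, `currentState()`, and the `argumentReads` counter are hypothetical. Because the field is written just before the log call, an earlier snapshot would be stale, so the guard is what keeps the read off the path when debug is disabled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class GuardedDebugLog {
    enum State { PARTITIONS_ASSIGNED, RUNNING }

    private static final Logger LOG = Logger.getLogger(GuardedDebugLog.class.getName());

    private volatile State state = State.PARTITIONS_ASSIGNED;
    // Number of times the log argument was actually computed (reads of the volatile field).
    int argumentReads = 0;

    // FINE stands in for DEBUG; INFO means "debug is off".
    static void setDebug(final boolean enabled) {
        LOG.setLevel(enabled ? Level.FINE : Level.INFO);
    }

    private State currentState() {
        argumentReads++;
        return state;
    }

    void completeRestoration() {
        state = State.RUNNING; // stands in for setState(State.RUNNING)
        if (LOG.isLoggable(Level.FINE)) {
            // The field must be read again here, because it was just changed above.
            LOG.log(Level.FINE, "Initialization and restore call done. State is {0}", currentState());
        }
    }
}
```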

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", stateSnapshot);
+
+                // transit to restore active is idempotent so we can call it multiple times
+                changelogReader.enforceRestoreActive();
+
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+
+                    setState(State.RUNNING);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is {}", state);
+                }
+            }
+        }
+
+        log.debug("Invoking ChangeLogReader#restore");
+        // we can always let changelog reader try restoring in order to initialize the changelogs;
+        // if there's no active restoring or standby updating it would not try to fetch any data
+        changelogReader.restore();
+    }
+
+    private long pollPhase() {
+        final ConsumerRecords<byte[], byte[]> records;
+        log.debug("Invoking Consumer#poll");
+
+        if (state == State.PARTITIONS_ASSIGNED) {
+            // try to fetch some records with zero poll millis
+            // to unblock the restoration as soon as possible
+            records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch som records with zero poll millis to unblock
+            // other useful work while waiting for the join response
+            records = pollRequests(Duration.ZERO);
+        } else if (state == State.RUNNING || state == State.STARTING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else if (state == State.PENDING_SHUTDOWN) {
+            // we are only here because there's rebalance in progress,
+            // just poll with zero to complete it
+            records = pollRequests(Duration.ZERO);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
+        }
+
+        final long pollLatency = advanceNowAndComputeLatency();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Consumer#poll completed in {} ms and fetched {} records", pollLatency, records.count());
+        }
+        pollSensor.record(pollLatency, now);
+
+        if (!records.isEmpty()) {

Review comment:
       Note that this was previously `records != null && !records.isEmpty()`:
   https://github.com/apache/kafka/pull/9267/files#diff-045aeaddb4232a85a8560186b4901e69L640
   
   However, `records` could only be `null` if `Consumer#poll` returned `null`, which it never does. It turned out the only reason for the null check was a single test that relied on a nice mock returning `null`. I fixed that test below.
   
   Note that the only reason I touched this was to simplify the debug log message on L775. Otherwise, I'd have needed to decide what to log when `records` was `null`.
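A sketch of the simplified post-poll handling under the never-`null` contract, assuming a `List<String>` as a stand-in for `ConsumerRecords` and a hypothetical `buffered` list in place of `taskManager.addRecordsToTasks`. With no `null` branch, the debug message can report the count unconditionally.

```java
import java.util.ArrayList;
import java.util.List;

class PostPollHandling {
    // Hypothetical sink standing in for taskManager.addRecordsToTasks(records).
    final List<String> buffered = new ArrayList<>();
    // Captured debug messages, standing in for log.debug output.
    final List<String> debugLines = new ArrayList<>();

    // Assumes the poll result is never null: an empty batch is returned instead.
    void handle(final List<String> records, final long pollLatency) {
        // No null branch is needed, so size() is always safe to call.
        debugLines.add("Consumer#poll completed in " + pollLatency
                + " ms and fetched " + records.size() + " records");
        if (!records.isEmpty()) {
            buffered.addAll(records);
        }
    }
}
```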

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", stateSnapshot);
+
+                // transit to restore active is idempotent so we can call it multiple times
+                changelogReader.enforceRestoreActive();
+
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+
+                    setState(State.RUNNING);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is {}", state);
+                }
+            }
+        }
+
+        log.debug("Invoking ChangeLogReader#restore");
+        // we can always let changelog reader try restoring in order to initialize the changelogs;
+        // if there's no active restoring or standby updating it would not try to fetch any data
+        changelogReader.restore();
+    }
+
+    private long pollPhase() {
+        final ConsumerRecords<byte[], byte[]> records;
+        log.debug("Invoking Consumer#poll");
+
+        if (state == State.PARTITIONS_ASSIGNED) {
+            // try to fetch some records with zero poll millis
+            // to unblock the restoration as soon as possible
+            records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch som records with zero poll millis to unblock
+            // other useful work while waiting for the join response
+            records = pollRequests(Duration.ZERO);
+        } else if (state == State.RUNNING || state == State.STARTING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else if (state == State.PENDING_SHUTDOWN) {
+            // we are only here because there's rebalance in progress,
+            // just poll with zero to complete it
+            records = pollRequests(Duration.ZERO);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
+        }
+
+        final long pollLatency = advanceNowAndComputeLatency();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Consumer#poll completed in {} ms and fetched {} records", pollLatency, records.count());
+        }

Review comment:
       Guarded to avoid calling `records.count()` when debug is off.
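`ConsumerRecords#count()` sums the sizes of the per-partition record lists, so it is not free. The sketch below, again with `java.util.logging` standing in for the real logger and a hypothetical `Batch` type, shows the explicit guard from the diff next to the lazy `Supplier<String>` overload that `java.util.logging.Logger` offers, which also skips the work when the level is off. The lazy variant is a swapped-in alternative, not what this PR does.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;

class CountGuard {
    private static final Logger LOG = Logger.getLogger(CountGuard.class.getName());

    // Hypothetical batch grouped by partition; count() walks every partition.
    static final class Batch {
        final Map<Integer, List<String>> byPartition = new TreeMap<>();
        int countCalls = 0;

        int count() {
            countCalls++;
            int total = 0;
            for (final List<String> records : byPartition.values()) {
                total += records.size();
            }
            return total;
        }
    }

    // FINE stands in for DEBUG.
    static void setDebug(final boolean enabled) {
        LOG.setLevel(enabled ? Level.FINE : Level.INFO);
    }

    // Explicit guard, as in the diff: count() runs only if FINE is enabled.
    static void logGuarded(final Batch batch, final long latencyMs) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Consumer#poll completed in " + latencyMs + " ms and fetched " + batch.count() + " records");
        }
    }

    // Lazy alternative: the Supplier (and thus count()) is invoked only if FINE is enabled.
    static void logLazily(final Batch batch, final long latencyMs) {
        LOG.fine(() -> "Consumer#poll completed in " + latencyMs + " ms and fetched " + batch.count() + " records");
    }
}
```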

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -845,8 +876,8 @@ private void addToResetList(final TopicPartition partition, final Set<TopicParti
     int maybeCommit() {
         final int committed;
         if (now - lastCommitMs > commitTimeMs) {
-            if (log.isTraceEnabled()) {
-                log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)",
+            if (log.isDebugEnabled()) {
+                log.debug("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)",

Review comment:
       Switched these from trace to debug, since they fit with the newly added debug logs.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", stateSnapshot);

Review comment:
       I wanted all my new `debug` statements to be zero-cost when debug isn't enabled. Since `state` is volatile, resolving the arguments for this call would force a volatile read of the field regardless of the log level. Because we don't expect the state to change within these few lines, I read it once and saved it in `stateSnapshot`.
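A sketch of the snapshot idea, assuming a hypothetical `readState()` accessor that counts reads of the volatile field, and a no-op `debug(format, arg)` standing in for a parameterized logger call with debug disabled. Even when the logger drops the message, the argument expression has already been evaluated, so the version without a snapshot reads the field twice. The condition is simplified to the `PARTITIONS_ASSIGNED` case.

```java
class SnapshotRead {
    enum State { PARTITIONS_ASSIGNED, RUNNING }

    private volatile State state = State.PARTITIONS_ASSIGNED;
    // Counts reads of the volatile field made through readState(), for illustration only.
    int volatileReads = 0;

    private State readState() {
        volatileReads++;
        return state;
    }

    // No-op stand-in for log.debug(format, arg) with debug disabled:
    // the argument has already been evaluated by the time this method is entered.
    private void debug(final String format, final Object arg) {
    }

    // Reads the field twice: once for the check and once more for the log argument.
    boolean withoutSnapshot() {
        if (readState() == State.PARTITIONS_ASSIGNED) {
            debug("State is {}; initializing and restoring", readState());
            return true;
        }
        return false;
    }

    // Reads the field once and reuses the local copy for both the check and the log argument.
    boolean withSnapshot() {
        final State stateSnapshot = readState();
        if (stateSnapshot == State.PARTITIONS_ASSIGNED) {
            debug("State is {}; initializing and restoring", stateSnapshot);
            return true;
        }
        return false;
    }
}
```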

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -760,7 +791,7 @@ void runOnce() {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
-        ConsumerRecords<byte[], byte[]> records = null;
+        ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();

Review comment:
       Not strictly necessary, but also not harmful, since `ConsumerRecords.empty()` returns a shared static final instance anyway. I thought it was more self-documenting this way.
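A sketch of the shared-empty-instance pattern behind `ConsumerRecords.empty()`, using a hypothetical `Batch<T>` type rather than the Kafka class: one immutable `EMPTY` instance is handed out for every type parameter through an unchecked cast, which is safe because it holds no elements. Initializing a local with `Batch.empty()` then costs no allocation.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a ConsumerRecords-like batch type.
final class Batch<T> {
    // Single shared, immutable empty instance.
    private static final Batch<Object> EMPTY = new Batch<>(Collections.emptyList());

    private final List<T> items;

    Batch(final List<T> items) {
        this.items = items;
    }

    @SuppressWarnings("unchecked")
    static <T> Batch<T> empty() {
        // Safe: EMPTY contains no elements, so no element of the wrong type can ever be read.
        return (Batch<T>) EMPTY;
    }

    boolean isEmpty() {
        return items.isEmpty();
    }

    int count() {
        return items.size();
    }
}
```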

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -496,6 +498,7 @@ public void shouldEnforceRebalanceAfterNextScheduledProbingRebalanceTime() throw
         );
         
         final Consumer<byte[], byte[]> mockConsumer = EasyMock.createNiceMock(Consumer.class);
+        expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty());

Review comment:
       A nice mock returns `null` for any call without an expectation, which was a bad input for this test, since `poll` can never actually return `null`.
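To show why the test saw `null`, here is a dependency-free imitation of a nice mock built with `java.lang.reflect.Proxy`; it is not EasyMock, and the `Poller` interface is a hypothetical stand-in for `Consumer#poll`. Unstubbed, `poll` returns `null`; with a stub (the role `andStubReturn(ConsumerRecords.empty())` plays in the diff), it returns an empty result.

```java
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical interface standing in for Consumer#poll.
interface Poller {
    List<String> poll(long timeoutMs);
}

class NiceMockDemo {
    // Returns a proxy whose poll() yields stubbedPollResult (null if no stub is given),
    // mimicking a nice mock's default of null for object return types.
    // Object methods (equals, hashCode, toString) are not handled in this sketch.
    static Poller niceMock(final List<String> stubbedPollResult) {
        return (Poller) Proxy.newProxyInstance(
                Poller.class.getClassLoader(),
                new Class<?>[] {Poller.class},
                (proxy, method, args) -> "poll".equals(method.getName()) ? stubbedPollResult : null);
    }
}
```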




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org