Posted to jira@kafka.apache.org by "guozhangwang (via GitHub)" <gi...@apache.org> on 2023/04/06 19:01:42 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #13523: MINOR: Follow-up on failing streams test

guozhangwang opened a new pull request, #13523:
URL: https://github.com/apache/kafka/pull/13523

   1) I've verified that the only case in which `task` would be null or not a stream task is in testing code. I've revamped the restoration recording function so that there is just one method on the `Task` interface, which ensures we never get `task == null` and do not need to cast to `StreamTask`.
   2) Use `numRecords` directly to avoid calling `records.size()` again, which triggers concurrent modifications.
   3) Rewrite the TaskMetricsTest to not use the removed impl functions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1162102863


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -880,6 +896,9 @@ private void initializeChangelogs(final Map<TaskId, Task> tasks,
     }
 
     private void addChangelogsToRestoreConsumer(final Set<TopicPartition> partitions) {
+        if (partitions.isEmpty())

Review Comment:
   Yes, that's right, but whenever `partitions.isEmpty()` we can actually skip the whole function, so I applied the check at a larger scope.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1162107892


##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -188,12 +188,11 @@ public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnab
         kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
         kafkaStreams.pause();
         kafkaStreams.start();
-        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+        waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
         assertTrue(kafkaStreams.isPaused());
 
         produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
-        waitUntilStreamsHasPolled(kafkaStreams, 2);

Review Comment:
   Sorry my bad, it should still be there, will add back.





[GitHub] [kafka] guozhangwang commented on pull request #13523: [DO NOT MERGE] MINOR: Follow-up on failing streams test

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1500663315

   Thanks @lihaosky  @mjsax , I've refactored the PR to go with option 2), primarily:
   
   * Make sure that, with the state updater enabled, the main thread is also blocked from transitioning to RUNNING while restoring active tasks are paused.
   * Modify the `PauseResumeIntegrationTest` to reflect the option 2).




[GitHub] [kafka] guozhangwang commented on pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1503820002

   @mjsax could you take another look?




[GitHub] [kafka] lucasbru commented on pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "lucasbru (via GitHub)" <gi...@apache.org>.
lucasbru commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1503216778

   I agree that it makes sense to stick with behavior 2 for the time being, and make a change later on, if it simplifies the code now. But long-term, I wouldn't be super happy with this situation. In the end, the instance state's most important job is to inform the user, and I would find it confusing that the instance can be "stuck in REBALANCING" indefinitely because of a paused task. Ideally, rebalancing is a temporary situation, and if everything goes well we should leave that state as soon as possible. Not 100% sure if Kafka Streams users feel the same way, but my guess would be that transitioning back to REBALANCING once the task is unpaused would feel cleaner than being stuck in REBALANCING.




[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160996173


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##########
@@ -96,7 +96,10 @@ public boolean isActive() {
     }
 
     @Override
-    public void maybeRecordRestored(final Time time, final long numRecords) {
+    public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) {
+        if (initRemaining) {
+            throw new IllegalStateException("Stanby task would not record remaining records to restore");
+

Review Comment:
   ```suggestion
           }
   
   ```





[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160997049


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -456,15 +456,21 @@ public long restore(final Map<TaskId, Task> tasks) {
                 // TODO: we always try to restore as a batch when some records are accumulated, which may result in
                 //       small batches; this can be optimized in the future, e.g. wait longer for larger batches.
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
-                try {
-                    final Task task = tasks.get(taskId);
-                    final ChangelogMetadata changelogMetadata = changelogs.get(partition);
-                    totalRestored += restoreChangelog(task, changelogMetadata);
-                } catch (final TimeoutException timeoutException) {
-                    tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
-                        time.milliseconds(),
-                        timeoutException
-                    );
+                final Task task = tasks.get(taskId);

Review Comment:
   Why did you move it out of the `try-catch`?
   
   Do you want to re-use `task` in the `catch` block, which currently calls `tasks.get(taskId)` again?





[GitHub] [kafka] guozhangwang commented on pull request #13523: MINOR: Follow-up on failing streams test

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1499493707

   @showuon @lucasbru this is the follow-up I made after merging your fixes, to re-enable the optimization and the test case; will double check that the build is still green.




[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160999602


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +1005,14 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogMetadata.stateManager.taskId();
+                final Task task = tasks.get(taskId);
+                // if the log is truncated between when we get the log end offset and when we get the
+                // consumer position, then it's possible that the difference become negative and there's actually
+                // no records to restore; in this case we just initialize the sensor to zero

Review Comment:
   Not sure if I can follow? If we get the end-offset first, the end-offset we have won't change even if the log gets truncated? And the consumer position should not change anyway? Can you elaborate?





[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1161000136


##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -188,12 +188,11 @@ public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnab
         kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
         kafkaStreams.pause();
         kafkaStreams.start();
-        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+        waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
         assertTrue(kafkaStreams.isPaused());
 
         produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
-        waitUntilStreamsHasPolled(kafkaStreams, 2);

Review Comment:
   Why do we remove this?





[GitHub] [kafka] vcrfxia commented on pull request #13523: [DO NOT MERGE] MINOR: Follow-up on failing streams test

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1500571126

   I have a similar clarification question as Hao: what is the significance of the instance state transitioning to `RUNNING`? Is it the case that until the instance state transitions to `RUNNING`, no active tasks can process data? If so, then option 2 would imply that if one task is paused while restoring, no other tasks can make progress on processing because the instance state will not transition to `RUNNING`, whereas option 1 would mean that other tasks would be unblocked to process even while the one task is paused during restoration. The latter behavior sounds consistent with the state updater scenario, whereas the former is more strict.
   
   Given that we expect it to be rare that users will pause tasks during restoration, and that there are few use cases for wanting to pause one task during restoration while other tasks continue to process messages, I am on board with option 2 as the cleaner code change, even though it sounds like the behavior may differ from the state updater world (assuming my understanding above is correct). Just my two cents though :)
   
   > what is the task state if they are paused? Is it SUSPENDED? I don't see a PAUSED state in https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java#L71-L76
   
   I seem to recall that there is no dedicated task state for paused tasks, and that the task state transitions as usual (could even be `RUNNING`, for example, while the task is paused). Perhaps best for Guozhang to confirm, though.




[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: [DO NOT MERGE] MINOR: Follow-up on failing streams test

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160970925


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -219,13 +219,13 @@ public boolean isActive() {
     }
 
     @Override
-    public void maybeRecordRestored(final Time time, final long numRecords) {
-        maybeRecordSensor(numRecords, time, restoreSensor);
-        maybeRecordSensor(-1 * numRecords, time, restoreRemainingSensor);
-    }
-
-    public void initRemainingRecordsToRestore(final Time time, final long numRecords) {
-        maybeRecordSensor(numRecords, time, restoreRemainingSensor);
+    public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) {

Review Comment:
   This is a minor refactor to make sure all sensor recordings are inherited from the `Task` interface so that we do not need to cast to StreamTask.
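
A minimal sketch of the single-method shape described above. The nested `Task`, `ActiveTask`, and `StandbyTask` types are hypothetical stand-ins, plain `long` counters replace the restore sensors, and the `Time` argument is dropped; how the flag maps onto the two counters is an assumption inferred from the removed lines in the hunk, not the merged code.

```java
final class RecordRestorationSketch {
    // Hypothetical stand-in for the Task interface: one recording method for all callers.
    interface Task {
        void recordRestoration(long numRecords, boolean initRemaining);
    }

    // Hypothetical active task; the two counters stand in for the
    // "restored" and "remaining to restore" sensors.
    static final class ActiveTask implements Task {
        long restored = 0L;
        long remaining = 0L;

        @Override
        public void recordRestoration(final long numRecords, final boolean initRemaining) {
            if (initRemaining) {
                // initialization: only the remaining count is set up
                remaining += numRecords;
            } else {
                // a restored batch: count it and shrink the remaining count
                restored += numRecords;
                remaining -= numRecords;
            }
        }
    }

    // Hypothetical standby task: it never tracks remaining records.
    static final class StandbyTask implements Task {
        @Override
        public void recordRestoration(final long numRecords, final boolean initRemaining) {
            if (initRemaining) {
                throw new IllegalStateException("Standby task would not record remaining records to restore");
            }
            // this sketch records nothing for standby batches
        }
    }

    public static void main(final String[] args) {
        final ActiveTask active = new ActiveTask();
        final Task task = active; // callers only need the interface, no cast
        task.recordRestoration(10L, true);
        task.recordRestoration(4L, false);
        System.out.println(active.restored + " restored, " + active.remaining + " remaining");
    }
}
```

Because the caller holds only a `Task`, it does not need to know whether the task is active or standby; misuse on a standby task surfaces as an exception rather than a failed cast.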



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -748,7 +748,7 @@ public Set<Task> getTasks() {
     @Override
     public boolean restoresActiveTasks() {
         return !executeWithQueuesLocked(
-            () -> getStreamOfNonPausedTasks().filter(Task::isActive).collect(Collectors.toSet())
+            () -> getStreamOfTasks().filter(Task::isActive).collect(Collectors.toSet())

Review Comment:
   This is to make the case with state-updater consistent with the one without state-updater: when tasks are paused, they are still counted among the tasks whose restoration must complete before we can transition the main thread to RUNNING.
   
   See below the TODO marker on the interface API for details.
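
The effect of dropping the non-paused filter can be sketched as follows. `TaskInfo` and both method names are hypothetical; the real method works on the state updater's internal queues under a lock, which this sketch leaves out.

```java
import java.util.Collections;
import java.util.List;

final class RestoresActiveTasksSketch {
    // Hypothetical view of a task held by the state updater.
    static final class TaskInfo {
        final boolean active;
        final boolean paused;

        TaskInfo(final boolean active, final boolean paused) {
            this.active = active;
            this.paused = paused;
        }
    }

    // Earlier behavior: paused tasks were filtered out before checking for active ones.
    static boolean restoresActiveTasksIgnoringPaused(final List<TaskInfo> tasks) {
        return tasks.stream().anyMatch(t -> t.active && !t.paused);
    }

    // Behavior after the change: a paused active task still counts as restoring.
    static boolean restoresActiveTasksIncludingPaused(final List<TaskInfo> tasks) {
        return tasks.stream().anyMatch(t -> t.active);
    }

    public static void main(final String[] args) {
        final List<TaskInfo> onlyPausedActive = Collections.singletonList(new TaskInfo(true, true));
        System.out.println(restoresActiveTasksIgnoringPaused(onlyPausedActive));  // false
        System.out.println(restoresActiveTasksIncludingPaused(onlyPausedActive)); // true
    }
}
```

With a single paused active task, the first variant reports that nothing is restoring, which is the signal the main thread would take to move to RUNNING; the second keeps it in REBALANCING.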



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -644,20 +650,19 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
         final int numRecords = changelogMetadata.bufferedLimitIndex;
 
         if (numRecords != 0) {
-            final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(changelogMetadata.bufferedRecords.subList(0, numRecords));
+            final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords);

Review Comment:
   This is the fix 2) in the description.
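
Why a saved count is safer than asking the view for its size can be shown with plain `java.util` collections. This is a sketch under the assumption that the buffered list is structurally modified while the `subList` view is still referenced; the exact sequence of calls in `StoreChangelogReader` is not shown in this thread, and the names below are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

final class SubListViewSketch {
    // Returns true if size() on the view throws once the backing list has changed.
    static boolean viewSizeThrowsAfterBackingClear() {
        final List<String> buffered = new ArrayList<>(Arrays.asList("r0", "r1", "r2"));
        final List<String> view = buffered.subList(0, 3); // a view, not a copy
        buffered.clear(); // structural change made through the backing list
        try {
            view.size();
            return false;
        } catch (final ConcurrentModificationException e) {
            return true;
        }
    }

    // The count is captured as a plain int up front, so it stays valid afterwards.
    static int savedCountAfterBackingClear() {
        final List<String> buffered = new ArrayList<>(Arrays.asList("r0", "r1", "r2"));
        final int numRecords = 3;
        final List<String> view = buffered.subList(0, numRecords);
        buffered.clear();
        return numRecords; // no call on the stale view
    }

    public static void main(final String[] args) {
        System.out.println(viewSizeThrowsAfterBackingClear());
        System.out.println(savedCountAfterBackingClear());
    }
}
```

Dropping the `new ArrayList<>(...)` copy avoids an allocation per batch, at the cost that the view must not be queried after the backing list changes.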



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -749,6 +754,11 @@ private Map<TopicPartition, Long> committedOffsetForChangelogs(final Map<TaskId,
         }
     }
 
+    private void filterNewPartitionsToRestore(final Map<TaskId, Task> tasks, final Set<ChangelogMetadata> newPartitionsToRestore) {

Review Comment:
   This is to fix the issue found before: if a task is paused, we should not initialize its changelogs, so the restore consumer will not poll their records, and hence we will never get a task to restore while it is not in the `tasks` set.
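
A rough sketch of such a filter, assuming that paused tasks are simply absent from the `tasks` map handed to the reader. The `Changelog` type and the string task ids are hypothetical simplifications of `ChangelogMetadata` and `TaskId`; the body of the real method is not shown in this thread.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class FilterChangelogsSketch {
    // Hypothetical simplification: a changelog partition and the id of its owning task.
    static final class Changelog {
        final String partition;
        final String taskId;

        Changelog(final String partition, final String taskId) {
            this.partition = partition;
            this.taskId = taskId;
        }
    }

    // Drop changelogs whose owning task is not in the given map, so they are not
    // initialized and the restore consumer never polls records for them.
    static void filterNewPartitionsToRestore(final Map<String, String> tasks,
                                             final Set<Changelog> newPartitionsToRestore) {
        newPartitionsToRestore.removeIf(changelog -> !tasks.containsKey(changelog.taskId));
    }

    public static void main(final String[] args) {
        final Map<String, String> tasks = new HashMap<>();
        tasks.put("0_0", "not paused"); // task 0_1 is paused, hence missing from the map

        final Set<Changelog> toRestore = new HashSet<>();
        toRestore.add(new Changelog("store-changelog-0", "0_0"));
        toRestore.add(new Changelog("store-changelog-1", "0_1"));

        filterNewPartitionsToRestore(tasks, toRestore);
        System.out.println(toRestore.size());
    }
}
```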



##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -188,12 +188,11 @@ public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnab
         kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
         kafkaStreams.pause();
         kafkaStreams.start();
-        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+        waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);

Review Comment:
   This is mainly for 4) in the description: when we pause the tasks, the state should not transit to RUNNING. This test did not fail before because of the bug we are now fixing in line 823 of StoreChangelogReader above.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +1010,14 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogMetadata.stateManager.taskId();

Review Comment:
   This is to bring back the existing code that the previous hot-fix had to remove.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -482,7 +482,7 @@ public void shouldReturnFalseForRestoreActiveTasksIfTaskPaused() throws Exceptio
         verifyRemovedTasks();
         verifyPausedTasks(task);
 
-        assertFalse(stateUpdater.restoresActiveTasks());
+        assertTrue(stateUpdater.restoresActiveTasks());

Review Comment:
   This is related to the change making sure the cases with and without the state updater have consistent behavior.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -456,15 +456,21 @@ public long restore(final Map<TaskId, Task> tasks) {
                 // TODO: we always try to restore as a batch when some records are accumulated, which may result in
                 //       small batches; this can be optimized in the future, e.g. wait longer for larger batches.
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
-                try {
-                    final Task task = tasks.get(taskId);
-                    final ChangelogMetadata changelogMetadata = changelogs.get(partition);
-                    totalRestored += restoreChangelog(task, changelogMetadata);
-                } catch (final TimeoutException timeoutException) {
-                    tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
-                        time.milliseconds(),
-                        timeoutException
-                    );
+                final Task task = tasks.get(taskId);

Review Comment:
   Similarly, given the fix below in line 757, the task should never be null.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -691,7 +696,7 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
             }
         }
 
-        if (task != null && (numRecords > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED))) {
+        if (numRecords > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) {

Review Comment:
   Given the fix in 757 below, the task should never be null.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -880,6 +896,9 @@ private void initializeChangelogs(final Map<TaskId, Task> tasks,
     }
 
     private void addChangelogsToRestoreConsumer(final Set<TopicPartition> partitions) {
+        if (partitions.isEmpty())

Review Comment:
   This and line 929 below are to reduce the verbose logging lines of KafkaConsumer when it calls `unsubscribe`: if the partitions set is empty while the current assignment is also empty, we should not proceed at all.
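
The guard can be sketched with a stand-in consumer that counts how often its assignment is touched. `CountingConsumer` and `addChangelogs` are hypothetical names, not the `KafkaConsumer` API; the point is only that an empty input returns before any consumer call is made.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class EmptyGuardSketch {
    // Hypothetical stand-in for the restore consumer.
    static final class CountingConsumer {
        final Set<String> assignment = new HashSet<>();
        int assignCalls = 0;

        void assign(final Set<String> partitions) {
            assignCalls++;
            assignment.clear();
            assignment.addAll(partitions);
        }
    }

    static void addChangelogs(final CountingConsumer consumer, final Set<String> partitions) {
        if (partitions.isEmpty()) {
            return; // nothing to add: skip the whole function and leave the consumer alone
        }
        final Set<String> updated = new HashSet<>(consumer.assignment);
        updated.addAll(partitions);
        consumer.assign(updated);
    }

    public static void main(final String[] args) {
        final CountingConsumer consumer = new CountingConsumer();
        addChangelogs(consumer, Collections.emptySet());
        addChangelogs(consumer, Collections.singleton("store-changelog-0"));
        System.out.println(consumer.assignCalls);
    }
}
```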



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java:
##########
@@ -266,12 +264,21 @@ public void shouldGetDroppedRecordsSensor() {
         try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
             final Sensor sensor = TaskMetrics.droppedRecordsSensor(THREAD_ID, TASK_ID, streamsMetrics);
             streamsMetricsStaticMock.verify(
-                () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+                () -> StreamsMetricsImpl.addInvocationRateToSensor(

Review Comment:
   This is related to 3) in the description.





[GitHub] [kafka] mjsax commented on pull request #13523: [DO NOT MERGE] MINOR: Follow-up on failing streams test

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1500590801

   We did not introduce a `PAUSED` state, because of named topologies (which are still internal, but if we make it public would "break" `PAUSED` state). So when we are pausing, state would stay the same as it was when `PAUSE` is called, so it could either be `REBALANCING` or `RUNNING`. (Not ideal, but this is how it works right now.)
   
   >  Is it the case that until the instance state transitions to RUNNING, no active tasks can process data?
   
   No. The state is rather a reflection of what the runtime is doing, not the other way around.
   
   When/if we change the behavior in the future, and let the main thread process active tasks while the restore thread is still restoring other active tasks, we need to reconsider what state we want to expose. Neither `RUNNING` nor `REBALANCING` (ie, restoring) would reflect what the runtime is doing any longer. But this is a question for a future KIP I guess, and not related to this PR.
   
   I also prefer option (2) -- seems to be cleanest. Right now we have a binary `RUNNING` or `REBALANCING` (ie, restoring), and given that either the restore thread does active restoring or the main thread does active processing, it seems best to stay in `REBALANCING` state if the restore thread is not done when we pause.




[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1162110812


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +1005,14 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+
+                final TaskId taskId = changelogMetadata.stateManager.taskId();
+                final Task task = tasks.get(taskId);
+                // if the log is truncated between when we get the log end offset and when we get the
+                // consumer position, then it's possible that the difference become negative and there's actually
+                // no records to restore; in this case we just initialize the sensor to zero

Review Comment:
   I was thinking about the case where the consumer's position is set as the checkpointed offset, whereas the log end offset was truncated after the checkpoint was written. Maybe I did not make that clear in the above comment? Will reword a bit.
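
The case can be made concrete with a small sketch: if the consumer position comes from a checkpoint at offset 120 and the log end offset is 100 after truncation, the raw difference is -20, and the remaining count should start at zero instead. The method name and the use of `Math.max` are assumptions for illustration, not the merged code.

```java
final class RemainingRecordsSketch {
    // Number of records still to restore, never negative.
    static long remainingToRestore(final long endOffset, final long position) {
        return Math.max(0L, endOffset - position);
    }

    public static void main(final String[] args) {
        System.out.println(remainingToRestore(100L, 120L)); // truncated log: 0
        System.out.println(remainingToRestore(100L, 40L));  // normal case: 60
    }
}
```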





[GitHub] [kafka] lihaosky commented on pull request #13523: [DO NOT MERGE] MINOR: Follow-up on failing streams test

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1500494292

   I feel option 2 makes more sense. To clarify: option 1 means that the task is transitioned to `RUNNING` state while it's paused and restoring? Somehow this kind of doesn't make sense if `RUNNING` means the task starts processing? 
   
   A related question: what is the task state if they are paused? Is it `SUSPENDED`? I don't see a `PAUSED` state in https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java#L71-L76




[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160996434


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##########
@@ -96,7 +96,11 @@ public boolean isActive() {
     }
 
     @Override
-    public void maybeRecordRestored(final Time time, final long numRecords) {
+    public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) {
+        if (initRemaining) {
+            throw new IllegalStateException("Stanby task would not record remaining records to restore");

Review Comment:
   ```suggestion
               throw new IllegalStateException("Standby task would not record remaining records to restore");
   ```





[GitHub] [kafka] guozhangwang merged pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang merged PR #13523:
URL: https://github.com/apache/kafka/pull/13523




[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160998595


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -907,6 +920,9 @@ private void pauseChangelogsFromRestoreConsumer(final Collection<TopicPartition>
     }
 
     private void removeChangelogsFromRestoreConsumer(final Collection<TopicPartition> partitions) {
+        if (partitions.isEmpty())

Review Comment:
   ```suggestion
           if (partitions.isEmpty()) {
   ```





[GitHub] [kafka] guozhangwang commented on pull request #13523: [DO NOT MERGE] MINOR: Follow-up on failing streams test

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1499854043

   While working on the follow-ups I found a somewhat tricky issue with pausing tasks, related to state transitions. Previously we only transitioned to the `RUNNING` state once all tasks, whether paused or resumed, had finished restoration. With the state-updater this requirement is no longer strictly enforced: if a task is paused from the very beginning, as in `PauseResumeIntegrationTest` where we call `streams.pause()` and then `streams.start()`, the state updater does not try to restore the tasks it receives, yet it reports to the main thread that it has no active tasks undergoing restoration at the moment (since those tasks are paused), and the main thread takes that as a signal to transition to `RUNNING`.
   
   Without the state-updater, though, the main thread would not transition to `RUNNING` if the user calls
   
   ```
   kafkaStreams.pause();
   kafkaStreams.start();
   ```
   
   This is because the main thread still needs to see those tasks, even though they are paused, complete restoration. Previously these tests did not fail only because of the bug I'm fixing here; with the fix in place, they no longer pass, since they never see the state transition to `RUNNING` when `enableStateUpdater` is false.
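
The difference between the two transition rules described above can be modeled with a small sketch. It is only an illustration, not the Kafka Streams implementation: `TaskInfo`, `readyWithoutStateUpdater`, and `readyWithStateUpdater` are hypothetical names, and the real checks live in the stream thread and the state updater.

```java
import java.util.Arrays;
import java.util.List;

final class TransitionSketch {
    // Hypothetical, simplified view of a task; not Kafka's Task interface.
    static final class TaskInfo {
        final String id;
        final boolean paused;
        final boolean restored;

        TaskInfo(final String id, final boolean paused, final boolean restored) {
            this.id = id;
            this.paused = paused;
            this.restored = restored;
        }
    }

    // Rule without the state updater: every task, paused or not,
    // must have finished restoration before moving to RUNNING.
    static boolean readyWithoutStateUpdater(final List<TaskInfo> tasks) {
        return tasks.stream().allMatch(t -> t.restored);
    }

    // Rule with the state updater: paused tasks are not reported as
    // "undergoing restoration", so only unpaused, unrestored tasks
    // hold back the transition.
    static boolean readyWithStateUpdater(final List<TaskInfo> tasks) {
        return tasks.stream().noneMatch(t -> !t.paused && !t.restored);
    }

    public static void main(final String[] args) {
        // Models pause() followed by start(): all tasks paused, none restored.
        final List<TaskInfo> tasks = Arrays.asList(
            new TaskInfo("0_0", true, false),
            new TaskInfo("0_1", true, false));
        System.out.println("without state updater: " + readyWithoutStateUpdater(tasks));
        System.out.println("with state updater:    " + readyWithStateUpdater(tasks));
    }
}
```

When every task is paused before `start()`, the first rule stays false for as long as the tasks remain paused, while the second is true immediately, which matches the test behavior described in this comment.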




[GitHub] [kafka] guozhangwang commented on pull request #13523: [DO NOT MERGE] MINOR: Follow-up on failing streams test

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1500665780

   And just to clarify some context here: currently the main thread has a binary state, `REBALANCING` or `RUNNING`. The thread either spends all its time restoring active tasks as long as there are any, or spends its time processing tasks, which only happens once all tasks have finished restoration. In the past we did allow the thread to interleave restoring some tasks with processing other tasks that had already finished restoration, but we found that this interleaving made the end-to-end restoration latency much larger. So the rationale behind the current behavior is to reduce restoration latency as much as possible, and to start processing only after all restoration has finished.
   
   Part of the reason for introducing the state-updater is to resolve that: the main thread can just do processing while restoration is done by the state-updater thread, so processing and restoration can happen in parallel. But until the state-updater is baked in, i.e. the old code is removed and the state-updater is always enabled, we decided to still honor the current behavior: we only transition to `RUNNING` and process any tasks once all restoring tasks have completed.
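
The binary behavior described above can be illustrated with a toy loop. This is a sketch under stated assumptions, not the actual stream thread code: `BinaryStateLoopSketch`, `runOnce`, and the per-task batch counters are hypothetical, and one "batch" stands in for whatever unit of restoration or processing work a loop iteration performs.

```java
final class BinaryStateLoopSketch {
    enum State { REBALANCING, RUNNING }

    State state = State.REBALANCING;
    int processedBatches = 0;

    // Hypothetical bookkeeping: restore batches left per task.
    private final int[] remainingRestoreBatches;

    BinaryStateLoopSketch(final int... remainingRestoreBatches) {
        this.remainingRestoreBatches = remainingRestoreBatches.clone();
    }

    // One iteration of the main loop: either restore or process, never both.
    void runOnce() {
        if (state == State.REBALANCING) {
            boolean allRestored = true;
            for (int i = 0; i < remainingRestoreBatches.length; i++) {
                if (remainingRestoreBatches[i] > 0) {
                    remainingRestoreBatches[i]--; // restore one batch for this task
                }
                if (remainingRestoreBatches[i] > 0) {
                    allRestored = false;
                }
            }
            // Processing starts only after every task has finished restoring.
            if (allRestored) {
                state = State.RUNNING;
            }
        } else {
            processedBatches++; // stand-in for processing records
        }
    }

    public static void main(final String[] args) {
        final BinaryStateLoopSketch loop = new BinaryStateLoopSketch(2, 1);
        for (int i = 1; i <= 3; i++) {
            loop.runOnce();
            System.out.println("iteration " + i + ": state=" + loop.state
                + ", processed=" + loop.processedBatches);
        }
    }
}
```

In this model the second task finishes restoring after the first iteration but is not processed until the first task has also caught up, which is the trade-off the comment describes.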




[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160996049


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##########
@@ -96,7 +96,10 @@ public boolean isActive() {
     }
 
     @Override
-    public void maybeRecordRestored(final Time time, final long numRecords) {
+    public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) {
+        if (initRemaining)

Review Comment:
   ```suggestion
           if (initRemaining) {
   ```





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1161010646


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -456,15 +456,21 @@ public long restore(final Map<TaskId, Task> tasks) {
                 // TODO: we always try to restore as a batch when some records are accumulated, which may result in
                 //       small batches; this can be optimized in the future, e.g. wait longer for larger batches.
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
-                try {
-                    final Task task = tasks.get(taskId);
-                    final ChangelogMetadata changelogMetadata = changelogs.get(partition);
-                    totalRestored += restoreChangelog(task, changelogMetadata);
-                } catch (final TimeoutException timeoutException) {
-                    tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
-                        time.milliseconds(),
-                        timeoutException
-                    );
+                final Task task = tasks.get(taskId);

Review Comment:
   Nope, I think it's just for me to write on a diff. It's not intentional. 





[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160998193


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -880,6 +896,9 @@ private void initializeChangelogs(final Map<TaskId, Task> tasks,
     }
 
     private void addChangelogsToRestoreConsumer(final Set<TopicPartition> partitions) {
+        if (partitions.isEmpty())

Review Comment:
   > while the current assignment is also empty
   
   How do we know this? We have not fetched the assignment at this point, have we? Should this check go after fetching it, i.e.:
   ```
   final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
   
   if (partitions.isEmpty() && assignment.isEmpty()) {
     return;
   }
   ```
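
To make the suggested placement concrete, here is a self-contained sketch. It rests on assumptions: `FakeRestoreConsumer` and `addChangelogs` are hypothetical stand-ins, partitions are plain strings rather than `TopicPartition`, and the body is reduced to the guard plus a merge-and-assign step rather than the real method.

```java
import java.util.HashSet;
import java.util.Set;

final class GuardPlacementSketch {
    // Hypothetical stand-in for the restore consumer.
    static final class FakeRestoreConsumer {
        private Set<String> assigned = new HashSet<>();
        int assignCalls = 0;

        Set<String> assignment() {
            return assigned;
        }

        void assign(final Set<String> partitions) {
            assigned = new HashSet<>(partitions);
            assignCalls++;
        }
    }

    static void addChangelogs(final FakeRestoreConsumer consumer, final Set<String> partitions) {
        // Snapshot the current assignment first, so the guard can inspect it.
        final Set<String> assignment = new HashSet<>(consumer.assignment());

        // Return early only when there is nothing to add AND nothing assigned.
        if (partitions.isEmpty() && assignment.isEmpty()) {
            return;
        }

        assignment.addAll(partitions);
        consumer.assign(assignment);
    }

    public static void main(final String[] args) {
        final FakeRestoreConsumer consumer = new FakeRestoreConsumer();
        addChangelogs(consumer, Set.of());        // both empty: no assign() call
        addChangelogs(consumer, Set.of("store-changelog-0"));
        System.out.println("assign calls: " + consumer.assignCalls);
        System.out.println("assignment:   " + consumer.assignment());
    }
}
```

With the guard written this way, an empty `partitions` argument skips the `assign()` call only when the consumer has no assignment either; checking `partitions.isEmpty()` alone would skip it regardless of the current assignment.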





[GitHub] [kafka] mjsax commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160998689


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -907,6 +920,9 @@ private void pauseChangelogsFromRestoreConsumer(final Collection<TopicPartition>
     }
 
     private void removeChangelogsFromRestoreConsumer(final Collection<TopicPartition> partitions) {
+        if (partitions.isEmpty())
+            return;
+

Review Comment:
   ```suggestion
           }
   
   ```





[GitHub] [kafka] guozhangwang commented on pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13523:
URL: https://github.com/apache/kafka/pull/13523#issuecomment-1503662179

   > Ideally, rebalancing is a temporary situation and if everything goes well we should leave that state as soon as possible.
   
   I totally agree with you @lucasbru. As I mentioned in the comment above, "before we baked in the state-updater, i.e. remove the old code and make state-updater always enabled, we decided to still honor the current behavior". After the state-updater becomes the only choice, I think we should allow restoration on the state-updater and processing on the main thread to happen at the same time, in which case the `REBALANCING` state should be pretty short-lived, if it occurs at all. The question for us then becomes whether we want a new state to indicate that some restoration is happening in parallel while we are processing.

