Posted to jira@kafka.apache.org by "philipnee (via GitHub)" <gi...@apache.org> on 2023/02/17 05:00:33 UTC

[GitHub] [kafka] philipnee opened a new pull request, #13269: KAFKA-12634 enforce checkpoint after restoration

philipnee opened a new pull request, #13269:
URL: https://github.com/apache/kafka/pull/13269

   We want to ensure the progress is checkpointed after restoration completes, so that it is not lost and the task does not need to restore from scratch.
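   
   A minimal sketch of the idea, using the existing `StreamTask` hooks quoted in the review diffs further down (`completeRestoration`, `maybeCheckpoint`); illustrative only, not the final patch:
   ```java
   // Fragment of StreamTask#completeRestoration as discussed in this thread:
   resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
   initializeTopology();
   processorContext.initialize();
   maybeCheckpoint(true); // enforce a checkpoint of the restored offsets (later guarded to skip EOS)
   ```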
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1467588143

   > Sorry for jumping into this PR late @philipnee. For this ticket, we have fixed it in the new state updater thread ([KAFKA-10199](https://issues.apache.org/jira/browse/KAFKA-10199)), and we are expecting to enable the state updater soon. So I'd suggest we do not try to fix it inside the main stream thread any more. WDYT?
   
   @guozhangwang 
   Do you think this change is too risky? 
   I do not know how long "soon" will be. At the moment, it seems it will still take us some time until we enable the state updater by default. Even then, we will keep the option to switch back to the old restoration for a couple of releases.
   So, if we think this PR is not too risky, I would go for it. 
   Maybe I am a bit out of the loop currently and I am missing something.  
   




[GitHub] [kafka] cadonna commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1500105174

   Build failures are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / kafka.security.authorizer.AuthorizerTest.testAllowAccessWithCustomPrincipal(String).quorum=
   Build / JDK 11 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testSuperUserHasAccess(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testAccessAllowedIfAllowAclExistsOnPrefixedResource(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.testFallbackPriorTaskAssignorLargePartitionCount
   ```




[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1453921659

   I took a peek at it because @mjsax mentioned it is easy to work on.




[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156483107


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;

Review Comment:
   The IDE moved this, but it seems to be in alphabetical order now. Can we keep it here?





[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1499289597

   Hey @cadonna - sorry for the back and forth, your comment is addressed.




[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1149285314


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1699,7 +1705,8 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).times(2);

Review Comment:
   Thanks, I think that makes sense.





[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1486266466

   Hey @cadonna - Thanks for getting back on this. I added two tests covering EOS enabled/disabled cases - LMK if this is what you are looking for. I tried to make the test case explicit there.




[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156165447


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())

Review Comment:
   Ah, thanks. I think it's from the IDE's auto-formatting. I'll revert these indentation changes.





[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156164789


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -255,6 +255,9 @@ public void completeRestoration(final java.util.function.Consumer<Set<TopicParti
                 resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
                 initializeTopology();
                 processorContext.initialize();
+                if (!eosEnabled) {
+                    maybeCheckpoint(true); // enforce checkpoint upon completing restoration

Review Comment:
   Agreed, thanks for catching that.





[GitHub] [kafka] mjsax commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1502392008

   @cadonna -- Should we cherry-pick to 3.4 branch?




[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1453776408

   @cadonna - Thanks for the feedback there.  I added the EOS check before the checkpoint. I also made a bunch of changes to the tests.  To make things obvious, I added a comment to these altered tests to let people know about the checkpointing.




[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1488874445

   Not entirely sure what this is: `[2023-03-29T05:12:47.884Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-13269/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala:436:50: value addOne is not a member of scala.collection.mutable.ArrayBuffer[scala.collection.mutable.Buffer[org.apache.kafka.server.common.ApiMessageAndVersion]]`




[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1114238733


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -255,6 +255,7 @@ public void completeRestoration(final java.util.function.Consumer<Set<TopicParti
                 resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
                 initializeTopology();
                 processorContext.initialize();
+                maybeCheckpoint(true); // enforce checkpoint upon completing restoration

Review Comment:
   As the ticket says, we can only write this checkpoint if we process under the at-least-once (ALOS) processing guarantee, but we must not write the checkpoint when we process under exactly-once (EOS). Under EOS, we need to rebuild local state when the Streams client crashed or closed dirty. Otherwise, Streams would reprocess the records from the last committed offset and add those same records to the local state store again, violating EOS. We know that a Streams client crashed or closed dirty if there is no checkpoint file in the state directory. So, if we write a checkpoint right after restoration and the Streams client then processes records but crashes or closes dirty before the first commit, it would violate EOS for the aforementioned reasons.
   So you need to differentiate between ALOS and EOS, similar to what is done when a commit is executed: https://github.com/apache/kafka/blob/98c2f88e1c605195ccfac19c49a83216e26146a1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L492
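   
   Sketched, this is the shape the fix later adopted in this thread takes (names taken from the quoted `StreamTask` diff; illustrative only):
   ```java
   // Only write the restoration checkpoint under at-least-once; under EOS a missing
   // checkpoint file is what tells a restarted client to rebuild local state from the changelog.
   if (!eosEnabled) {
       maybeCheckpoint(true);
   }
   ```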





[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1453919620

   Oh yeah, I was wondering about that, too - it's up to you. We can abandon this and wait for KAFKA-10199 to be completed.




[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156197845


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {

Review Comment:
   Thanks, I'm adopting your suggestion here.





[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1155775071


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null

Review Comment:
   Indentation of 4 is fine. Could you revert this change, please?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1546,8 +1551,9 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
         stateManager.checkpoint();
         EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogOffsets())
-            .andReturn(singletonMap(changelogPartition, 10L))
-            .andReturn(singletonMap(changelogPartition, 20L));
+                .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint
+                .andReturn(singletonMap(changelogPartition, 10L))
+                .andReturn(singletonMap(changelogPartition, 20L));

Review Comment:
   ```suggestion
               .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint
               .andReturn(singletonMap(changelogPartition, 10L))
               .andReturn(singletonMap(changelogPartition, 20L));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())

Review Comment:
   Could you revert this change, please? An indentation of 4 is fine.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {

Review Comment:
   We usually use the form `should...` for test names. My proposal here is to use `shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration checkpoint
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void testRestoration_CheckpointNotWrittenWhenEOSEnabled() {
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().andStubThrow(new AssertionError("should not checkpoint on EOS"));
+        stateManager.changelogOffsets();
+        EasyMock.expectLastCall().andStubThrow(new AssertionError("should not checkpoint on EOS"));
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });

Review Comment:
   Could you please use Mockito instead of EasyMock here? We are migrating to Mockito and we would like to have new tests using Mockito. See `TaskManagerTest` for an example where a test class uses Mockito and EasyMock. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null
         );
 
         task = new StreamTask(
-            taskId,
-            mkSet(partition1, repartition),
-            topology,
-            consumer,
-            new TopologyConfig(null,  config, new Properties()).getTaskConfig(),
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateManager,
-            recordCollector,
-            context,
-            logContext);
+                taskId,
+                mkSet(partition1, repartition),
+                topology,
+                consumer,
+                new TopologyConfig(null, config, new Properties()).getTaskConfig(),
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context,
+                logContext);
 
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> {
+        });

Review Comment:
   This change is not needed for the purpose of this PR.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration checkpoint
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);

Review Comment:
   Could you please use Mockito instead of EasyMock here? We are migrating to Mockito and we would like to have new tests using Mockito. See `TaskManagerTest` for an example where a test class uses Mockito and EasyMock. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration checkpoint
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void testRestoration_CheckpointNotWrittenWhenEOSEnabled() {

Review Comment:
   See above. We usually use the form `should...` for test names. My proposal here is to use `shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null
         );
 
         task = new StreamTask(
-            taskId,
-            mkSet(partition1, repartition),
-            topology,
-            consumer,
-            new TopologyConfig(null,  config, new Properties()).getTaskConfig(),
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateManager,
-            recordCollector,
-            context,
-            logContext);
+                taskId,
+                mkSet(partition1, repartition),
+                topology,
+                consumer,
+                new TopologyConfig(null, config, new Properties()).getTaskConfig(),
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context,
+                logContext);

Review Comment:
   Thank you for removing the superfluous space! However, the indentation of 4 is fine. Could you revert the indentation, please?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -255,6 +255,9 @@ public void completeRestoration(final java.util.function.Consumer<Set<TopicParti
                 resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
                 initializeTopology();
                 processorContext.initialize();
+                if (!eosEnabled) {
+                    maybeCheckpoint(true); // enforce checkpoint upon completing restoration

Review Comment:
   Could you please remove the inline comment? I do not think it adds any information that is not already contained in the call to `maybeCheckpoint(true)`.





[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1453770724

   There are a bunch of failing `DedicatedMirrorIntegrationTest` runs - most likely unrelated.




[GitHub] [kafka] guozhangwang commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1453914588

   Sorry for jumping into this PR late @philipnee. For this ticket, we have fixed it in the new state updater thread ([KAFKA-10199](https://issues.apache.org/jira/browse/KAFKA-10199)), and we are expecting to enable the state updater soon. So I'd suggest we do not try to fix it inside the main stream thread any more. WDYT?




[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1149189233


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1699,7 +1705,8 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).times(2);

Review Comment:
   Since for this test these calls are just stubs (i.e., nothing needs to be verified about these calls), I would replace them with:
   ```suggestion
           EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
           EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1503,7 +1505,9 @@ public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration()
     public void shouldReInitializeTopologyWhenResuming() throws IOException {
         stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
         EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()); // restoration checkpoint
         EasyMock.expect(recordCollector.offsets()).andThrow(new AssertionError("Should not try to read offsets")).anyTimes();

Review Comment:
   nit:
   ```suggestion
           EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap())
               .andThrow(new AssertionError("Should not try to read offsets")).anyTimes();
   ```





[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1157024108


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, times(1)).checkpoint();
+        verify(recordCollector, times(1)).offsets();

Review Comment:
   `times(1)` is default and can be omitted.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, times(1)).checkpoint();
+        verify(recordCollector, times(1)).offsets();

Review Comment:
   `verify(recordCollector, times(1)).offsets();` is not strictly needed. The important thing in this test is that the checkpoint is written.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();

Review Comment:
   ```suggestion
           final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class);
           when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE);
           when(processorStateManager.taskId()).thenReturn(taskId);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;

Review Comment:
   That is fine!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, times(1)).checkpoint();
+        verify(recordCollector, times(1)).offsets();
+    }
+
+    @Test
+    public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+
+        task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, never()).checkpoint();
+        verify(processorStateManager, never()).changelogOffsets();
+        verify(recordCollector, never()).offsets();
+    }
+
+    private ProcessorStateManager mockStateManager() {
+        final ProcessorStateManager manager = spy(new ProcessorStateManager(

Review Comment:
   I am afraid I cannot follow here. With Mockito you can define strict stubbings. That means Mockito will throw an exception if you have defined stubs that are not used. To use strict mocks, we usually specify 
   ```java
   @MockitoSettings(strictness = Strictness.STRICT_STUBS)
   ```
   on the test class level. Here, this does not work because I think (I am not sure) we use
   ```java
   @RunWith(EasyMockRunner.class)
   ```    
   What works is using
   ```java
       @Before
       public void setup() {
           mockito = Mockito.mockitoSession()
               .initMocks(this)
               .strictness(Strictness.STRICT_STUBS)
               .startMocking();
   ...
   ```
   as first call in the setup method and 
   ```java
       @After
       public void cleanup() throws IOException {
           ...
           mockito.finishMocking();
       }
   ```
   as last call in the teardown method.
   
   With this the unit test becomes:
   ```java
       @Test
       public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
           final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class);
           when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE);
           when(processorStateManager.taskId()).thenReturn(taskId);
           recordCollector = mock(RecordCollectorImpl.class);
   
           task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, processorStateManager);
           task.initializeIfNeeded();
           task.completeRestoration(noOpResetter -> { });
           verify(processorStateManager, never()).checkpoint();
           verify(processorStateManager, never()).changelogOffsets();
           verify(recordCollector, never()).offsets();
       }
   ```
   
   If you like you could extract this part into a method and reuse it in both unit tests (that is basically what you originally did, but w/o the spy):
   ```java
           final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class);
           when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE);
           when(processorStateManager.taskId()).thenReturn(taskId);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();

Review Comment:
   This is not needed since the checkpoint is forced.





[GitHub] [kafka] cadonna merged pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna merged PR #13269:
URL: https://github.com/apache/kafka/pull/13269




[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156196422


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null
         );
 
         task = new StreamTask(
-            taskId,
-            mkSet(partition1, repartition),
-            topology,
-            consumer,
-            new TopologyConfig(null,  config, new Properties()).getTaskConfig(),
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateManager,
-            recordCollector,
-            context,
-            logContext);
+                taskId,
+                mkSet(partition1, repartition),
+                topology,
+                consumer,
+                new TopologyConfig(null, config, new Properties()).getTaskConfig(),
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context,
+                logContext);

Review Comment:
   Sorry! I think I'm in the habit of using 8 spaces for "continuation indentation".





[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156483833


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, times(1)).checkpoint();
+        verify(recordCollector, times(1)).offsets();
+    }
+
+    @Test
+    public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+
+        task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, never()).checkpoint();
+        verify(processorStateManager, never()).changelogOffsets();
+        verify(recordCollector, never()).offsets();
+    }
+
+    private ProcessorStateManager mockStateManager() {
+        final ProcessorStateManager manager = spy(new ProcessorStateManager(

Review Comment:
   I'm doing a partial mock here, to make sure the correct methods are being invoked.
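   
   For context, the spy-vs-mock distinction being relied on here, as a standalone Mockito illustration (the `ArrayList` is just a stand-in type, not from this PR):
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.spy;
   import static org.mockito.Mockito.verify;

   import java.util.ArrayList;
   import java.util.List;

   public class SpyVsMockExample {
       public static void main(final String[] args) {
           final List<String> spied = spy(new ArrayList<>());   // partial mock: real methods run
           final List<String> mocked = mock(ArrayList.class);   // full mock: every method is a stub

           spied.add("x");
           mocked.add("x");

           verify(spied).add("x");             // interaction recorded AND the element is really stored
           System.out.println(spied.size());   // 1 - real behaviour preserved
           System.out.println(mocked.size());  // 0 - stubbed default
       }
   }
   ```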





[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1495021777

   Hey @cadonna - Again, thanks for the reviews; sorry for the auto-cleanup by my IDE, I've cleaned up most of the unnecessary changes.




[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1496380886

   Thank you @cadonna - I configured the Mockito strictness based on your suggestion. I also cleaned up the tests.




[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1158824245


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -274,6 +287,7 @@ public void cleanup() throws IOException {
             task.closeDirty();
             task = null;
         }
+        mockito.finishMocking();

Review Comment:
   Could you put this after `Utils.delete(BASE_DIR)` to ensure that the directory is also deleted if Mockito throws an exception because a stub is unnecessary?
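   
   i.e., roughly (sketched from the setup/teardown snippets quoted earlier in this thread; illustrative only):
   ```java
   @After
   public void cleanup() throws IOException {
       // ... existing task cleanup ...
       Utils.delete(BASE_DIR);   // remove the state directory first
       mockito.finishMocking();  // run last, since it may throw when a stub is unnecessary
   }
   ```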





[GitHub] [kafka] philipnee closed pull request #13269: KAFKA-12634 enforce checkpoint after restoration

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee closed pull request #13269: KAFKA-12634 enforce checkpoint after restoration
URL: https://github.com/apache/kafka/pull/13269

