Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/12 16:54:10 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

guozhangwang opened a new pull request #9170:
URL: https://github.com/apache/kafka/pull/9170


   To do this, I also removed the optimization, so that once the enforced checkpoint flag is set to true, we always checkpoint unless the state stores have not been initialized at all (i.e. the snapshot is null).
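The rule above can be sketched as a simplified version of the `checkpointNeeded` helper in `StateManagerUtil`. This is not the actual Kafka Streams code. Partitions are plain `String` keys instead of `TopicPartition`, and the sketch assumes that the non-enforced path compares accumulated offset progress against a threshold; `OFFSET_DELTA_THRESHOLD` (10,000 here) is a hypothetical value.

```java
import java.util.Map;

// Simplified sketch of the enforced-checkpoint decision described in the PR.
// Partitions are modeled as String keys; OFFSET_DELTA_THRESHOLD is hypothetical.
final class CheckpointDecisionSketch {
    static final long OFFSET_DELTA_THRESHOLD = 10_000L; // hypothetical threshold

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // A null old snapshot means the state stores were never initialized;
        // writing now could overwrite a valid checkpoint with empty offsets.
        if (oldOffsetSnapshot == null) {
            return false;
        }
        // With the optimization removed, an enforced checkpoint is always written.
        if (enforceCheckpoint) {
            return true;
        }
        // Otherwise, only checkpoint once enough offset progress has accumulated.
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += entry.getValue() - oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L);
        }
        return totalOffsetDelta > OFFSET_DELTA_THRESHOLD;
    }

    public static void main(final String[] args) {
        final Map<String, Long> snapshot = Map.of("changelog-0", 42L);
        System.out.println(checkpointNeeded(true, snapshot, snapshot));  // true: enforced, even with no progress
        System.out.println(checkpointNeeded(false, snapshot, snapshot)); // false: no progress, not enforced
        System.out.println(checkpointNeeded(true, null, snapshot));      // false: stores not initialized
    }
}
```

Checking the null snapshot before the enforce flag is what keeps an uninitialized task from clobbering an existing checkpoint file.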
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

guozhangwang commented on pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#issuecomment-673090664


   > Do you have any theories as to why this was causing `StreamsUpgradeTest.test_app_upgrade` to be flaky? Do we really get that many TaskCorruptedExceptions in this system test?
   
   I observed that several of the failed tests were caused by an endless task-corruption loop. Basically, we shut down all applications and then restart them; in between, log truncation may be triggered, which causes restoration to hit an out-of-range offset. Because of this bug, we would then stay in that loop forever.
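A toy model of the loop described above, assuming that a checkpointed offset below the changelog's log-start offset marks the task as corrupted, and that a partition missing from the checkpoint is restored from the log start. `CorruptionLoopSketch` and `restoreUntilSuccess` are hypothetical names, not Kafka Streams APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the endless task-corruption loop. The changelog is reduced to a
// log-start offset and the checkpoint file to a map. All names are hypothetical.
final class CorruptionLoopSketch {

    /** Returns the attempt on which restoration succeeded, or -1 if we gave up. */
    static int restoreUntilSuccess(final boolean overwriteCheckpointOnCorruption,
                                   final int maxAttempts) {
        final String partition = "store-changelog-0";
        final long logStartOffset = 500L;          // log truncated past the checkpointed offset
        final Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put(partition, 100L);           // stale checkpoint written before shutdown

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Without a checkpoint entry, restoration starts from the log start.
            final long restoreFrom = checkpoint.getOrDefault(partition, logStartOffset);
            if (restoreFrom >= logStartOffset) {
                return attempt;                    // offset in range: restoration succeeds
            }
            // Offset out of range: the task is marked corrupted, closed dirty and revived.
            if (overwriteCheckpointOnCorruption) {
                checkpoint.remove(partition);      // enforced checkpoint drops the corrupted partition
            }
            // Otherwise the stale entry survives and the next attempt fails the same way.
        }
        return -1;
    }

    public static void main(final String[] args) {
        System.out.println(restoreUntilSuccess(true, 10));  // 2
        System.out.println(restoreUntilSuccess(false, 10)); // -1
    }
}
```

Without the overwrite, every revived task reads the same stale offset, which is why the loop only ends when the attempt cap is hit.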





[GitHub] [kafka] abbccdda commented on a change in pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

abbccdda commented on a change in pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#discussion_r469509680



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,13 +193,17 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
 
             try {
                 task.suspend();
+                // we need to enforce a checkpoint that removes the corrupted partitions
+                task.postCommit(true);
             } catch (final RuntimeException swallow) {
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+
+            // For active tasks pause their input partitions so we won't poll any more records
+            // for this task until it has been re-initialized;
+            // Note, closeDirty already clears the partitiongroup for the task.

Review comment:
       Nit: `partitiongroup` should be "partition group".
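The ordering introduced by this diff can be sketched with a hypothetical `Task` interface that only records lifecycle calls. The sketch mirrors the diff: an exception from `suspend()` is swallowed (so the enforced `postCommit(true)` is skipped in that case), and `closeDirty()` always runs. The pausing of input partitions mentioned in the diff's comment is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the closeAndRevive ordering, using a hypothetical Task interface
// that only records which lifecycle calls were made.
final class CloseAndReviveSketch {

    interface Task {
        void suspend();
        void postCommit(boolean enforceCheckpoint);
        void closeDirty();
    }

    static void closeAndRevive(final Task task, final List<String> log) {
        try {
            task.suspend();
            // Enforce a checkpoint so the corrupted partitions are removed from it.
            task.postCommit(true);
        } catch (final RuntimeException swallow) {
            // As in the diff, errors while suspending are logged, not rethrown.
            log.add("error suspending: " + swallow.getMessage());
        }
        // closeDirty runs whether or not suspend/postCommit succeeded.
        task.closeDirty();
    }

    public static void main(final String[] args) {
        final List<String> calls = new ArrayList<>();
        final Task task = new Task() {
            @Override public void suspend() { calls.add("suspend"); }
            @Override public void postCommit(final boolean enforce) { calls.add("postCommit(" + enforce + ")"); }
            @Override public void closeDirty() { calls.add("closeDirty"); }
        };
        closeAndRevive(task, calls);
        System.out.println(calls); // [suspend, postCommit(true), closeDirty]
    }
}
```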

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,13 +193,17 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
 
             try {
                 task.suspend();
+                // we need to enforce a checkpoint that removes the corrupted partitions
+                task.postCommit(true);

Review comment:
       I'm not sure this 100% ensures that the snapshot gets done, since there is branching logic in `postCommit`. Do you think we should just add a helper to do the cleanup instead?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -58,6 +58,9 @@ static boolean checkpointNeeded(final boolean enforceCheckpoint,
             return false;
         }
 
+        if (enforceCheckpoint)

Review comment:
       Could we move this logic into `StandbyTask` only? That is the only case I have seen that could pass in a null snapshot, and moving the check there would let this helper assume that both snapshots are non-null:
   ```
   if (oldOffsetSnapshot == null) {
       return false;
   }
   ```







[GitHub] [kafka] ableegoldman commented on a change in pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

ableegoldman commented on a change in pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#discussion_r469450345



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1609,12 +1609,11 @@ public void shouldReturnStateManagerChangelogOffsets() {
     }
 
     @Test
-    public void shouldCheckpointWithCreatedStateOnClose() {
+    public void shouldNotCheckpointOnCloseCreated() {

Review comment:
       Why did this test change?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1629,16 +1628,18 @@ public void shouldCheckpointWithCreatedStateOnClose() {
         assertFalse(source1.initialized);
         assertFalse(source1.closed);
 
+        EasyMock.verify(stateManager, recordCollector);
+
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
     }
 
     @Test
-    public void shouldNotCheckpointOnCloseRestoringIfNoProgress() {
+    public void shouldCheckpointOnCloseRestoringIfNoProgress() {

Review comment:
       Same with this one: why did it change? Or was the test just wrong to begin with?







[GitHub] [kafka] guozhangwang commented on pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

guozhangwang commented on pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#issuecomment-672999981


   cc @ableegoldman @abbccdda for reviews





[GitHub] [kafka] guozhangwang commented on a change in pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

guozhangwang commented on a change in pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#discussion_r469518409



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -58,6 +58,9 @@ static boolean checkpointNeeded(final boolean enforceCheckpoint,
             return false;
         }
 
+        if (enforceCheckpoint)

Review comment:
       The snapshot is null for both active and standby tasks before `initializeIfNeeded` is triggered; in that case, we should not overwrite the checkpoint even if it was enforced.







[GitHub] [kafka] guozhangwang commented on pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

guozhangwang commented on pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#issuecomment-672992402


   Successful system tests: 
   
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4121/
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4122/





[GitHub] [kafka] guozhangwang merged pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

guozhangwang merged pull request #9170:
URL: https://github.com/apache/kafka/pull/9170


   





[GitHub] [kafka] guozhangwang commented on a change in pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

guozhangwang commented on a change in pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#discussion_r469517595



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,13 +193,17 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
 
             try {
                 task.suspend();
+                // we need to enforce a checkpoint that removes the corrupted partitions
+                task.postCommit(true);

Review comment:
       Here, since the task is guaranteed to be in the SUSPENDED state, `postCommit()` is guaranteed to trigger checkpointing.
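A simplified sketch of the state-dependent branching in `postCommit` that this exchange is about, showing why a SUSPENDED task reaches the checkpoint path. `TaskState`, the `initialized` flag, and the checkpoint counter are hypothetical stand-ins; the real `StreamTask` applies more conditions than shown here, and the non-enforced path is omitted entirely.

```java
// Simplified sketch of state-dependent branching in postCommit.
// TaskState, initialized and checkpointsWritten are hypothetical stand-ins;
// the real StreamTask has more conditions (e.g. for RUNNING tasks).
final class PostCommitSketch {
    enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private final TaskState state;
    private final boolean initialized; // stands in for "offset snapshot is non-null"
    private int checkpointsWritten = 0;

    PostCommitSketch(final TaskState state, final boolean initialized) {
        this.state = state;
        this.initialized = initialized;
    }

    void postCommit(final boolean enforceCheckpoint) {
        switch (state) {
            case CREATED:
                // Never checkpoint a CREATED task: it could overwrite a valid
                // checkpoint with uninitialized offsets.
                break;
            case RESTORING:
            case RUNNING:
            case SUSPENDED:
                maybeWriteCheckpoint(enforceCheckpoint);
                break;
            case CLOSED:
            default:
                throw new IllegalStateException("Illegal state " + state + " while post committing");
        }
    }

    private void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
        // Only the enforced path is modeled: once initialized, an enforced
        // checkpoint is always written, even if no offsets have advanced.
        if (initialized && enforceCheckpoint) {
            checkpointsWritten++;
        }
    }

    int checkpointsWritten() {
        return checkpointsWritten;
    }
}
```

Under these assumptions, a CREATED task never writes a checkpoint while a RESTORING task with an enforced checkpoint does, which is consistent with the renamed tests `shouldNotCheckpointOnCloseCreated` and `shouldCheckpointOnCloseRestoringIfNoProgress`.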

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1609,12 +1609,11 @@ public void shouldReturnStateManagerChangelogOffsets() {
     }
 
     @Test
-    public void shouldCheckpointWithCreatedStateOnClose() {
+    public void shouldNotCheckpointOnCloseCreated() {

Review comment:
       Same as the other one: it was wrong to begin with.



