Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/29 21:58:25 UTC

[GitHub] [kafka] hutchiko opened a new pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

hutchiko opened a new pull request #11283:
URL: https://github.com/apache/kafka/pull/11283


   When using EOS, checkpointed offsets are not updated to the latest offsets from the changelog, because the `maybeWriteCheckpoint` method is only ever called when `commitNeeded=false`, so the refresh of the checkpointable offsets inside its `if (commitNeeded)` block never runs. This change forces the refresh when `enforceCheckpoint=true`.
   
   I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shut down.
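The failure mode described above can be reproduced in a tiny model. This is a hypothetical, heavily simplified sketch, not the real `StreamTask`: only `commitNeeded`, `enforceCheckpoint` and `maybeWriteCheckpoint` come from the diff, while the fields and the `process()`, `commit()` and `checkpointAfterEosShutdown()` helpers are invented for illustration. It assumes the EOS shutdown order described in the PR: a commit clears `commitNeeded`, and only then is an enforced checkpoint written.

```java
// Hypothetical model of the checkpoint guard discussed in this PR (NOT Kafka's StreamTask).
// Only commitNeeded, enforceCheckpoint and maybeWriteCheckpoint are names taken from the diff.
class CheckpointModel {
    private final boolean applyFix;     // true: guard is `commitNeeded || enforceCheckpoint`
    boolean commitNeeded = false;       // set when records were processed since the last commit
    long changelogEndOffset = 0L;       // latest offset written to the simulated changelog
    long checkpointableOffset = 0L;     // cached offset that a checkpoint would write
    long checkpointFileOffset = -1L;    // last value written to the simulated checkpoint file

    CheckpointModel(final boolean applyFix) {
        this.applyFix = applyFix;
    }

    void process() {
        changelogEndOffset++;           // each processed record appends one changelog record
        commitNeeded = true;
    }

    void commit() {
        commitNeeded = false;           // in this model a commit only clears the flag
    }

    void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
        final boolean refresh = applyFix ? (commitNeeded || enforceCheckpoint) : commitNeeded;
        if (refresh) {
            checkpointableOffset = changelogEndOffset;   // refresh the cached offsets
        }
        if (enforceCheckpoint) {
            checkpointFileOffset = checkpointableOffset; // write the "checkpoint file"
        }
    }

    // Assumed EOS-style shutdown: commit first (clearing commitNeeded), then an enforced checkpoint.
    static long checkpointAfterEosShutdown(final boolean applyFix, final int records) {
        final CheckpointModel task = new CheckpointModel(applyFix);
        for (int i = 0; i < records; i++) {
            task.process();
        }
        task.commit();
        task.maybeWriteCheckpoint(true);
        return task.checkpointFileOffset;
    }
}
```

In this model, `checkpointAfterEosShutdown(false, 5)` writes the stale offset `0`, while `checkpointAfterEosShutdown(true, 5)` writes `5`, matching the changelog.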


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-918706779


   @guozhangwang can you cherrypick this back to 2.8 at least? Maybe also 2.7 if there aren't any conflicts (seems like it should be a smooth merge but 🤷‍♀️ )





[GitHub] [kafka] mjsax commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-930593991


   @guozhangwang Should this fix be cherry-picked to `3.0` branch (and the jira be updated with fixed version `3.0.1`?)





[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-915657410


   > but there was a relevant failure in the build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11283/5/testReport/org.apache.kafka.streams.integration/EosIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldWriteLatestOffsetsToCheckpointOnShutdown_at_least_once_/
   
   > Guessing it's just some flakiness in the test, can you check that out before I merge?
   
   @hutchiko I looked at the test code, and it seems to me there is indeed a timing-related flakiness. Could you try to fix it before we merge? You can first try to reproduce it, e.g. in the IDE with repeated runs, and see how often it fails; after a fix, we would usually verify that there are no more failures over, say, 1000 runs.
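A minimal sketch of the "run it many times and count failures" check suggested above, written in plain Java so it does not depend on any test framework. `RepeatRunner` and `countFailures` are hypothetical names, not part of Kafka; within JUnit 5, the `@RepeatedTest` annotation serves a similar purpose.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of Kafka or any test framework): runs a check many times
// and counts how many runs failed, so a flaky failure rate can be measured before and after a fix.
class RepeatRunner {
    static int countFailures(final Callable<Boolean> check, final int runs) {
        int failures = 0;
        for (int i = 0; i < runs; i++) {
            try {
                if (!check.call()) {
                    failures++;            // the check returned false
                }
            } catch (final Exception | AssertionError e) {
                failures++;                // the check threw, e.g. a failed assertion
            }
        }
        return failures;
    }
}
```

A fix would then be considered verified when something like `countFailures(() -> runFlakyScenario(), 1000)` returns `0`, where `runFlakyScenario` is a placeholder for the test body.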


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hutchiko edited a comment on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
hutchiko edited a comment on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-908804447


   @ableegoldman I did not test against 3.0, just 2.7 and 2.8. I'll rebase onto `trunk` and see how that goes.





[GitHub] [kafka] ableegoldman commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-908748576


   Hey @hutchiko, thanks for digging into this bug so thoroughly and providing a patch! One quick question before I review -- does this only affect version 2.8 or below specifically, or could this be present on trunk/3.0 as well? Unless you already checked this, my guess would be the latter. If you can verify that is true, then can you please retarget this PR against the `trunk` branch? Once it gets merged then we can cherrypick back to 2.8 from there. Thanks!





[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-919580172


   I resolved some conflicts and cherry-picked to 2.8; there are too many conflicts in 2.7 though.





[GitHub] [kafka] mjsax edited a comment on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
mjsax edited a comment on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-930593991


   @guozhangwang Now that `3.0.0` is released, should this fix be cherry-picked to `3.0` branch (and the jira be updated with fixed version `3.0.1`?)





[GitHub] [kafka] hutchiko commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
hutchiko commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r698898574



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
     protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
         // commitNeeded indicates we may have processed some records since last commit
         // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
-        if (commitNeeded) {
+        if (commitNeeded || enforceCheckpoint) {

Review comment:
       I thought the same thing, but not knowing enough about all the Streams internals, I thought I'd just go with the most minimal change possible. 







[GitHub] [kafka] ableegoldman commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r703953000



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
     protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
         // commitNeeded indicates we may have processed some records since last commit
         // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
-        if (commitNeeded) {
+        if (commitNeeded || enforceCheckpoint) {

Review comment:
       Fair enough 🙂 







[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
hutchiko commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-908804447


   @ableegoldman I did not test against 3.0, just 2.7 and 2.8. I'll rebase onto trunk/3.0 and see how that goes.





[GitHub] [kafka] guozhangwang commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r704825002



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
     protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
         // commitNeeded indicates we may have processed some records since last commit
         // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
-        if (commitNeeded) {
+        if (commitNeeded || enforceCheckpoint) {

Review comment:
       The reason I added this check is that `checkpointableOffsets()` can potentially be expensive. I think the fix of using `commitNeeded || enforceCheckpoint` is actually elegant, since it does not introduce much unnecessary overhead: `enforceCheckpoint` is only true when closing the task.
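The overhead argument can be made concrete with a small counting model. This is a hypothetical sketch, not Kafka code: `RefreshCostModel` and its `Guard` values are invented names, and the workload (many non-enforced calls with nothing processed since the last commit, then one enforced call at close) is an assumption chosen to isolate the cost of each guard, including the "remove the check altogether" option raised in the review.

```java
// Hypothetical cost model (not Kafka code): counts how often an "expensive" offset refresh,
// standing in for checkpointableOffsets(), would run under three possible guards.
class RefreshCostModel {
    enum Guard { COMMIT_NEEDED_ONLY, COMMIT_NEEDED_OR_ENFORCE, ALWAYS }

    static boolean shouldRefresh(final Guard guard, final boolean commitNeeded, final boolean enforceCheckpoint) {
        switch (guard) {
            case COMMIT_NEEDED_ONLY:
                return commitNeeded;                       // original guard
            case COMMIT_NEEDED_OR_ENFORCE:
                return commitNeeded || enforceCheckpoint;  // guard after this PR
            default:
                return true;                               // no guard at all
        }
    }

    // Assumed workload: `idleCalls` non-enforced calls with nothing processed since the last commit,
    // followed by one enforced call when the task is closed (also with commitNeeded == false).
    static int countRefreshes(final Guard guard, final int idleCalls) {
        int refreshes = 0;
        for (int i = 0; i < idleCalls; i++) {
            if (shouldRefresh(guard, false, false)) {
                refreshes++;
            }
        }
        if (shouldRefresh(guard, false, true)) {
            refreshes++;
        }
        return refreshes;
    }
}
```

With 1000 idle calls, the original guard refreshes 0 times (and so misses the close), the new guard refreshes exactly once at close, and an unguarded refresh runs 1001 times.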







[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-931541107


   SG, will cherry-pick.





[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-918584383


   cc @kkonstantine, this is a critical bug fix, hence I'm cherry-picking it to 3.0 as well. If RC2 passes the vote, it will land in 3.0.1; otherwise, if we vote on an RC3, I think it would be great to have it in 3.0.0.





[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-918582404


   I checked the failures of the new run and they are not related to the new tests. Merging to trunk now.





[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
hutchiko commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-918537415


   @guozhangwang @ableegoldman unfortunately I could never reproduce the CI failures; however, I have pushed a refactor of the method which I think was responsible for the flakiness.
   
   The original version of the method scanned backwards through the changelog topic, searching for the top record so I could cross-check that record's offset with the checkpointed offset. It had an implicit assumption that the consumer it was driving backwards would always get some records after a 50ms `poll`; thinking this through, that is obviously a false assumption.
   
   I switched the logic around so it just consumes forwards until it reaches the end of the topic. There are no assumptions about timing in the new logic, so I'm hoping that will fix the flakiness.
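One way to write a forward scan whose termination does not depend on timing is to stop when the consumer's position reaches an end offset captured up front, rather than when a poll happens to return data. The sketch below is hypothetical and is not the PR's test code: `SimulatedPartition` stands in for a consumer on one partition (loosely analogous to `KafkaConsumer#position` and `#endOffsets`), its `poll()` randomly returns nothing, and some offsets hold transaction markers that are never returned as records, as can happen on an EOS changelog.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for a consumer on one changelog partition. Offsets are array indexes;
// offsets flagged as markers advance the position but are never returned as records.
class SimulatedPartition {
    private final boolean[] isMarker;
    private final Random random;
    private long position = 0L;        // next offset to fetch

    SimulatedPartition(final boolean[] isMarker, final long seed) {
        this.isMarker = isMarker;
        this.random = new Random(seed);
    }

    long endOffset() {
        return isMarker.length;        // offset the next written record would get
    }

    long position() {
        return position;
    }

    List<Long> poll() {
        if (random.nextBoolean()) {
            return Collections.emptyList();              // simulated poll that returns nothing in time
        }
        final long to = Math.min(position + 3, isMarker.length);
        final List<Long> batch = new ArrayList<>();
        for (long offset = position; offset < to; offset++) {
            if (!isMarker[(int) offset]) {
                batch.add(offset);                       // markers are skipped, not returned
            }
        }
        position = to;
        return batch;
    }
}

class ForwardScan {
    // Consume forward until the position reaches the end offset captured up front, remembering
    // the last data record seen. Empty polls just cause another iteration; maxPolls is a safety cap.
    static long lastRecordOffset(final SimulatedPartition partition, final int maxPolls) {
        final long end = partition.endOffset();
        long last = -1L;
        int polls = 0;
        while (partition.position() < end) {
            if (polls++ >= maxPolls) {
                throw new IllegalStateException("end offset " + end + " not reached after " + maxPolls + " polls");
            }
            for (final long offset : partition.poll()) {
                last = offset;
            }
        }
        return last;
    }
}
```

Note that the last data record's offset can be lower than `endOffset() - 1` when markers follow it, which is why the loop compares the position, not the last record's offset, against the end offset.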





[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
hutchiko commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-908826996


   Yeah I've rebased and verified the issue is there in `trunk` too.





[GitHub] [kafka] ableegoldman commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r698868805



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
     protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
         // commitNeeded indicates we may have processed some records since last commit
         // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
-        if (commitNeeded) {
+        if (commitNeeded || enforceCheckpoint) {

Review comment:
       What if we just removed the check altogether? Updating the changelog offsets is not a particularly "heavy" call, so we may as well future-proof things even more by just updating the offsets every time.
   
   In fact, why do we even have this weird split-brain logic to begin with? It would make more sense to just update the offsets inside the `StreamTask#maybeWriteCheckpoint` and `stateMgr.checkpoint()` methods, no?
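The alternative suggested here, moving the refresh into the checkpoint-writing method itself, could look roughly like the sketch below. It is hypothetical: `SelfRefreshingCheckpointer`, `checkpoint()` and `writtenOffsets()` are invented names, not Kafka's state manager API, and the supplier stands in for whatever call returns the current changelog offsets.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the alternative design: the component that writes the checkpoint
// fetches fresh offsets itself on every write, so no caller-side flag can leave them stale.
// The class and method names are invented; this is not Kafka's state manager API.
class SelfRefreshingCheckpointer {
    private final Supplier<Map<String, Long>> currentChangelogOffsets;
    private final Map<String, Long> checkpointFile = new HashMap<>(); // stands in for the file on disk

    SelfRefreshingCheckpointer(final Supplier<Map<String, Long>> currentChangelogOffsets) {
        this.currentChangelogOffsets = currentChangelogOffsets;
    }

    void checkpoint() {
        // Always refresh immediately before writing; the cost is one offset lookup per checkpoint.
        checkpointFile.clear();
        checkpointFile.putAll(currentChangelogOffsets.get());
    }

    Map<String, Long> writtenOffsets() {
        return new HashMap<>(checkpointFile);
    }
}
```

The trade-off is that the offset lookup now runs on every checkpoint, whether or not anything changed since the previous one.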







[GitHub] [kafka] guozhangwang merged pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #11283:
URL: https://github.com/apache/kafka/pull/11283


   




