Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/26 09:37:41 UTC

[GitHub] [kafka] kowshik opened a new pull request #11814: MINOR: Ensure LocalLog.flush is immune to recoveryPoint change by different thread

kowshik opened a new pull request #11814:
URL: https://github.com/apache/kafka/pull/11814


   **Issue:**
   Imagine a scenario where two threads T1 and T2 are inside `UnifiedLog.flush()` **concurrently**:
    * `KafkaScheduler` thread **T1** -> The periodic task calls `LogManager.flushDirtyLogs()`, which in turn calls `UnifiedLog.flush()`. This task is scheduled at the interval configured by `log.flush.scheduler.interval.ms` [here](https://github.com/apache/kafka/blob/8cca18d7b99d5905a84ccabb813d6a27bc8f44db/core/src/main/scala/kafka/log/LogManager.scala#L467-L471).
    * `KafkaScheduler` thread **T2** -> A `UnifiedLog.flush()` call is triggered asynchronously during segment roll [here](https://github.com/apache/kafka/blob/8cca18d7b99d5905a84ccabb813d6a27bc8f44db/core/src/main/scala/kafka/log/UnifiedLog.scala#L1501).
   
   If thread T1 advances the recovery point beyond the flush offset of thread T2, thread T2 can then trip the check within `LogSegments.values()` [here](https://github.com/apache/kafka/blob/8cca18d7b99d5905a84ccabb813d6a27bc8f44db/core/src/main/scala/kafka/log/LogSegments.scala#L136) when it is called from `LocalLog.flush()` [here](https://github.com/apache/kafka/blob/8cca18d7b99d5905a84ccabb813d6a27bc8f44db/core/src/main/scala/kafka/log/LocalLog.scala#L171). The resulting exception causes the `KafkaScheduler` thread to die, which is not desirable.
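
   To make the failure mode concrete, here is a minimal Scala sketch. `Segments` and `UnguardedLog` are hypothetical simplifications, not Kafka's `LogSegments` or `LocalLog`; the only assumption carried over is that the range query rejects a `from` offset greater than its `to` offset. Two sequential calls stand in for the thread interleaving: the first plays T1 advancing the recovery point, the second plays T2 arriving with an older flush offset.

```scala
object RaceSketch {
  // Hypothetical stand-in for LogSegments: just the base offsets of the segments.
  final class Segments(baseOffsets: Vector[Long]) {
    // Models the precondition on a range query: `from` must not exceed `to`.
    // require(...) throws IllegalArgumentException when the condition is false.
    def values(from: Long, to: Long): Vector[Long] = {
      require(from <= to, s"Invalid segment range: from $from is beyond to $to")
      baseOffsets.filter(b => b >= from && b < to)
    }
  }

  // Hypothetical, unguarded flush that reads the shared recoveryPoint directly.
  final class UnguardedLog(segments: Segments) {
    @volatile var recoveryPoint: Long = 0L

    def flush(offset: Long): Vector[Long] = {
      // If another thread already moved recoveryPoint past `offset`,
      // this range query fails its precondition and throws.
      val toFlush = segments.values(recoveryPoint, offset)
      // (A real log would flush each of these segments here.)
      synchronized { if (offset > recoveryPoint) recoveryPoint = offset }
      toFlush
    }
  }
}
```

   On a fresh `UnguardedLog` over segments at offsets 0, 100 and 200, calling `flush(200L)` and then `flush(100L)` throws `IllegalArgumentException` on the second call, which is the sketch's analogue of the scheduler thread dying.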
   
   **Fix:**
   We fix this by ensuring that `LocalLog.flush()` is immune to the case where the recovery point has already advanced beyond the flush offset, for example because another thread flushed further in the meantime.
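
   One way to get this behavior, sketched below under the same hypothetical model, is to read the recovery point once into a local value and skip the range query when that snapshot is already past the requested offset, since everything below the recovery point has then been flushed by someone else. This is an illustration of the idea, not necessarily the exact change in this PR; `Segments`, `GuardedLog` and `advanceRecoveryPoint` are hypothetical names.

```scala
object GuardedSketch {
  // Hypothetical stand-in for LogSegments; rejects a range query with from > to.
  final class Segments(baseOffsets: Vector[Long]) {
    def values(from: Long, to: Long): Vector[Long] = {
      require(from <= to, s"Invalid segment range: from $from is beyond to $to")
      baseOffsets.filter(b => b >= from && b < to)
    }
  }

  final class GuardedLog(segments: Segments) {
    @volatile var recoveryPoint: Long = 0L

    /** Returns the base offsets this call would flush; empty when there is nothing to do. */
    def flush(offset: Long): Vector[Long] = {
      // Read the shared field once, so the check and the range query see the same
      // value even if another thread advances recoveryPoint in between.
      val currentRecoveryPoint = recoveryPoint
      if (currentRecoveryPoint <= offset)
        segments.values(currentRecoveryPoint, offset) // a real log would flush these segments
      else
        Vector.empty // another thread has already flushed beyond `offset`
    }

    // Hypothetical helper: only ever moves the recovery point forward.
    def advanceRecoveryPoint(offset: Long): Unit = synchronized {
      if (offset > recoveryPoint) recoveryPoint = offset
    }
  }
}
```

   With the snapshot, a stale flush offset simply becomes a no-op instead of an invalid range, so the caller does not need to coordinate with other flushing threads.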
   
   **Tests:**
   I tested this manually by introducing barriers in the code to simulate the race condition. Because the race depends on a precise interleaving of two threads, it is hard to cover with an automated unit test, so I haven't added a new test case in this PR. Instead, I'm relying mostly on code review and on ensuring there are no regressions in existing tests.
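
   For readers who want to see how barriers can force such an interleaving, here is a self-contained sketch using `java.util.concurrent.CountDownLatch`. It pauses T2 between an assumed caller-side `flushOffset > recoveryPoint` check and its flush while T1 flushes further and advances the recovery point. The caller-side check and all class names are assumptions for illustration; this models the race and is not the manual test from this PR or Kafka code. `run(guarded = false)` hits the exception, while `run(guarded = true)` (snapshot-and-skip) does not.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object BarrierRepro {
  // Hypothetical stand-in for LogSegments: rejects a range query with from > to.
  final class Segments(baseOffsets: Vector[Long]) {
    def values(from: Long, to: Long): Vector[Long] = {
      require(from <= to, s"Invalid segment range: from $from is beyond to $to")
      baseOffsets.filter(b => b >= from && b < to)
    }
  }

  // Hypothetical log; `guarded` toggles the snapshot-and-skip check in flush().
  final class Log(segments: Segments, guarded: Boolean) {
    @volatile var recoveryPoint: Long = 0L

    def flush(offset: Long): Unit = {
      val current = recoveryPoint
      if (!guarded || current <= offset)
        segments.values(current, offset) // stand-in for flushing each segment
    }

    def advanceRecoveryPoint(offset: Long): Unit = synchronized {
      if (offset > recoveryPoint) recoveryPoint = offset
    }
  }

  /** Forces T1's flush into T2's check-then-flush window; returns what T2 threw, if anything. */
  def run(guarded: Boolean): Option[Throwable] = {
    val log = new Log(new Segments(Vector(0L, 100L, 200L)), guarded)
    val t2Checked = new CountDownLatch(1) // T2 has passed its caller-side check
    val t1Done = new CountDownLatch(1)    // T1 has flushed and advanced the recovery point
    val t2Error = new AtomicReference[Throwable]()

    val t2 = new Thread(() => {
      val flushOffset = 100L
      // Assumed caller-side check, performed before calling flush().
      if (flushOffset > log.recoveryPoint) {
        t2Checked.countDown()
        t1Done.await(5, TimeUnit.SECONDS) // barrier: let T1 run inside this window
        try log.flush(flushOffset)
        catch { case e: Throwable => t2Error.set(e) }
      }
    })
    val t1 = new Thread(() => {
      t2Checked.await(5, TimeUnit.SECONDS) // wait until T2 is inside its window
      log.flush(200L)
      log.advanceRecoveryPoint(200L)
      t1Done.countDown()
    })

    t2.start(); t1.start()
    t1.join(); t2.join()
    Option(t2Error.get)
  }
}
```

   The two latches make the ordering deterministic: T1 cannot start flushing until T2 has passed its check, and T2 cannot flush until T1 has advanced the recovery point.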
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kowshik commented on pull request #11814: MINOR: Ensure LocalLog.flush is immune to recoveryPoint change by different thread

Posted by GitBox <gi...@apache.org>.
kowshik commented on pull request #11814:
URL: https://github.com/apache/kafka/pull/11814#issuecomment-1051910771


   cc @junrao @lbradstreet for review


[GitHub] [kafka] kowshik commented on pull request #11814: MINOR: Ensure LocalLog.flush() is immune to recoveryPoint change by different thread

Posted by GitBox <gi...@apache.org>.
kowshik commented on pull request #11814:
URL: https://github.com/apache/kafka/pull/11814#issuecomment-1055098574


   @junrao Thanks for the review! I checked the test failures, and they look unrelated to this PR. I agree that your suggestion is a good way to simplify the code, and it will make the code a lot more maintainable too. I have opened [KAFKA-13701](https://issues.apache.org/jira/browse/KAFKA-13701) to track the improvement.
   
   


[GitHub] [kafka] junrao merged pull request #11814: MINOR: Ensure LocalLog.flush() is immune to recoveryPoint change by different thread

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #11814:
URL: https://github.com/apache/kafka/pull/11814


   
