You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Kowshik Prakasam (Jira)" <ji...@apache.org> on 2021/07/13 16:21:00 UTC
[jira] [Updated] (KAFKA-13070) LogManager shutdown races with
periodic work scheduled by the instance
[ https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kowshik Prakasam updated KAFKA-13070:
-------------------------------------
Description:
In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.
```
// set val maxLogAgeMs = 60000 in the test
@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
val logFile = new File(logDir, name + "-0")
assertTrue(logFile.exists)
log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
log.updateHighWatermark(log.logEndOffset)
logManager.shutdown()
assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
logManager = null
}
```
was:
In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.
```
// set val maxLogAgeMs = 60000 in the test
@Test
def testRetentionPeriodicWorkAfterShutdown(): Unit = {
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
val logFile = new File(logDir, name + "-0")
assertTrue(logFile.exists)
log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
log.updateHighWatermark(log.logEndOffset)
logManager.shutdown()
assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
logManager = null
}
```
> LogManager shutdown races with periodic work scheduled by the instance
> ----------------------------------------------------------------------
>
> Key: KAFKA-13070
> URL: https://issues.apache.org/jira/browse/KAFKA-13070
> Project: Kafka
> Issue Type: Bug
> Reporter: Kowshik Prakasam
> Assignee: Manasvi Gupta
> Priority: Major
>
> In the LogManager shutdown sequence (in LogManager.shutdown()), we don't cancel the periodic work scheduled by it prior to shutdown. As a result, the periodic work could race with the shutdown sequence causing some unwanted side effects. This is reproducible by a unit test in LogManagerTest.
>
> ```
> // set val maxLogAgeMs = 60000 in the test
> @Test
> def testRetentionPeriodicWorkAfterShutdown(): Unit = {
> val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
> val logFile = new File(logDir, name + "-0")
> assertTrue(logFile.exists)
> log.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
> log.updateHighWatermark(log.logEndOffset)
> logManager.shutdown()
> assertTrue(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
> time.sleep(maxLogAgeMs + logManager.InitialTaskDelayMs + logManager.retentionCheckMs + 1)
> logManager = null
> }
> ```
--
This message was sent by Atlassian Jira
(v8.3.4#803005)