Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/02 15:48:31 UTC

[GitHub] [kafka] tombentley opened a new pull request #9676: KAFKA-10778: Fence appends after write failure

tombentley opened a new pull request #9676:
URL: https://github.com/apache/kafka/pull/9676


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r544611285



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1556,6 +1572,8 @@ class Log(@volatile private var _dir: File,
           done = fetchDataInfo != null || segmentEntry == null
         }
 
+        checkForLogDirFailure()

Review comment:
       Seems more intuitive to move this check before the segment read. I don't think we can totally avoid race conditions with a failure in append since we don't have the lock here. Perhaps we could even move this check to `maybeHandleIOException` so that we handle all cases? 
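
A minimal sketch of the suggestion above, assuming a heavily simplified stand-in for `Log`: the offline check lives inside the `maybeHandleIOException`-style wrapper, so every wrapped operation (append or read) is fenced in one place. `SketchLog`, `StorageException` and the `offline` flag are hypothetical names used for illustration; they are not Kafka's actual classes or fields.

```scala
import java.io.IOException

// Hypothetical stand-in for KafkaStorageException.
final class StorageException(msg: String, cause: Throwable = null)
  extends RuntimeException(msg, cause)

// Hypothetical, simplified stand-in for kafka.log.Log.
final class SketchLog(parentDir: String) {
  // Set when the first IOException is seen; read by every wrapped operation.
  @volatile private var offline = false

  def isOffline: Boolean = offline

  // Runs `fun`, rejecting it up front if the log dir is already offline,
  // and marking the dir offline if `fun` throws an IOException.
  def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
    if (offline)
      throw new StorageException(
        s"The log dir $parentDir is already offline due to a previous IO exception.")
    try fun
    catch {
      case e: IOException =>
        offline = true
        throw new StorageException(msg, e)
    }
  }
}
```

With this shape, callers do not need a separate check at each call site. As the comment notes, a read that does not hold the lock can still race with a failing append, so the check narrows the window rather than closing it.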

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -2818,6 +2818,22 @@ class LogTest {
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
   }
 
+  @Test
+  def testAppendToOrReadFromLogInFailedLogDir(): Unit = {
+    val log = createLog(logDir, LogConfig())
+    log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
+    assertEquals(0, readLog(log, 0, 4096).records.records.iterator.next().offset)
+    try {
+      log.maybeHandleIOException("Simulating failed log dir") {

Review comment:
       Another way to trigger an IO exception is to rename the log file. This trick is used in `testAppendToTransactionIndexFailure`. Then we don't need to expose `maybeHandleIOException` for testing.







[GitHub] [kafka] tombentley commented on pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#issuecomment-737317322


   @hachikuji is this what you had in mind for what you described in [KAFKA-10778](https://issues.apache.org/jira/browse/KAFKA-10778)?





[GitHub] [kafka] tombentley commented on pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#issuecomment-747481897


   Thanks for the review @hachikuji.





[GitHub] [kafka] guozhangwang commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r535691107



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1219,6 +1219,9 @@ class Log(@volatile private var _dir: File,
               appendInfo.logAppendTime = duplicate.timestamp
               appendInfo.logStartOffset = logStartOffset
             case None =>
+              if (logDirFailureChannel.logDirIsOffline(parentDir)) {
+                throw new KafkaStorageException(s"The log dir $parentDir has failed.");

Review comment:
       Maybe we can leave a more informative error message here? Something like "... dir has failed due to a previous IO exception", just to indicate that the failure was not caused by the current call trace.







[GitHub] [kafka] tombentley commented on pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#issuecomment-737815296


   @hachikuji thanks for taking a look, I've fixed the test failures. Do we also need something on the read path, as KAFKA-10778 suggested?





[GitHub] [kafka] tombentley commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r535972740



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1219,6 +1219,9 @@ class Log(@volatile private var _dir: File,
               appendInfo.logAppendTime = duplicate.timestamp
               appendInfo.logStartOffset = logStartOffset
             case None =>
+              if (logDirFailureChannel.logDirIsOffline(parentDir)) {
+                throw new KafkaStorageException(s"The log dir $parentDir has failed.");

Review comment:
       @guozhangwang good point. I also changed the language to 'offline' rather than 'failed', since this is more consistent with other usage.







[GitHub] [kafka] tombentley commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r545144134



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1556,6 +1572,8 @@ class Log(@volatile private var _dir: File,
           done = fetchDataInfo != null || segmentEntry == null
         }
 
+        checkForLogDirFailure()

Review comment:
       Yeah, I wondered about putting it in `maybeHandleIOException` too, and I've made this change. There are a few places where changing the order in which we do `maybeHandleIOException` with respect to `lock synchronized` would seem to avoid some possible races with no side effects other than increasing the duration the lock is held (for the `try` and the volatile read). I'm thinking specifically `maybeIncrementLogStartOffset`, `roll`, `delete`, `truncateFullyAndStartAt` and perhaps even `truncateTo`. But given that there will always exist some races maybe it's not worth it, WDYT?







[GitHub] [kafka] tombentley commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r545145005



##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -2818,6 +2818,22 @@ class LogTest {
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
   }
 
+  @Test
+  def testAppendToOrReadFromLogInFailedLogDir(): Unit = {
+    val log = createLog(logDir, LogConfig())
+    log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
+    assertEquals(0, readLog(log, 0, 4096).records.records.iterator.next().offset)
+    try {
+      log.maybeHandleIOException("Simulating failed log dir") {

Review comment:
       That only seems to work when writing to the transaction index (because the file opening is deferred so that it's only at the point where we try to append some transaction markers that the index file open is attempted and the path found to be a directory, which means that the exception propagates through `Log`). Renaming the segment file, on the other hand, doesn't propagate through `Log`, because the rename throws immediately and propagates directly from `FileRecords` to the test. Obviously when the exception doesn't propagate via `Log` the `logDirOffline` doesn't get set.
   
   Making this change kind of mixes transactional appends into a test for something which is orthogonal to transactional logs. In any case, I've done as you suggest, I just thought it worth mentioning.







[GitHub] [kafka] tombentley commented on pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#issuecomment-755208318


   Thanks for the review @hachikuji, now fixed. 





[GitHub] [kafka] hachikuji merged pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9676:
URL: https://github.com/apache/kafka/pull/9676


   





[GitHub] [kafka] hachikuji commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r534502482



##########
File path: core/src/main/scala/kafka/server/LogDirFailureChannel.scala
##########
@@ -49,6 +49,13 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
       offlineLogDirQueue.add(logDir)
   }
 
+  /*
+   * Return whether the given log dir is offline.
+   */
+  def logDirIsFailed(logDir: String): Boolean = {

Review comment:
       Maybe `isOffline` or `hasFailed`?







[GitHub] [kafka] hachikuji commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r552084319



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1093,19 +1097,25 @@ class Log(@volatile private var _dir: File,
                      assignOffsets: Boolean,
                      leaderEpoch: Int,
                      ignoreRecordSize: Boolean): LogAppendInfo = {
-    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
-      val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize)
 
-      // return if we have no valid messages or if this is a duplicate of the last appended entry
-      if (appendInfo.shallowCount == 0) appendInfo
-      else {
+    val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize)
 
-        // trim any invalid bytes or partial messages before appending it to the on-disk log
-        var validRecords = trimInvalidBytes(records, appendInfo)
+    // return if we have no valid messages or if this is a duplicate of the last appended entry
+    if (appendInfo.shallowCount == 0) appendInfo
+    else {
 
-        // they are valid, insert them in the log
-        lock synchronized {
+      // trim any invalid bytes or partial messages before appending it to the on-disk log
+      var validRecords = trimInvalidBytes(records, appendInfo)
+
+      // they are valid, insert them in the log
+      lock synchronized {
+        maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
           checkIfMemoryMappedBufferClosed()
+
+          // check for offline log dir in case a retry following an IOException happens before the log dir
+          // is taken offline, which would result in inconsistent producer state
+          checkForLogDirFailure()

Review comment:
       Do we still need this since this check is in `maybeHandleIOException`?







[GitHub] [kafka] hachikuji commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r538686784



##########
File path: core/src/main/scala/kafka/server/LogDirFailureChannel.scala
##########
@@ -49,6 +49,13 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
       offlineLogDirQueue.add(logDir)
   }
 
+  /*
+   * Return whether the given log dir is offline.
+   */
+  def logDirIsOffline(logDir: String): Boolean = {
+    offlineLogDirs.containsKey(logDir)

Review comment:
       I am not sure if it is really necessary, but since offline dirs are a rare situation, I'm wondering if it makes sense to optimize for the common case to avoid the lookup. For example, maybe we could leave `offlineLogDirs` uninitialized until the first log dir failure.
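
One way the lazy-initialization idea could look, as a rough sketch only. `LazyFailureChannel` is a hypothetical class, not the real `LogDirFailureChannel`, and the map's value type is an assumption made for illustration. The map stays `null` until the first failure, so the common-case check costs a single volatile read and no hash lookup.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch; not the actual LogDirFailureChannel.
final class LazyFailureChannel {
  // Remains null until the first log dir failure is recorded.
  @volatile private var offlineLogDirs: ConcurrentHashMap[String, String] = null

  // Records a failed log dir, creating the map on first use.
  def maybeAddOfflineLogDir(logDir: String, msg: String): Unit = synchronized {
    if (offlineLogDirs == null)
      offlineLogDirs = new ConcurrentHashMap[String, String]()
    offlineLogDirs.putIfAbsent(logDir, msg)
  }

  // Common case: one volatile read that sees null, and no map lookup.
  def logDirIsOffline(logDir: String): Boolean = {
    val dirs = offlineLogDirs
    dirs != null && dirs.containsKey(logDir)
  }
}
```

The trade-off is that every log sharing the channel still reads the same shared field on each operation.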

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1106,6 +1106,13 @@ class Log(@volatile private var _dir: File,
         // they are valid, insert them in the log
         lock synchronized {
           checkIfMemoryMappedBufferClosed()
+
+          // check for offline log dir in case a retry following an IOException happens before the log dir

Review comment:
       Hmm.. In case there is an IOException on an append, we will release the lock and fail the log dir in `maybeHandleIOException`. There is a window for another append to sneak by. It looks like it should be possible to pull `maybeHandleIOException` into the locked section here since `analyzeAndValidateRecords` does not do any IO.
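
The window described above can be illustrated with a small model. This is a sketch under stated assumptions: `FencedLog`, `handleIO` and the injected `write` function are hypothetical and only mimic the shape of `Log.append`. Because the handler runs inside the locked section, the offline flag is set before the lock is released, so the next appender to take the lock is rejected.

```scala
import java.io.IOException

// Hypothetical, simplified model of a log; not Kafka's Log class.
final class FencedLog(write: String => Unit) {
  private val lock = new Object
  @volatile private var offline = false

  def isOffline: Boolean = offline

  // Rejects the operation if the dir is offline; marks it offline on IOException.
  private def handleIO[T](msg: String)(fun: => T): T = {
    if (offline) throw new IllegalStateException(s"$msg: log dir is offline")
    try fun
    catch {
      case e: IOException =>
        offline = true // the caller still holds the lock at this point
        throw new IllegalStateException(msg, e)
    }
  }

  def append(record: String): Unit = {
    // Validation does no IO, so it can stay outside the lock and the handler.
    require(record.nonEmpty, "empty record")
    lock.synchronized {
      // Handler inside the lock: no other append can run between the
      // failure and the flag being set.
      handleIO(s"Error while appending '$record'") {
        write(record)
      }
    }
  }
}
```

If the handler instead wrapped the `synchronized` block, the lock would be released before the flag was set, which is the gap a retried append could slip through.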







[GitHub] [kafka] tombentley commented on pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#issuecomment-742396820


   @hachikuji @guozhangwang I decided to switch to a flag rather than implementing the uninitialised `offlineLogDirs` with an `AtomicReference`, which would be contended by every thread operating on logs in the same parent directory. Please could you take another look?





[GitHub] [kafka] hachikuji commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r534502282



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1219,6 +1219,9 @@ class Log(@volatile private var _dir: File,
               appendInfo.logAppendTime = duplicate.timestamp
               appendInfo.logStartOffset = logStartOffset
             case None =>
+              if (logDirFailureChannel.logDirIsFailed(parentDir)) {

Review comment:
       Perhaps we could move this to somewhere near the top? I don't think we get much benefit by delaying the check since duplicates would be a rare case. We probably don't want to have to trust the producer state anyway after an append failure.

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -2818,6 +2818,21 @@ class LogTest {
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
   }
 
+  @Test
+  def testAppendToLogInFailedLogDir(): Unit = {
+    val log = createLog(logDir, LogConfig())
+    log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
+    assertEquals(0, readLog(log, 0, 4096).records.records.iterator.next().offset)
+    log.logDirFailureChannel.maybeAddOfflineLogDir(logDir.getParent, "Simulating failed log dir", new IOException("Test failure"))
+    try {
+      log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
+      fail()

Review comment:
       nit: we can use `assertThrows` or `intercept`
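
To illustrate the nit, here is a sketch of the pattern with a hand-rolled helper. `interceptSketch` is a hypothetical stand-in that only approximates what ScalaTest's `intercept` or JUnit's `assertThrows` do; in the real test one of those library methods would be used instead of the `try` / `fail()` / `catch` block.

```scala
import scala.reflect.ClassTag

object TestSketch {
  // Hand-rolled stand-in for an `intercept`-style assertion, for illustration only.
  // Returns the thrown exception if it has the expected type, otherwise fails.
  def interceptSketch[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): T = {
    val caught: Option[Throwable] =
      try { body; None }
      catch { case t: Throwable => Some(t) }
    caught match {
      case Some(t) if ct.runtimeClass.isInstance(t) =>
        t.asInstanceOf[T]
      case Some(t) =>
        throw new AssertionError(s"Expected ${ct.runtimeClass.getName} but got $t", t)
      case None =>
        throw new AssertionError(s"Expected ${ct.runtimeClass.getName} but nothing was thrown")
    }
  }
}
```

A call then reads as a single expression, for example `interceptSketch[IllegalStateException] { appendToOfflineLog() }`, where `appendToOfflineLog` is a placeholder for the operation under test. The returned exception can be inspected further if the test needs to check its message.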

##########
File path: core/src/main/scala/kafka/server/LogDirFailureChannel.scala
##########
@@ -49,6 +49,13 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
       offlineLogDirQueue.add(logDir)
   }
 
+  /*
+   * Return whether the given log dir is offline.
+   */
+  def logDirIsFailed(logDir: String): Boolean = {

Review comment:
       Maybe `isOffline`?



