Posted to commits@kafka.apache.org by ju...@apache.org on 2022/03/01 18:56:54 UTC

[kafka] branch trunk updated: MINOR: Ensure LocalLog.flush is thread safe to recoveryPoint changes (#11814)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 67e99a4  MINOR: Ensure LocalLog.flush is thread safe to recoveryPoint changes (#11814)
67e99a4 is described below

commit 67e99a42361b8f0febc3e691488c668ebfe257e9
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Tue Mar 1 10:55:17 2022 -0800

    MINOR: Ensure LocalLog.flush is thread safe to recoveryPoint changes (#11814)
    
    Issue:
    Imagine a scenario where two threads T1 and T2 are inside UnifiedLog.flush() concurrently:
    
    KafkaScheduler thread T1 -> Periodic work calls LogManager.flushDirtyLogs(), which in turn calls UnifiedLog.flush(). For example, this happens on the schedule set by log.flush.scheduler.interval.ms.
    KafkaScheduler thread T2 -> A UnifiedLog.flush() call is triggered asynchronously during a segment roll.
    If thread T1 advances the recovery point beyond thread T2's flush offset, then when T2 reaches LocalLog.flush(), it calls LogSegments.values() with a start offset (the recovery point) greater than its end offset (the flush offset). This trips the range check in LogSegments.values(), and the resulting exception causes the KafkaScheduler thread to die, which is not desirable.
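
A minimal sketch, not Kafka's code, that replays the problematic interleaving on a single thread. PreFixLog, its values method and the segment offsets are hypothetical stand-ins; the range check is assumed to behave like the one in LogSegments.values() (empty when from == to, an IllegalArgumentException when to < from). Setting the recovery point before calling flush stands in for T1 winning the race.

```scala
// Hypothetical, simplified model of the pre-fix LocalLog.flush(offset).
// Segments are reduced to their base offsets; "flushing" is only recorded.
final class PreFixLog(baseOffsets: Seq[Long]) {
  // Shared state that another scheduler thread (T1) may advance at any time.
  @volatile var recoveryPoint: Long = 0L
  private val segments: List[Long] = baseOffsets.sorted.toList
  var flushedSegments: List[Long] = Nil
  var dirFlushes: Int = 0

  // Assumed to mirror LogSegments.values(from, to): empty when from == to,
  // an exception when to < from, otherwise the segment containing `from`
  // plus every later segment whose base offset is below `to`.
  def values(from: Long, to: Long): List[Long] =
    if (from == to) Nil
    else if (to < from)
      throw new IllegalArgumentException(s"Invalid range: from offset $from is greater than limit offset $to")
    else {
      val floor = segments.filter(_ <= from).lastOption.getOrElse(from)
      segments.filter(b => b >= floor && b < to)
    }

  // Pre-fix shape: recoveryPoint is read twice and never compared with `offset`.
  def flush(offset: Long): Unit = {
    val segmentsToFlush = values(recoveryPoint, offset)
    segmentsToFlush.foreach(b => flushedSegments = flushedSegments :+ b)
    // Second read: T1 may have moved recoveryPoint since the line above.
    if (segmentsToFlush.exists(_ >= recoveryPoint)) dirFlushes += 1
  }
}

// T2 wants to flush up to offset 100, but T1 has already advanced
// the recovery point to 150.
val preFix = new PreFixLog(Seq(0L, 50L, 100L, 150L))
preFix.recoveryPoint = 150L
val outcome = scala.util.Try(preFix.flush(100L))
println(outcome.isFailure) // true
```

With real threads the failure is intermittent; the replay above just pins down the unlucky ordering.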
    
    Fix:
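    We fix this by making LocalLog.flush() immune to the recovery point advancing beyond the flush offset: it reads recoveryPoint once into a local value, skips the flush entirely when that value exceeds the flush offset, and reuses the same value when deciding whether the parent directory must be flushed.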
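
A minimal sketch of the same pattern in a hypothetical, simplified model (PostFixLog and its values method are illustrative stand-ins with the same assumed range semantics as LogSegments.values(), not Kafka's classes). It shows the two ingredients of the fix: a single read of the volatile recovery point, and a guard that turns a stale flush request into a no-op instead of an exception.

```scala
// Hypothetical, simplified model of the post-fix LocalLog.flush(offset).
final class PostFixLog(baseOffsets: Seq[Long]) {
  @volatile var recoveryPoint: Long = 0L
  private val segments: List[Long] = baseOffsets.sorted.toList
  var flushedSegments: List[Long] = Nil
  var dirFlushes: Int = 0

  // Assumed range semantics: empty when from == to, exception when to < from.
  def values(from: Long, to: Long): List[Long] =
    if (from == to) Nil
    else if (to < from)
      throw new IllegalArgumentException(s"Invalid range: from offset $from is greater than limit offset $to")
    else {
      val floor = segments.filter(_ <= from).lastOption.getOrElse(from)
      segments.filter(b => b >= floor && b < to)
    }

  def flush(offset: Long): Unit = {
    // Read the volatile field exactly once so both uses below agree.
    val currentRecoveryPoint = recoveryPoint
    // If another thread already moved past `offset`, there is nothing to flush.
    if (currentRecoveryPoint <= offset) {
      val segmentsToFlush = values(currentRecoveryPoint, offset)
      segmentsToFlush.foreach(b => flushedSegments = flushedSegments :+ b)
      // A segment starting at or after the snapshot is new: flush the directory too.
      if (segmentsToFlush.exists(_ >= currentRecoveryPoint)) dirFlushes += 1
    }
  }
}

val log = new PostFixLog(Seq(0L, 50L, 100L, 150L))
// Stale request: the recovery point (150) is already beyond the flush offset (100).
log.recoveryPoint = 150L
log.flush(100L)
println(log.flushedSegments) // List()

// Normal request: flush from recovery point 60 up to offset 150.
log.recoveryPoint = 60L
log.flush(150L)
println(log.flushedSegments) // List(50, 100)
println(log.dirFlushes)      // 1
```

Skipping the stale call is safe because a recovery point beyond the flush offset means another flush has already covered that range.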
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/LocalLog.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 7cd3a5d..86ac672 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -168,11 +168,14 @@ class LocalLog(@volatile private var _dir: File,
    * @param offset The offset to flush up to (non-inclusive)
    */
   private[log] def flush(offset: Long): Unit = {
-    val segmentsToFlush = segments.values(recoveryPoint, offset)
-    segmentsToFlush.foreach(_.flush())
-    // If there are any new segments, we need to flush the parent directory for crash consistency.
-    if (segmentsToFlush.exists(_.baseOffset >= this.recoveryPoint))
-      Utils.flushDir(dir.toPath)
+    val currentRecoveryPoint = recoveryPoint
+    if (currentRecoveryPoint <= offset) {
+      val segmentsToFlush = segments.values(currentRecoveryPoint, offset)
+      segmentsToFlush.foreach(_.flush())
+      // If there are any new segments, we need to flush the parent directory for crash consistency.
+      if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint))
+        Utils.flushDir(dir.toPath)
+    }
   }
 
   /**