You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/03 23:13:25 UTC

[kafka] branch 2.4 updated: KAFKA-9807; Protect LSO reads from concurrent high-watermark updates (#8418)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 627d8e0  KAFKA-9807; Protect LSO reads from concurrent high-watermark updates (#8418)
627d8e0 is described below

commit 627d8e093cddf70b5e092dee44c250433abc3a96
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Apr 3 13:56:42 2020 -0700

    KAFKA-9807; Protect LSO reads from concurrent high-watermark updates (#8418)
    
    If the high-watermark is updated in the middle of a read with the `read_committed` isolation level, it is possible to return data above the LSO. In the worst case, this can lead to the read of an aborted transaction. The root cause is that the logic depends on reading the high-watermark twice. We fix the problem by reading it once and caching the value.
    
    Reviewers: David Arthur <mu...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/log/Log.scala          |  7 ++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 54 ++++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 634f216..98790d2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -411,8 +411,11 @@ class Log(@volatile var dir: File,
   private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
     checkIfMemoryMappedBufferClosed()
 
+    // cache the current high watermark to avoid a concurrent update invalidating the range check
+    val highWatermarkMetadata = fetchHighWatermarkMetadata
+
     firstUnstableOffsetMetadata match {
-      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark =>
+      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
         if (offsetMetadata.messageOffsetOnly) {
           lock synchronized {
             val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
@@ -423,7 +426,7 @@ class Log(@volatile var dir: File,
         } else {
           offsetMetadata
         }
-      case _ => fetchHighWatermarkMetadata
+      case _ => highWatermarkMetadata
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1e2e047..0831a85 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,6 +20,7 @@ package kafka.log
 import java.io._
 import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
+import java.util.concurrent.{Callable, Executors}
 import java.util.regex.Pattern
 import java.util.{Collections, Optional, Properties}
 
@@ -3643,6 +3644,59 @@ class LogTest {
   }
 
   @Test
+  def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+    val lastOffset = 50L
+
+    val producerEpoch = 0.toShort
+    val producerId = 15L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+    // Thread 1 writes single-record transactions and attempts to read them
+    // before they have been aborted, and then aborts them
+    val txnWriteAndReadLoop: Callable[Int] = () => {
+      var nonEmptyReads = 0
+      while (log.logEndOffset < lastOffset) {
+        val currentLogEndOffset = log.logEndOffset
+
+        appendProducer(1)
+
+        val readInfo = log.read(
+          startOffset = currentLogEndOffset,
+          maxLength = Int.MaxValue,
+          isolation = FetchTxnCommitted,
+          minOneMessage = false)
+
+        if (readInfo.records.sizeInBytes() > 0)
+          nonEmptyReads += 1
+
+        appendEndTxnMarkerAsLeader(log, producerId, producerEpoch, ControlRecordType.ABORT)
+      }
+      nonEmptyReads
+    }
+
+    // Thread 2 watches the log and updates the high watermark
+    val hwUpdateLoop: Runnable = () => {
+      while (log.logEndOffset < lastOffset) {
+        log.updateHighWatermark(log.logEndOffset)
+      }
+    }
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      executor.submit(hwUpdateLoop)
+
+      val future = executor.submit(txnWriteAndReadLoop)
+      val nonEmptyReads = future.get()
+
+      assertEquals(0, nonEmptyReads)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
   def testTransactionIndexUpdated(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)