You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/16 06:48:13 UTC

[kafka] branch 2.4 updated: KAFKA-9838; Add log concurrency test and fix minor race condition (#8476)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new e1f18df  KAFKA-9838; Add log concurrency test and fix minor race condition (#8476)
e1f18df is described below

commit e1f18df7f6615109b6cc77b66d9be37b09256a0a
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Apr 15 17:18:30 2020 -0700

    KAFKA-9838; Add log concurrency test and fix minor race condition (#8476)
    
    The patch adds a new test case for validating concurrent read/write behavior in the `Log` implementation. In the process of verifying this, we found a race condition in `read`. The previous logic checked whether the start offset was equal to the end offset before collecting the high watermark. It is possible for the log to be truncated between these two steps, which could cause the high watermark to be equal to the log end offset. When this happens, `LogSegment.read` fails because [...]
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            |  11 +-
 .../scala/unit/kafka/log/LogConcurrencyTest.scala  | 183 +++++++++++++++++++++
 2 files changed, 188 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 98790d2..91bb1eb 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1480,10 +1480,7 @@ class Log(@volatile var dir: File,
       // Because we don't use the lock for reading, the synchronization is a little bit tricky.
       // We create the local variables to avoid race conditions with updates to the log.
       val endOffsetMetadata = nextOffsetMetadata
-      val endOffset = nextOffsetMetadata.messageOffset
-      if (startOffset == endOffset)
-        return emptyFetchDataInfo(endOffsetMetadata, includeAbortedTxns)
-
+      val endOffset = endOffsetMetadata.messageOffset
       var segmentEntry = segments.floorEntry(startOffset)
 
       // return error on attempt to read beyond the log end offset or read below log start offset
@@ -1492,12 +1489,14 @@ class Log(@volatile var dir: File,
           s"but we only have log segments in the range $logStartOffset to $endOffset.")
 
       val maxOffsetMetadata = isolation match {
-        case FetchLogEnd => nextOffsetMetadata
+        case FetchLogEnd => endOffsetMetadata
         case FetchHighWatermark => fetchHighWatermarkMetadata
         case FetchTxnCommitted => fetchLastStableOffsetMetadata
       }
 
-      if (startOffset > maxOffsetMetadata.messageOffset) {
+      if (startOffset == maxOffsetMetadata.messageOffset) {
+        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
+      } else if (startOffset > maxOffsetMetadata.messageOffset) {
         val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
         return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
new file mode 100644
index 0000000..0a9e71f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.Properties
+import java.util.concurrent.{Callable, Executors}
+
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, LogDirFailureChannel}
+import kafka.utils.{KafkaScheduler, TestUtils}
+import org.apache.kafka.common.record.SimpleRecord
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
+
+/** Exercises concurrent reads and writes against a single `Log` instance. */
+class LogConcurrencyTest {
+  private val brokerTopicStats = new BrokerTopicStats
+  private val random = new Random()
+  private val scheduler = new KafkaScheduler(1)
+  private val tmpDir = TestUtils.tempDir()
+  private val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+
+  @Before
+  def setup(): Unit = scheduler.startup()
+
+  @After
+  def shutdown(): Unit = {
+    scheduler.shutdown()
+    Utils.delete(tmpDir)
+  }
+
+  @Test
+  def testUncommittedDataNotConsumed(): Unit = testUncommittedDataNotConsumed(createLog())
+
+  @Test
+  def testUncommittedDataNotConsumedFrequentSegmentRolls(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 237: Integer)
+    testUncommittedDataNotConsumed(createLog(LogConfig(logProps)))
+  }
+
+  /**
+   * Runs a consumer and an appender in parallel until both finish, then validates the
+   * batches the consumer collected against the final contents of the log.
+   */
+  def testUncommittedDataNotConsumed(log: Log): Unit = {
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      val maxOffset = 5000
+      val consumer = new ConsumerTask(log, maxOffset)
+      val appendTask = new LogAppendTask(log, maxOffset)
+
+      val consumerFuture = executor.submit(consumer)
+      val appendFuture = executor.submit(appendTask)
+
+      // get() waits for the task and surfaces any failure as an ExecutionException
+      appendFuture.get()
+      consumerFuture.get()
+
+      validateConsumedData(log, consumer.consumedBatches)
+    } finally executor.shutdownNow()
+  }
+
+  /**
+   * Simple consumption task which reads the log in ascending order under
+   * `FetchHighWatermark` isolation and collects consumed batches for validation
+   */
+  private class ConsumerTask(log: Log, lastOffset: Long) extends Callable[Unit] {
+    val consumedBatches = ListBuffer.empty[FetchedBatch]
+
+    override def call(): Unit = {
+      var fetchOffset = 0L
+      while (log.highWatermark < lastOffset) {
+        val readInfo = log.read(startOffset = fetchOffset, maxLength = 1,
+          isolation = FetchHighWatermark, minOneMessage = true)
+        readInfo.records.batches().forEach { batch =>
+          consumedBatches += FetchedBatch(batch.baseOffset, batch.partitionLeaderEpoch)
+          fetchOffset = batch.lastOffset + 1
+        }
+      }
+    }
+  }
+
+  /**
+   * This class simulates basic leader/follower behavior: each iteration randomly either
+   * appends one batch in the current role or switches role and bumps the leader epoch.
+   */
+  private class LogAppendTask(log: Log, lastOffset: Long) extends Callable[Unit] {
+    override def call(): Unit = {
+      var leaderEpoch = 1
+      var isLeader = true
+
+      while (log.highWatermark < lastOffset) {
+        random.nextInt(2) match {
+          case 0 =>
+            val logEndOffsetMetadata = log.logEndOffsetMetadata
+            val logEndOffset = logEndOffsetMetadata.messageOffset
+            val batchSize = random.nextInt(9) + 1
+            // exclusive range, so exactly `batchSize` records are created
+            val records = (0 until batchSize).map(i => new SimpleRecord(s"$i".getBytes))
+
+            // the high watermark only moves up to the end offset captured before the append
+            if (isLeader) {
+              log.appendAsLeader(TestUtils.records(records), leaderEpoch)
+              log.maybeIncrementHighWatermark(logEndOffsetMetadata)
+            } else {
+              log.appendAsFollower(TestUtils.records(records,
+                baseOffset = logEndOffset, partitionLeaderEpoch = leaderEpoch))
+              log.updateHighWatermark(logEndOffset)
+            }
+
+          case 1 =>
+            isLeader = !isLeader
+            leaderEpoch += 1
+            // on becoming a follower, truncate the log to the high watermark
+            if (!isLeader) log.truncateTo(log.highWatermark)
+        }
+      }
+    }
+  }
+
+  private def createLog(config: LogConfig = LogConfig(new Properties())): Log = {
+    Log(dir = logDir,
+      config = config,
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = scheduler,
+      brokerTopicStats = brokerTopicStats,
+      time = Time.SYSTEM,
+      maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10))
+  }
+
+  /**
+   * Compares the consumed batches, in order, with the batches currently in the log and
+   * fails on the first mismatch or if a consumed batch has no counterpart in the log.
+   */
+  private def validateConsumedData(log: Log, consumedBatches: Iterable[FetchedBatch]): Unit = {
+    val iter = consumedBatches.iterator
+    log.logSegments.foreach { segment =>
+      segment.log.batches.forEach { batch =>
+        if (iter.hasNext) {
+          val consumedBatch = iter.next()
+          try {
+            assertEquals("Consumed batch with unexpected leader epoch",
+              batch.partitionLeaderEpoch, consumedBatch.epoch)
+            assertEquals("Consumed batch with unexpected base offset",
+              batch.baseOffset, consumedBatch.baseOffset)
+          } catch {
+            case e: AssertionError =>
+              throw new AssertionError(s"Consumed batch $consumedBatch " +
+                s"does not match next expected batch in log $batch", e)
+          }
+        }
+      }
+    }
+    val unmatched = iter.toList
+    assertTrue(s"Consumed batches missing from the log: $unmatched", unmatched.isEmpty)
+  }
+
+  private case class FetchedBatch(baseOffset: Long, epoch: Int) {
+    override def toString: String = s"FetchedBatch(baseOffset=$baseOffset, epoch=$epoch)"
+  }
+}