Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/14 15:27:04 UTC

[kafka] branch trunk updated: KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5133)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37a4d5e  KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5133)
37a4d5e is described below

commit 37a4d5ea46bb5ba1bdf2d7cb03c3678a128b6f9e
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Thu Jun 14 08:26:45 2018 -0700

    KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5133)
    
    It is possible for the log start offset to fall in the middle of a batch after AdminClient#deleteRecords(). This will cause a follower starting from the log start offset to fail fetching (all records). Use cases in which a follower starts fetching from the log start offset include: 1) a new replica due to partition re-assignment; 2) a new local replica created as a result of AdminClient#AlterReplicaLogDirs(); 3) a broker that was down for some time while AdminClient#deleteRecords() moved the log start  [...]
    
    Added two integration tests:
    1) Produce and then AdminClient#deleteRecords() while one of the followers is down; restarting that follower then requires fetching from the log start offset;
    2) AdminClient#AlterReplicaLogDirs() after AdminClient#deleteRecords()
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
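    
    For illustration, a minimal sketch of the operation that moves the log start
    offset into the middle of a batch (assumed names: a "test-topic" partition and a
    broker at localhost:9092; the AdminClient calls mirror the integration tests in
    this patch):
    
        import java.util.Properties
        import scala.collection.JavaConverters._
        import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, RecordsToDelete}
        import org.apache.kafka.common.TopicPartition
    
        val props = new Properties()
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
        val client = AdminClient.create(props)
        val tp = new TopicPartition("test-topic", 0) // assumed topic
        // If offsets 0..4 were written as one batch, this leaves the log start offset
        // at 3, inside that batch. A follower fetching from offset 3 then receives the
        // whole batch with base offset 0, which the pre-fix code rejected.
        client.deleteRecords(Map(tp -> RecordsToDelete.beforeOffset(3L)).asJava).all().get()
        client.close()
    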
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  47 +++++-
 .../kafka/common/OffsetsOutOfOrderException.scala  |  25 +++
 .../common/UnexpectedAppendOffsetException.scala   |  29 ++++
 core/src/main/scala/kafka/log/Log.scala            |  50 ++++--
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   3 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   2 +-
 .../kafka/api/AdminClientIntegrationTest.scala     |  77 +++++++++
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 174 +++++++++++++++++++++
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  69 +++++++-
 9 files changed, 449 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b9180a4..55f870e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.LeaderAndIsr
 import kafka.api.Request
+import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
@@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.AdminZkClient
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
@@ -187,6 +188,10 @@ class Partition(val topic: String,
 
   def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId))
 
+  def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+    getReplica(replicaId).getOrElse(
+      throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition"))
+
   def leaderReplicaIfLocal: Option[Replica] =
     leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
 
@@ -545,15 +550,41 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  def appendRecordsToFutureReplica(records: MemoryRecords) {
-    getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records)
+  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
+    if (isFuture)
+      getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
+    else {
+      // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterLogDirsThread
+      // is executing maybeDeleteAndSwapFutureReplica() to replace the follower replica with the future replica.
+      inReadLock(leaderIsrUpdateLock) {
+        getReplicaOrException().log.get.appendAsFollower(records)
+      }
+    }
   }
 
-  def appendRecordsToFollower(records: MemoryRecords) {
-    // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
-    // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
-    inReadLock(leaderIsrUpdateLock) {
-      getReplica().get.log.get.appendAsFollower(records)
+  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {
+    try {
+      doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+    } catch {
+      case e: UnexpectedAppendOffsetException =>
+        val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException()
+        val logEndOffset = replica.logEndOffset.messageOffset
+        if (logEndOffset == replica.logStartOffset &&
+            e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+          // This may happen if the log start offset on the leader (or current replica) falls in
+          // the middle of the batch due to a delete records request and the follower tries to
+          // fetch its first offset from the leader.
+          // We handle this case here instead of in Log#append() because we will need to remove the
+          // segment that starts with the log start offset and create a new one with an earlier
+          // offset (the base offset of the batch), which will move recoveryPoint backwards, so we
+          // will need to checkpoint the new recovery point before we append.
+          val replicaName = if (isFuture) "future replica" else "follower"
+          info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${replica.logStartOffset}." +
+               s" Since this is the first record to be appended to the $replicaName's log, will start the log from offset ${e.firstOffset}.")
+          truncateFullyAndStartAt(e.firstOffset, isFuture)
+          doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+        } else
+          throw e
     }
   }
 
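The recovery path added to Partition above reduces to one rule: if the very first
append to an empty log fails because the batch starts below the expected next offset
but still spans it, restart the log at the batch's base offset and retry once. A
condensed sketch of that control flow (a hypothetical standalone helper, not the
actual Partition API; truncateAndRestartAt stands in for Partition#truncateFullyAndStartAt):

    import kafka.cluster.Replica
    import kafka.common.UnexpectedAppendOffsetException
    import org.apache.kafka.common.record.MemoryRecords

    def appendWithRecovery(replica: Replica,
                           records: MemoryRecords,
                           truncateAndRestartAt: Long => Unit): Unit = {
      def append(): Unit = replica.log.get.appendAsFollower(records)
      try append()
      catch {
        case e: UnexpectedAppendOffsetException
            // recovery is safe only when the log is empty (LEO == LSO) and the
            // rejected batch actually covers the expected next offset
            if replica.logEndOffset.messageOffset == replica.logStartOffset &&
               e.firstOffset < replica.logEndOffset.messageOffset &&
               e.lastOffset >= replica.logEndOffset.messageOffset =>
          truncateAndRestartAt(e.firstOffset) // move the log back to the batch base offset
          append()                            // retry; the batch now starts the log
      }
    }
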
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
new file mode 100644
index 0000000..f8daaa4
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower received records with non-monotonically increasing offsets
+ */
+class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) {
+}
+
diff --git a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
new file mode 100644
index 0000000..e719a93
--- /dev/null
+++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that the follower or the future replica received records from the leader (or current
+ * replica) whose first offset is less than the expected next offset.
+ * @param firstOffset The first offset of the records to append
+ * @param lastOffset  The last offset of the records to append
+ */
+class UnexpectedAppendOffsetException(val message: String,
+                                      val firstOffset: Long,
+                                      val lastOffset: Long) extends RuntimeException(message) {
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c7d2a6e..c92beee 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -29,7 +29,7 @@ import java.util.regex.Pattern
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef}
+import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
@@ -49,11 +49,11 @@ import scala.collection.{Seq, Set, mutable}
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-    RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+    RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
     LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
 }
 
 /**
@@ -72,6 +72,7 @@ object LogAppendInfo {
  * @param shallowCount The number of shallow messages
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
  */
 case class LogAppendInfo(var firstOffset: Option[Long],
                          var lastOffset: Long,
@@ -84,12 +85,15 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
                          validBytes: Int,
-                         offsetsMonotonic: Boolean) {
+                         offsetsMonotonic: Boolean,
+                         lastOffsetOfFirstBatch: Long) {
   /**
-   * Get the first offset if it exists, else get the last offset.
-   * @return The offset of first message if it exists; else offset of the last message.
+   * Get the first offset if it exists, else get the last offset of the first batch.
+   * For magic versions 2 and newer, this method returns the first offset. For magic versions
+   * older than 2, we use the last offset of the first batch as an approximation of the first
+   * offset to avoid decompressing the data.
    */
-  def firstOrLastOffset: Long = firstOffset.getOrElse(lastOffset)
+  def firstOrLastOffsetOfFirstBatch: Long = firstOffset.getOrElse(lastOffsetOfFirstBatch)
 
   /**
    * Get the (maximum) number of messages described by LogAppendInfo
@@ -736,6 +740,8 @@ class Log(@volatile var dir: File,
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
    * @throws KafkaStorageException If the append fails due to an I/O error.
+   * @throws OffsetsOutOfOrderException If out-of-order offsets are found in 'records'
+   * @throws UnexpectedAppendOffsetException If the first or last offset in the append is less than the next offset
    * @return Information about the appended messages including the first and last offset.
    */
   private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
@@ -798,9 +804,27 @@ class Log(@volatile var dir: File,
           }
         } else {
           // we are taking the offsets we are given
-          if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset)
-            throw new IllegalArgumentException(s"Out of order offsets found in append to $topicPartition: " +
-              records.records.asScala.map(_.offset))
+          if (!appendInfo.offsetsMonotonic)
+            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
+                                                 records.records.asScala.map(_.offset))
+
+          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
+            // we may still be able to recover if the log is empty
+            // one example: fetching from a log start offset on the leader that is not batch-aligned,
+            // which may happen as a result of AdminClient#deleteRecords()
+            val firstOffset = appendInfo.firstOffset match {
+              case Some(offset) => offset
+              case None => records.batches.asScala.head.baseOffset()
+            }
+
+            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
+            throw new UnexpectedAppendOffsetException(
+              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
+              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
+              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
+              firstOffset, appendInfo.lastOffset)
+          }
         }
 
         // update the epoch cache with the epoch stamped onto the message by the leader
@@ -830,7 +854,7 @@ class Log(@volatile var dir: File,
         val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
 
         val logOffsetMetadata = LogOffsetMetadata(
-          messageOffset = appendInfo.firstOrLastOffset,
+          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
           segmentBaseOffset = segment.baseOffset,
           relativePositionInSegment = segment.size)
 
@@ -970,6 +994,7 @@ class Log(@volatile var dir: File,
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     var readFirstMessage = false
+    var lastOffsetOfFirstBatch = -1L
 
     for (batch <- records.batches.asScala) {
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
@@ -986,6 +1011,7 @@ class Log(@volatile var dir: File,
       if (!readFirstMessage) {
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
           firstOffset = Some(batch.baseOffset)
+        lastOffsetOfFirstBatch = batch.lastOffset
         readFirstMessage = true
       }
 
@@ -1024,7 +1050,7 @@ class Log(@volatile var dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
     LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
   }
 
   private def updateProducers(batch: RecordBatch,
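The lastOffsetOfFirstBatch plumbing above exists because the message format version
determines what can be read cheaply from a batch header. A small standalone sketch of
the same decision (assumes only the public record classes; not part of Log):

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}

    // For magic v2+ the base offset is stored in the batch header; for older magic
    // values, finding the true first offset would require decompressing the payload,
    // so the header's last offset is used as an approximation.
    def firstOrLastOffsetOfFirstBatch(records: MemoryRecords): Long = {
      val firstBatch = records.batches.asScala.head
      if (firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2) firstBatch.baseOffset
      else firstBatch.lastOffset
    }
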
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 5a505c3..e46473b 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -98,8 +98,7 @@ class ReplicaAlterLogDirsThread(name: String,
       throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
 
-    // Append the leader's messages to the log
-    partition.appendRecordsToFutureReplica(records)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
     val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
     futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
     futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index cf8d829..80940f6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -112,7 +112,7 @@ class ReplicaFetcherThread(name: String,
         .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
-    partition.appendRecordsToFollower(records)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
     if (isTraceEnabled)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 50ed7ae..d6f349c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -813,6 +813,83 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
+    val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
+
+    def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
+      TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.getReplica(topicPartition) != None,
+                              "Expected follower to create replica for partition")
+
+      // wait until the follower discovers that log start offset moved beyond its HW
+      TestUtils.waitUntilTrue(() => {
+        servers(followerIndex).replicaManager.getReplica(topicPartition).get.logStartOffset == expectedStartOffset
+      }, s"Expected follower to discover new log start offset $expectedStartOffset")
+
+      TestUtils.waitUntilTrue(() => {
+        servers(followerIndex).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset == expectedEndOffset
+      }, s"Expected follower to catch up to log end offset $expectedEndOffset")
+    }
+
+    // we will produce to topic and delete records while one follower is down
+    killBroker(followerIndex)
+
+    client = AdminClient.create(createConfig)
+    sendRecords(producers.head, 100, topicPartition)
+
+    val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+    result.all().get()
+
+    // start the stopped broker to verify that it will be able to fetch from new log start offset
+    restartDeadBrokers()
+
+    waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L)
+
+    // after the new replica caught up, all replicas should have same log start offset
+    for (i <- 0 until serverCount)
+      assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+
+    // kill the same follower again, produce more records, and delete records beyond the follower's LEO
+    killBroker(followerIndex)
+    sendRecords(producers.head, 100, topicPartition)
+    val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava)
+    result1.all().get()
+    restartDeadBrokers()
+    waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
+  }
+
+  @Test
+  def testAlterLogDirsAfterDeleteRecords(): Unit = {
+    client = AdminClient.create(createConfig)
+    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val expectedLEO = 100
+    sendRecords(producers.head, expectedLEO, topicPartition)
+
+    // delete records to move log start offset
+    val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+    result.all().get()
+    // make sure we are in the expected state after delete records
+    for (i <- 0 until serverCount) {
+      assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+      assertEquals(expectedLEO, servers(i).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+    }
+
+    // we will create another dir just for one server
+    val futureLogDir = servers(0).config.logDirs(1)
+    val futureReplica = new TopicPartitionReplica(topic, 0, servers(0).config.brokerId)
+
+    // Verify that replica can be moved to the specified log directory
+    client.alterReplicaLogDirs(Map(futureReplica -> futureLogDir).asJava).all.get
+    TestUtils.waitUntilTrue(() => {
+      futureLogDir == servers(0).logManager.getLog(topicPartition).get.dir.getParent
+    }, "timed out waiting for replica movement")
+
+    // once replica moved, its LSO and LEO should match other replicas
+    assertEquals(3, servers(0).replicaManager.getReplica(topicPartition).get.logStartOffset)
+    assertEquals(expectedLEO, servers(0).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+  }
+
+  @Test
   def testOffsetsForTimesAfterDeleteRecords(): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
 
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
new file mode 100644
index 0000000..fe5d578
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.UnexpectedAppendOffsetException
+import kafka.log.{Log, LogConfig, LogManager, CleanerConfig}
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, MockScheduler}
+import kafka.utils.timer.MockTimer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.Assertions.assertThrows
+import scala.collection.JavaConverters._
+
+class PartitionTest {
+
+  val brokerId = 101
+  val topicPartition = new TopicPartition("test-topic", 0)
+  val time = new MockTime()
+  val brokerTopicStats = new BrokerTopicStats
+  val metrics = new Metrics
+
+  var tmpDir: File = _
+  var logDir: File = _
+  var replicaManager: ReplicaManager = _
+  var logManager: LogManager = _
+  var logConfig: LogConfig = _
+
+  @Before
+  def setup(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+    logConfig = LogConfig(logProps)
+
+    tmpDir = TestUtils.tempDir()
+    logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    logManager = TestUtils.createLogManager(
+      logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
+    logManager.startup()
+
+    val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
+    brokerProps.put("log.dir", logDir.getAbsolutePath)
+    val brokerConfig = KafkaConfig.fromProps(brokerProps)
+    replicaManager = new ReplicaManager(
+      config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
+      logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
+      brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
+  }
+
+  @After
+  def tearDown(): Unit = {
+    brokerTopicStats.close()
+    metrics.close()
+
+    logManager.shutdown()
+    Utils.delete(tmpDir)
+    logManager.liveLogDirs.foreach(Utils.delete)
+    replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
+  def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(Some(replica), partition.getReplica(replica.brokerId))
+
+    val initialLogStartOffset = 5L
+    partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
+    assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:",
+                 initialLogStartOffset, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after truncate fully and start at $initialLogStartOffset:",
+                 initialLogStartOffset, replica.logStartOffset)
+
+    // verify that we cannot append records that do not contain log start offset even if the log is empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      // append one record with offset = 3
+      partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset, replica.logEndOffset.messageOffset)
+
+    // verify that we can append records that contain log start offset, even when first
+    // offset < log start offset if the log is empty
+    val newLogStartOffset = 4L
+    val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                     new SimpleRecord("k2".getBytes, "v2".getBytes),
+                                     new SimpleRecord("k3".getBytes, "v3".getBytes)),
+                                baseOffset = newLogStartOffset)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
+    assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:", 7L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:", newLogStartOffset, replica.logStartOffset)
+
+    // and we can append more records after that
+    partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false)
+    assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+
+    // but we cannot append to offset < log start if the log is not empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                        new SimpleRecord("k2".getBytes, "v2".getBytes)),
+                                   baseOffset = 3L)
+      partition.appendRecordsToFollowerOrFutureReplica(records2, isFuture = false)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", 8L, replica.logEndOffset.messageOffset)
+
+    // we still can append to next offset
+    partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false)
+    assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+  }
+
+  @Test
+  def testGetReplica(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new Partition(
+        topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    assertEquals(None, partition.getReplica(brokerId))
+    assertThrows[ReplicaNotAvailableException] {
+      partition.getReplicaOrException(brokerId)
+    }
+
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(replica, partition.getReplicaOrException(brokerId))
+  }
+
+  @Test
+  def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    assertThrows[ReplicaNotAvailableException] {
+      partition.appendRecordsToFollowerOrFutureReplica(
+           createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
+    }
+  }
+
+  def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
+    val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+    val builder = MemoryRecords.builder(
+      buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME,
+      baseOffset, time.milliseconds, partitionLeaderEpoch)
+    records.foreach(builder.append)
+    builder.build()
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1171e5e..6c62e5e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,7 +22,8 @@ import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
 import java.util.Properties
 
-import kafka.common.KafkaException
+import org.apache.kafka.common.errors._
+import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException}
 import kafka.log.Log.DeleteDirSuffix
 import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
@@ -42,6 +43,7 @@ import org.junit.{After, Before, Test}
 import scala.collection.Iterable
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import org.scalatest.Assertions.{assertThrows, intercept, withClue}
 
 class LogTest {
   var config: KafkaConfig = null
@@ -1885,13 +1887,72 @@ class LogTest {
     assertTrue("Message payload should be null.", !head.hasValue)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test
   def testAppendWithOutOfOrderOffsetsThrowsException() {
     val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+
+    val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
+    val buffer = ByteBuffer.allocate(512)
+    for (offset <- appendOffsets) {
+      val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+                                          TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(),
+                                          1L, 0, 0, false, 0)
+      builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+      builder.close()
+    }
+    buffer.flip()
+    val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+    assertThrows[OffsetsOutOfOrderException] {
+      log.appendAsFollower(memoryRecords)
+    }
+  }
+
+  @Test
+  def testAppendBelowExpectedOffsetThrowsException() {
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
-    val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
-    log.appendAsFollower(invalidRecord)
+
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val invalidRecord = MemoryRecords.withRecords(magic, compression, new SimpleRecord(1.toString.getBytes))
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        assertThrows[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(invalidRecord)
+        }
+      }
+    }
+  }
+
+  @Test
+  def testAppendEmptyLogBelowLogStartOffsetThrowsException() {
+    createEmptyLogs(logDir, 7)
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    assertEquals(7L, log.logStartOffset)
+    assertEquals(7L, log.logEndOffset)
+
+    val firstOffset = 4L
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                         new SimpleRecord("k2".getBytes, "v2".getBytes),
+                                         new SimpleRecord("k3".getBytes, "v3".getBytes)),
+                                    magicValue = magic, codec = compression,
+                                    baseOffset = firstOffset)
+
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        val exception = intercept[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(records = batch)
+        }
+        assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset",
+                     firstOffset, exception.firstOffset)
+        assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset",
+                     firstOffset + 2, exception.lastOffset)
+      }
+    }
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.