You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/14 15:27:04 UTC
[kafka] branch trunk updated: KAFKA-6975;
Fix replica fetching from non-batch-aligned log start offset (#5133)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 37a4d5e KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5133)
37a4d5e is described below
commit 37a4d5ea46bb5ba1bdf2d7cb03c3678a128b6f9e
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Thu Jun 14 08:26:45 2018 -0700
KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5133)
It is possible for the log start offset to fall in the middle of a batch after AdminClient#deleteRecords(). This causes a follower that starts fetching from the log start offset to fail to fetch any records. Use cases in which a follower starts fetching from the log start offset include: 1) a new replica created by partition reassignment; 2) a new local replica created as a result of AdminClient#AlterReplicaLogDirs(); 3) a broker that was down for some time while AdminClient#deleteRecords() moved the log start [...]
Added two integration tests:
1) Produce records and then call AdminClient#deleteRecords() while one of the followers is down; restarting that follower then requires it to fetch from the log start offset;
2) Call AdminClient#AlterReplicaLogDirs() after AdminClient#deleteRecords().
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
core/src/main/scala/kafka/cluster/Partition.scala | 47 +++++-
.../kafka/common/OffsetsOutOfOrderException.scala | 25 +++
.../common/UnexpectedAppendOffsetException.scala | 29 ++++
core/src/main/scala/kafka/log/Log.scala | 50 ++++--
.../kafka/server/ReplicaAlterLogDirsThread.scala | 3 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 2 +-
.../kafka/api/AdminClientIntegrationTest.scala | 77 +++++++++
.../scala/unit/kafka/cluster/PartitionTest.scala | 174 +++++++++++++++++++++
core/src/test/scala/unit/kafka/log/LogTest.scala | 69 +++++++-
9 files changed, 449 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b9180a4..55f870e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import com.yammer.metrics.core.Gauge
import kafka.api.LeaderAndIsr
import kafka.api.Request
+import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.KafkaController
import kafka.log.{LogAppendInfo, LogConfig}
import kafka.metrics.KafkaMetricsGroup
@@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.AdminZkClient
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.record.MemoryRecords
@@ -187,6 +188,10 @@ class Partition(val topic: String,
def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId))
+ def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+ getReplica(replicaId).getOrElse(
+ throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition"))
+
def leaderReplicaIfLocal: Option[Replica] =
leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
@@ -545,15 +550,41 @@ class Partition(val topic: String,
laggingReplicas
}
- def appendRecordsToFutureReplica(records: MemoryRecords) {
- getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records)
+ private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
+ if (isFuture)
+ getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
+ else {
+ // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
+ // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
+ inReadLock(leaderIsrUpdateLock) {
+ getReplicaOrException().log.get.appendAsFollower(records)
+ }
+ }
}
- def appendRecordsToFollower(records: MemoryRecords) {
- // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
- // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
- inReadLock(leaderIsrUpdateLock) {
- getReplica().get.log.get.appendAsFollower(records)
+ def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {
+ try {
+ doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+ } catch {
+ case e: UnexpectedAppendOffsetException =>
+ val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException()
+ val logEndOffset = replica.logEndOffset.messageOffset
+ if (logEndOffset == replica.logStartOffset &&
+ e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+ // This may happen if the log start offset on the leader (or current replica) falls in
+ // the middle of the batch due to delete records request and the follower tries to
+ // fetch its first offset from the leader.
+ // We handle this case here instead of Log#append() because we will need to remove the
+ // segment that start with log start offset and create a new one with earlier offset
+ // (base offset of the batch), which will move recoveryPoint backwards, so we will need
+ // to checkpoint the new recovery point before we append
+ val replicaName = if (isFuture) "future replica" else "follower"
+ info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${replica.logStartOffset}." +
+ s" Since this is the first record to be appended to the $replicaName's log, will start the log from offset ${e.firstOffset}.")
+ truncateFullyAndStartAt(e.firstOffset, isFuture)
+ doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+ } else
+ throw e
}
}
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
new file mode 100644
index 0000000..f8daaa4
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower received records with non-monotonically increasing offsets
+ */
+class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) {
+}
+
diff --git a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
new file mode 100644
index 0000000..e719a93
--- /dev/null
+++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower or the future replica received records from the leader (or current
+ * replica) with first offset less than expected next offset.
+ * @param firstOffset The first offset of the records to append
+ * @param lastOffset The last offset of the records to append
+ */
+class UnexpectedAppendOffsetException(val message: String,
+ val firstOffset: Long,
+ val lastOffset: Long) extends RuntimeException(message) {
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c7d2a6e..c92beee 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -29,7 +29,7 @@ import java.util.regex.Pattern
import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef}
+import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException}
import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
@@ -49,11 +49,11 @@ import scala.collection.{Seq, Set, mutable}
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
- RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
- RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
}
/**
@@ -72,6 +72,7 @@ object LogAppendInfo {
* @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
*/
case class LogAppendInfo(var firstOffset: Option[Long],
var lastOffset: Long,
@@ -84,12 +85,15 @@ case class LogAppendInfo(var firstOffset: Option[Long],
targetCodec: CompressionCodec,
shallowCount: Int,
validBytes: Int,
- offsetsMonotonic: Boolean) {
+ offsetsMonotonic: Boolean,
+ lastOffsetOfFirstBatch: Long) {
/**
- * Get the first offset if it exists, else get the last offset.
- * @return The offset of first message if it exists; else offset of the last message.
+ * Get the first offset if it exists, else get the last offset of the first batch
+ * For magic versions 2 and newer, this method will return first offset. For magic versions
+ * older than 2, we use the last offset of the first batch as an approximation of the first
+ * offset to avoid decompressing the data.
*/
- def firstOrLastOffset: Long = firstOffset.getOrElse(lastOffset)
+ def firstOrLastOffsetOfFirstBatch: Long = firstOffset.getOrElse(lastOffsetOfFirstBatch)
/**
* Get the (maximum) number of messages described by LogAppendInfo
@@ -736,6 +740,8 @@ class Log(@volatile var dir: File,
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
* @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
* @throws KafkaStorageException If the append fails due to an I/O error.
+ * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
+ * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
* @return Information about the appended messages including the first and last offset.
*/
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
@@ -798,9 +804,27 @@ class Log(@volatile var dir: File,
}
} else {
// we are taking the offsets we are given
- if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset)
- throw new IllegalArgumentException(s"Out of order offsets found in append to $topicPartition: " +
- records.records.asScala.map(_.offset))
+ if (!appendInfo.offsetsMonotonic)
+ throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
+ records.records.asScala.map(_.offset))
+
+ if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
+ // we may still be able to recover if the log is empty
+ // one example: fetching from log start offset on the leader which is not batch aligned,
+ // which may happen as a result of AdminClient#deleteRecords()
+ val firstOffset = appendInfo.firstOffset match {
+ case Some(offset) => offset
+ case None => records.batches.asScala.head.baseOffset()
+ }
+
+ val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
+ throw new UnexpectedAppendOffsetException(
+ s"Unexpected offset in append to $topicPartition. $firstOrLast " +
+ s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
+ s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+ s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
+ firstOffset, appendInfo.lastOffset)
+ }
}
// update the epoch cache with the epoch stamped onto the message by the leader
@@ -830,7 +854,7 @@ class Log(@volatile var dir: File,
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
val logOffsetMetadata = LogOffsetMetadata(
- messageOffset = appendInfo.firstOrLastOffset,
+ messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
segmentBaseOffset = segment.baseOffset,
relativePositionInSegment = segment.size)
@@ -970,6 +994,7 @@ class Log(@volatile var dir: File,
var maxTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
var readFirstMessage = false
+ var lastOffsetOfFirstBatch = -1L
for (batch <- records.batches.asScala) {
// we only validate V2 and higher to avoid potential compatibility issues with older clients
@@ -986,6 +1011,7 @@ class Log(@volatile var dir: File,
if (!readFirstMessage) {
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
firstOffset = Some(batch.baseOffset)
+ lastOffsetOfFirstBatch = batch.lastOffset
readFirstMessage = true
}
@@ -1024,7 +1050,7 @@ class Log(@volatile var dir: File,
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
- RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
}
private def updateProducers(batch: RecordBatch,
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 5a505c3..e46473b 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -98,8 +98,7 @@ class ReplicaAlterLogDirsThread(name: String,
throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
- // Append the leader's messages to the log
- partition.appendRecordsToFutureReplica(records)
+ partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index cf8d829..80940f6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -112,7 +112,7 @@ class ReplicaFetcherThread(name: String,
.format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
// Append the leader's messages to the log
- partition.appendRecordsToFollower(records)
+ partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
if (isTraceEnabled)
trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 50ed7ae..d6f349c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -813,6 +813,83 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
}
@Test
+ def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
+ val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+ val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
+
+ def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
+ TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.getReplica(topicPartition) != None,
+ "Expected follower to create replica for partition")
+
+ // wait until the follower discovers that log start offset moved beyond its HW
+ TestUtils.waitUntilTrue(() => {
+ servers(followerIndex).replicaManager.getReplica(topicPartition).get.logStartOffset == expectedStartOffset
+ }, s"Expected follower to discover new log start offset $expectedStartOffset")
+
+ TestUtils.waitUntilTrue(() => {
+ servers(followerIndex).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset == expectedEndOffset
+ }, s"Expected follower to catch up to log end offset $expectedEndOffset")
+ }
+
+ // we will produce to topic and delete records while one follower is down
+ killBroker(followerIndex)
+
+ client = AdminClient.create(createConfig)
+ sendRecords(producers.head, 100, topicPartition)
+
+ val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+ result.all().get()
+
+ // start the stopped broker to verify that it will be able to fetch from new log start offset
+ restartDeadBrokers()
+
+ waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L)
+
+ // after the new replica caught up, all replicas should have same log start offset
+ for (i <- 0 until serverCount)
+ assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+
+ // kill the same follower again, produce more records, and delete records beyond follower's LOE
+ killBroker(followerIndex)
+ sendRecords(producers.head, 100, topicPartition)
+ val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava)
+ result1.all().get()
+ restartDeadBrokers()
+ waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
+ }
+
+ @Test
+ def testAlterLogDirsAfterDeleteRecords(): Unit = {
+ client = AdminClient.create(createConfig)
+ createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+ val expectedLEO = 100
+ sendRecords(producers.head, expectedLEO, topicPartition)
+
+ // delete records to move log start offset
+ val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+ result.all().get()
+ // make sure we are in the expected state after delete records
+ for (i <- 0 until serverCount) {
+ assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+ assertEquals(expectedLEO, servers(i).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+ }
+
+ // we will create another dir just for one server
+ val futureLogDir = servers(0).config.logDirs(1)
+ val futureReplica = new TopicPartitionReplica(topic, 0, servers(0).config.brokerId)
+
+ // Verify that replica can be moved to the specified log directory
+ client.alterReplicaLogDirs(Map(futureReplica -> futureLogDir).asJava).all.get
+ TestUtils.waitUntilTrue(() => {
+ futureLogDir == servers(0).logManager.getLog(topicPartition).get.dir.getParent
+ }, "timed out waiting for replica movement")
+
+ // once replica moved, its LSO and LEO should match other replicas
+ assertEquals(3, servers(0).replicaManager.getReplica(topicPartition).get.logStartOffset)
+ assertEquals(expectedLEO, servers(0).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+ }
+
+ @Test
def testOffsetsForTimesAfterDeleteRecords(): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
new file mode 100644
index 0000000..fe5d578
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.UnexpectedAppendOffsetException
+import kafka.log.{Log, LogConfig, LogManager, CleanerConfig}
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, MockScheduler}
+import kafka.utils.timer.MockTimer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.Assertions.assertThrows
+import scala.collection.JavaConverters._
+
+class PartitionTest {
+
+ val brokerId = 101
+ val topicPartition = new TopicPartition("test-topic", 0)
+ val time = new MockTime()
+ val brokerTopicStats = new BrokerTopicStats
+ val metrics = new Metrics
+
+ var tmpDir: File = _
+ var logDir: File = _
+ var replicaManager: ReplicaManager = _
+ var logManager: LogManager = _
+ var logConfig: LogConfig = _
+
+ @Before
+ def setup(): Unit = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+ logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+ logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+ logConfig = LogConfig(logProps)
+
+ tmpDir = TestUtils.tempDir()
+ logDir = TestUtils.randomPartitionLogDir(tmpDir)
+ logManager = TestUtils.createLogManager(
+ logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
+ logManager.startup()
+
+ val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
+ brokerProps.put("log.dir", logDir.getAbsolutePath)
+ val brokerConfig = KafkaConfig.fromProps(brokerProps)
+ replicaManager = new ReplicaManager(
+ config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
+ logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
+ brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
+ }
+
+ @After
+ def tearDown(): Unit = {
+ brokerTopicStats.close()
+ metrics.close()
+
+ logManager.shutdown()
+ Utils.delete(tmpDir)
+ logManager.liveLogDirs.foreach(Utils.delete)
+ replicaManager.shutdown(checkpointHW = false)
+ }
+
+ @Test
+ def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
+ val log = logManager.getOrCreateLog(topicPartition, logConfig)
+ val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+ val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+ partition.addReplicaIfNotExists(replica)
+ assertEquals(Some(replica), partition.getReplica(replica.brokerId))
+
+ val initialLogStartOffset = 5L
+ partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
+ assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:",
+ initialLogStartOffset, replica.logEndOffset.messageOffset)
+ assertEquals(s"Log start offset after truncate fully and start at $initialLogStartOffset:",
+ initialLogStartOffset, replica.logStartOffset)
+
+ // verify that we cannot append records that do not contain log start offset even if the log is empty
+ assertThrows[UnexpectedAppendOffsetException] {
+ // append one record with offset = 3
+ partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false)
+ }
+ assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset, replica.logEndOffset.messageOffset)
+
+ // verify that we can append records that contain log start offset, even when first
+ // offset < log start offset if the log is empty
+ val newLogStartOffset = 4L
+ val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes),
+ new SimpleRecord("k3".getBytes, "v3".getBytes)),
+ baseOffset = newLogStartOffset)
+ partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
+ assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:", 7L, replica.logEndOffset.messageOffset)
+ assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:", newLogStartOffset, replica.logStartOffset)
+
+ // and we can append more records after that
+ partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false)
+ assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, replica.logEndOffset.messageOffset)
+ assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+
+ // but we cannot append to offset < log start if the log is not empty
+ assertThrows[UnexpectedAppendOffsetException] {
+ val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)),
+ baseOffset = 3L)
+ partition.appendRecordsToFollowerOrFutureReplica(records2, isFuture = false)
+ }
+ assertEquals(s"Log end offset should not change after failure to append", 8L, replica.logEndOffset.messageOffset)
+
+ // we still can append to next offset
+ partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false)
+ assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, replica.logEndOffset.messageOffset)
+ assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+ }
+
+ @Test
+ def testGetReplica(): Unit = {
+ val log = logManager.getOrCreateLog(topicPartition, logConfig)
+ val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+ val partition = new
+ Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+ assertEquals(None, partition.getReplica(brokerId))
+ assertThrows[ReplicaNotAvailableException] {
+ partition.getReplicaOrException(brokerId)
+ }
+
+ partition.addReplicaIfNotExists(replica)
+ assertEquals(replica, partition.getReplicaOrException(brokerId))
+ }
+
+ @Test
+ def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
+ val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+ assertThrows[ReplicaNotAvailableException] {
+ partition.appendRecordsToFollowerOrFutureReplica(
+ createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
+ }
+ }
+
+ def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
+ val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+ val builder = MemoryRecords.builder(
+ buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME,
+ baseOffset, time.milliseconds, partitionLeaderEpoch)
+ records.foreach(builder.append)
+ builder.build()
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1171e5e..6c62e5e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,7 +22,8 @@ import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
import java.util.Properties
-import kafka.common.KafkaException
+import org.apache.kafka.common.errors._
+import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException}
import kafka.log.Log.DeleteDirSuffix
import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
@@ -42,6 +43,7 @@ import org.junit.{After, Before, Test}
import scala.collection.Iterable
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import org.scalatest.Assertions.{assertThrows, intercept, withClue}
class LogTest {
var config: KafkaConfig = null
@@ -1885,13 +1887,72 @@ class LogTest {
assertTrue("Message payload should be null.", !head.hasValue)
}
- @Test(expected = classOf[IllegalArgumentException])
+ @Test
def testAppendWithOutOfOrderOffsetsThrowsException() {
val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+
+ val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
+ val buffer = ByteBuffer.allocate(512)
+ for (offset <- appendOffsets) {
+ val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+ TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(),
+ 1L, 0, 0, false, 0)
+ builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+ builder.close()
+ }
+ buffer.flip()
+ val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+ assertThrows[OffsetsOutOfOrderException] {
+ log.appendAsFollower(memoryRecords)
+ }
+ }
+
+ @Test
+ def testAppendBelowExpectedOffsetThrowsException() {
+ val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
- val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
- log.appendAsFollower(invalidRecord)
+
+ val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+ val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+ for (magic <- magicVals; compression <- compressionTypes) {
+ val invalidRecord = MemoryRecords.withRecords(magic, compression, new SimpleRecord(1.toString.getBytes))
+ withClue(s"Magic=$magic, compressionType=$compression") {
+ assertThrows[UnexpectedAppendOffsetException] {
+ log.appendAsFollower(invalidRecord)
+ }
+ }
+ }
+ }
+
+ @Test
+ def testAppendEmptyLogBelowLogStartOffsetThrowsException() {
+ createEmptyLogs(logDir, 7)
+ val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ assertEquals(7L, log.logStartOffset)
+ assertEquals(7L, log.logEndOffset)
+
+ val firstOffset = 4L
+ val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+ val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+ for (magic <- magicVals; compression <- compressionTypes) {
+ val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes),
+ new SimpleRecord("k3".getBytes, "v3".getBytes)),
+ magicValue = magic, codec = compression,
+ baseOffset = firstOffset)
+
+ withClue(s"Magic=$magic, compressionType=$compression") {
+ val exception = intercept[UnexpectedAppendOffsetException] {
+ log.appendAsFollower(records = batch)
+ }
+ assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset",
+ firstOffset, exception.firstOffset)
+ assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset",
+ firstOffset + 2, exception.lastOffset)
+ }
+ }
}
@Test
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.