Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/23 20:20:53 UTC
[kafka] branch trunk updated: KAFKA-8371: Remove dependence on ReplicaManager from Partition (#6705)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3696b98 KAFKA-8371: Remove dependence on ReplicaManager from Partition (#6705)
3696b98 is described below
commit 3696b9882d0267871a31fc3df03da568b59433b7
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu May 23 13:20:39 2019 -0700
KAFKA-8371: Remove dependence on ReplicaManager from Partition (#6705)
This patch attempts to simplify the interaction between Partition and the various components from `ReplicaManager`. This is primarily to make unit testing easier. I have also tried to eliminate the OfflinePartition sentinel, which has always been unsafe.
Reviewers: Boyang Chen <bc...@outlook.com>, David Arthur <mu...@gmail.com>
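
[Editor's note] A minimal sketch of the caller-side change this patch makes. It uses the names introduced in the diff below (HostedPartition, its Offline and None states, and the new return type of ReplicaManager.getPartition); the helper function itself is illustrative, not part of the patch:

    import org.apache.kafka.common.TopicPartition
    import kafka.cluster.Partition
    import kafka.server.{HostedPartition, ReplicaManager}

    // ReplicaManager.getPartition now returns a HostedPartition rather than
    // Option[Partition], so callers pattern match on the hosted state instead
    // of comparing references against the removed OfflinePartition sentinel.
    def describeHostedState(replicaManager: ReplicaManager, tp: TopicPartition): String =
      replicaManager.getPartition(tp) match {
        case partition: Partition => s"online, leader epoch ${partition.getLeaderEpoch}"
        case HostedPartition.Offline => "hosted, but its log directory is offline"
        case HostedPartition.None => "no local state for this partition"
      }

The match must test the concrete Partition case before the sentinel objects, which is the same ordering the patch uses in DelayedProduce and DelayedDeleteRecords.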
---
core/src/main/scala/kafka/cluster/Partition.scala | 341 ++++++++++++-------
.../src/main/scala/kafka/server/AdminManager.scala | 2 +-
.../scala/kafka/server/DelayedDeleteRecords.scala | 25 +-
.../main/scala/kafka/server/DelayedOperation.scala | 15 +-
.../main/scala/kafka/server/DelayedProduce.scala | 15 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 4 +-
.../kafka/server/ReplicaAlterLogDirsThread.scala | 4 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 11 +-
.../main/scala/kafka/server/ReplicaManager.scala | 376 +++++++++++----------
.../server/checkpoints/OffsetCheckpointFile.scala | 15 +
.../main/scala/kafka/utils/ReplicationUtils.scala | 4 +-
.../admin/ReassignPartitionsClusterTest.scala | 2 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 308 ++++++++---------
.../coordinator/group/GroupCoordinatorTest.scala | 14 +-
.../group/GroupMetadataManagerTest.scala | 2 +-
.../kafka/server/BrokerEpochIntegrationTest.scala | 5 +-
.../unit/kafka/server/DelayedOperationTest.scala | 8 +-
.../server/HighwatermarkPersistenceTest.scala | 6 +-
.../unit/kafka/server/ISRExpirationTest.scala | 2 +-
.../unit/kafka/server/LogDirFailureTest.scala | 7 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 3 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 4 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 23 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 2 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 41 +--
.../scala/unit/kafka/server/SimpleFetchTest.scala | 2 +-
...chDrivenReplicationProtocolAcceptanceTest.scala | 2 +-
.../server/epoch/OffsetsForLeaderEpochTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 12 +-
29 files changed, 696 insertions(+), 563 deletions(-)
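
[Editor's note] The testability gain comes from Partition depending on the narrow PartitionStateStore trait (defined in the Partition.scala diff below) instead of all of ReplicaManager. A hypothetical in-memory stand-in, sketched here only to show the seam, lets a unit test drive ISR changes without ZooKeeper or a full broker:

    import java.util.Properties
    import kafka.api.LeaderAndIsr
    import kafka.cluster.PartitionStateStore

    // Hypothetical test double for ZkPartitionStateStore: every ISR update
    // "succeeds" and returns a monotonically increasing version, mimicking
    // a successful ZooKeeper write.
    class InMemoryPartitionStateStore extends PartitionStateStore {
      private var zkVersion = 0

      override def fetchTopicConfig(): Properties = new Properties()

      override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
        zkVersion += 1
        Some(zkVersion) // pretend the metadata store accepted the new ISR
      }

      override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
        zkVersion += 1
        Some(zkVersion)
      }
    }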
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 30ce756..256f1a0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,7 +16,7 @@
*/
package kafka.cluster
-import java.util.Optional
+import java.util.{Optional, Properties}
import java.util.concurrent.locks.ReentrantReadWriteLock
import com.yammer.metrics.core.Gauge
@@ -26,6 +26,7 @@ import kafka.controller.KafkaController
import kafka.log._
import kafka.metrics.KafkaMetricsGroup
import kafka.server._
+import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -42,19 +43,114 @@ import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.collection.Map
-object Partition {
+trait PartitionStateStore {
+ def fetchTopicConfig(): Properties
+ def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
+ def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
+}
+
+class ZkPartitionStateStore(topicPartition: TopicPartition,
+ zkClient: KafkaZkClient,
+ replicaManager: ReplicaManager) extends PartitionStateStore {
+
+ override def fetchTopicConfig(): Properties = {
+ val adminZkClient = new AdminZkClient(zkClient)
+ adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
+ }
+
+ override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
+ val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
+ if (newVersionOpt.isDefined)
+ replicaManager.isrShrinkRate.mark()
+ newVersionOpt
+ }
+
+ override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
+ val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
+ if (newVersionOpt.isDefined)
+ replicaManager.isrExpandRate.mark()
+ newVersionOpt
+ }
+
+ private def updateIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
+ val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition,
+ leaderAndIsr, controllerEpoch)
+
+ if (updateSucceeded) {
+ replicaManager.recordIsrChange(topicPartition)
+ Some(newVersion)
+ } else {
+ replicaManager.failedIsrUpdatesRate.mark()
+ None
+ }
+ }
+}
+
+class DelayedOperations(topicPartition: TopicPartition,
+ produce: DelayedOperationPurgatory[DelayedProduce],
+ fetch: DelayedOperationPurgatory[DelayedFetch],
+ deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords]) {
+
+ def checkAndCompleteAll(): Unit = {
+ val requestKey = new TopicPartitionOperationKey(topicPartition)
+ fetch.checkAndComplete(requestKey)
+ produce.checkAndComplete(requestKey)
+ deleteRecords.checkAndComplete(requestKey)
+ }
+
+ def checkAndCompleteFetch(): Unit = {
+ fetch.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+ }
+
+ def checkAndCompleteProduce(): Unit = {
+ produce.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+ }
+
+ def checkAndCompleteDeleteRecords(): Unit = {
+ deleteRecords.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+ }
+
+ def numDelayedDelete: Int = deleteRecords.numDelayed
+
+ def numDelayedFetch: Int = fetch.numDelayed
+
+ def numDelayedProduce: Int = produce.numDelayed
+}
+
+object Partition extends KafkaMetricsGroup {
def apply(topicPartition: TopicPartition,
time: Time,
replicaManager: ReplicaManager): Partition = {
+ val zkIsrBackingStore = new ZkPartitionStateStore(
+ topicPartition,
+ replicaManager.zkClient,
+ replicaManager)
+
+ val delayedOperations = new DelayedOperations(
+ topicPartition,
+ replicaManager.delayedProducePurgatory,
+ replicaManager.delayedFetchPurgatory,
+ replicaManager.delayedDeleteRecordsPurgatory)
+
new Partition(topicPartition,
- isOffline = false,
replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
localBrokerId = replicaManager.config.brokerId,
time = time,
- replicaManager = replicaManager,
- logManager = replicaManager.logManager,
- zkClient = replicaManager.zkClient)
+ stateStore = zkIsrBackingStore,
+ delayedOperations = delayedOperations,
+ metadataCache = replicaManager.metadataCache,
+ logManager = replicaManager.logManager)
+ }
+
+ def removeMetrics(topicPartition: TopicPartition): Unit = {
+ val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
+ removeMetric("UnderReplicated", tags)
+ removeMetric("UnderMinIsr", tags)
+ removeMetric("InSyncReplicasCount", tags)
+ removeMetric("ReplicasCount", tags)
+ removeMetric("LastStableOffsetLag", tags)
+ removeMetric("AtMinIsr", tags)
}
}
@@ -62,14 +158,14 @@ object Partition {
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
*/
class Partition(val topicPartition: TopicPartition,
- val isOffline: Boolean,
private val replicaLagTimeMaxMs: Long,
private val interBrokerProtocolVersion: ApiVersion,
private val localBrokerId: Int,
private val time: Time,
- private val replicaManager: ReplicaManager,
- private val logManager: LogManager,
- private val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
+ private val stateStore: PartitionStateStore,
+ private val delayedOperations: DelayedOperations,
+ private val metadataCache: MetadataCache,
+ private val logManager: LogManager) extends HostedPartition with Logging with KafkaMetricsGroup {
def topic: String = topicPartition.topic
def partitionId: Int = topicPartition.partition
@@ -94,68 +190,65 @@ class Partition(val topicPartition: TopicPartition,
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
- private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId
+ private def isReplicaLocal(replicaId: Int): Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId
private val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
- // Do not create metrics if this partition is ReplicaManager.OfflinePartition
- if (!isOffline) {
- newGauge("UnderReplicated",
- new Gauge[Int] {
- def value = {
- if (isUnderReplicated) 1 else 0
- }
- },
- tags
- )
-
- newGauge("InSyncReplicasCount",
- new Gauge[Int] {
- def value = {
- if (isLeaderReplicaLocal) inSyncReplicas.size else 0
- }
- },
- tags
- )
-
- newGauge("UnderMinIsr",
- new Gauge[Int] {
- def value = {
- if (isUnderMinIsr) 1 else 0
- }
- },
- tags
- )
-
- newGauge("AtMinIsr",
- new Gauge[Int] {
- def value = {
- if (isAtMinIsr) 1 else 0
- }
- },
- tags
- )
-
- newGauge("ReplicasCount",
- new Gauge[Int] {
- def value = {
- if (isLeaderReplicaLocal) assignedReplicas.size else 0
- }
- },
- tags
- )
-
- newGauge("LastStableOffsetLag",
- new Gauge[Long] {
- def value = {
- leaderReplicaIfLocal.map { replica =>
- replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
- }.getOrElse(0)
- }
- },
- tags
- )
- }
+ newGauge("UnderReplicated",
+ new Gauge[Int] {
+ def value = {
+ if (isUnderReplicated) 1 else 0
+ }
+ },
+ tags
+ )
+
+ newGauge("InSyncReplicasCount",
+ new Gauge[Int] {
+ def value = {
+ if (isLeaderReplicaLocal) inSyncReplicas.size else 0
+ }
+ },
+ tags
+ )
+
+ newGauge("UnderMinIsr",
+ new Gauge[Int] {
+ def value = {
+ if (isUnderMinIsr) 1 else 0
+ }
+ },
+ tags
+ )
+
+ newGauge("AtMinIsr",
+ new Gauge[Int] {
+ def value = {
+ if (isAtMinIsr) 1 else 0
+ }
+ },
+ tags
+ )
+
+ newGauge("ReplicasCount",
+ new Gauge[Int] {
+ def value = {
+ if (isLeaderReplicaLocal) assignedReplicas.size else 0
+ }
+ },
+ tags
+ )
+
+ newGauge("LastStableOffsetLag",
+ new Gauge[Long] {
+ def value = {
+ leaderReplicaIfLocal.map { replica =>
+ replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
+ }.getOrElse(0)
+ }
+ },
+ tags
+ )
private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined
@@ -185,15 +278,17 @@ class Partition(val topicPartition: TopicPartition,
* does not exist. This method assumes that the current replica has already been created.
*
* @param logDir log directory
+ * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
* @return true iff the future replica is created
*/
- def maybeCreateFutureReplica(logDir: String): Boolean = {
+ def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
// The writeLock is needed to make sure that while the caller checks the log directory of the
// current replica and the existence of the future replica, no other thread can update the log directory of the
// current replica or remove the future replica.
inWriteLock(leaderIsrUpdateLock) {
val currentReplica = localReplicaOrException
- if (currentReplica.log.get.dir.getParent == logDir)
+ val currentLog = currentReplica.log.get
+ if (currentLog.dir.getParent == logDir)
false
else {
futureLocalReplica match {
@@ -204,26 +299,25 @@ class Partition(val topicPartition: TopicPartition,
s"different from the requested log dir $logDir")
false
case None =>
- getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false)
+ getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false, highWatermarkCheckpoints)
true
}
}
}
}
- def getOrCreateReplica(replicaId: Int, isNew: Boolean = false): Replica = {
+ def getOrCreateReplica(replicaId: Int, isNew: Boolean, offsetCheckpoints: OffsetCheckpoints): Replica = {
allReplicasMap.getAndMaybePut(replicaId, {
if (isReplicaLocal(replicaId)) {
- val adminZkClient = new AdminZkClient(zkClient)
- val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+ val props = stateStore.fetchTopicConfig()
val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId)
- val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
- val offsetMap = checkpoint.read()
- if (!offsetMap.contains(topicPartition))
+ val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
- val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
- new Replica(replicaId, topicPartition, time, offset, Some(log))
+ 0L
+ }
+ val initialHighWatermark = math.min(checkpointHighWatermark, log.logEndOffset)
+ new Replica(replicaId, topicPartition, time, initialHighWatermark, Some(log))
} else new Replica(replicaId, topicPartition, time)
})
}
@@ -300,6 +394,7 @@ class Partition(val topicPartition: TopicPartition,
}
}
+ // Visible for testing
def addReplicaIfNotExists(replica: Replica): Replica =
allReplicasMap.putIfNotExists(replica.brokerId, replica)
@@ -370,7 +465,7 @@ class Partition(val topicPartition: TopicPartition,
inSyncReplicas = Set.empty[Replica]
leaderReplicaIdOpt = None
leaderEpochStartOffsetOpt = None
- removePartitionMetrics()
+ Partition.removeMetrics(topicPartition)
logManager.asyncDelete(topicPartition)
if (logManager.getLog(topicPartition, isFuture = true).isDefined)
logManager.asyncDelete(topicPartition, isFuture = true)
@@ -384,18 +479,23 @@ class Partition(val topicPartition: TopicPartition,
* from the time when this broker was the leader last time) and setting the new leader and ISR.
* If the leader replica id does not change, return false to indicate the replica manager.
*/
- def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+ def makeLeader(controllerId: Int,
+ partitionStateInfo: LeaderAndIsrRequest.PartitionState,
+ correlationId: Int,
+ highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
// add replicas that are new
- val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
+ val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map {
+ id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints)
+ }.toSet
// remove assigned replicas that have been removed by the controller
(assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
inSyncReplicas = newInSyncReplicas
- newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
+ newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints))
val leaderReplica = localReplicaOrException
val leaderEpochStartOffset = leaderReplica.logEndOffset
@@ -448,7 +548,10 @@ class Partition(val topicPartition: TopicPartition,
* greater (that is, no updates have been missed), return false to indicate to the
* replica manager that state is already correct and the become-follower steps can be skipped
*/
- def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+ def makeFollower(controllerId: Int,
+ partitionStateInfo: LeaderAndIsrRequest.PartitionState,
+ correlationId: Int,
+ highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
@@ -457,7 +560,7 @@ class Partition(val topicPartition: TopicPartition,
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
// add replicas that are new
- newAssignedReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
+ newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints))
// remove assigned replicas that have been removed by the controller
(assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
inSyncReplicas = Set.empty[Replica]
@@ -483,9 +586,9 @@ class Partition(val topicPartition: TopicPartition,
def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {
val replicaId = replica.brokerId
// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
- val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
+ val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
replica.updateLogReadResult(logReadResult)
- val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
+ val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented
// since the replica's logStartOffset may have incremented
val leaderLWIncremented = newLeaderLW > oldLeaderLW
@@ -533,9 +636,9 @@ class Partition(val topicPartition: TopicPartition,
val newInSyncReplicas = inSyncReplicas + replica
info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
+
// update ISR in ZK and cache
- updateIsr(newInSyncReplicas)
- replicaManager.isrExpandRate.mark()
+ expandIsr(newInSyncReplicas)
}
// check if the HW of the partition can now be incremented
// since the replica may already be in the ISR and its LEO has just incremented
@@ -634,7 +737,7 @@ class Partition(val topicPartition: TopicPartition,
if (!isLeaderReplicaLocal)
throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
val logStartOffsets = allReplicas.collect {
- case replica if replicaManager.metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId == Request.FutureLocalReplicaId => replica.logStartOffset
+ case replica if metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId == Request.FutureLocalReplicaId => replica.logStartOffset
}
CoreUtils.min(logStartOffsets, 0L)
}
@@ -642,12 +745,7 @@ class Partition(val topicPartition: TopicPartition,
/**
* Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
*/
- private def tryCompleteDelayedRequests() {
- val requestKey = new TopicPartitionOperationKey(topicPartition)
- replicaManager.tryCompleteDelayedFetch(requestKey)
- replicaManager.tryCompleteDelayedProduce(requestKey)
- replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
- }
+ private def tryCompleteDelayedRequests(): Unit = delayedOperations.checkAndCompleteAll()
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
@@ -669,8 +767,7 @@ class Partition(val topicPartition: TopicPartition,
)
// update ISR in zk and in cache
- updateIsr(newInSyncReplicas)
- replicaManager.isrShrinkRate.mark()
+ shrinkIsr(newInSyncReplicas)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
@@ -785,7 +882,7 @@ class Partition(val topicPartition: TopicPartition,
tryCompleteDelayedRequests()
else {
// probably unblock some follower fetch requests since log end offset has been updated
- replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition))
+ delayedOperations.checkAndCompleteFetch()
}
info
@@ -1015,41 +1112,37 @@ class Partition(val topicPartition: TopicPartition,
}
}
- private def updateIsr(newIsr: Set[Replica]) {
+ private def expandIsr(newIsr: Set[Replica]): Unit = {
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
- val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition, newLeaderAndIsr,
- controllerEpoch)
+ val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
+ maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+ }
- if (updateSucceeded) {
- replicaManager.recordIsrChange(topicPartition)
- inSyncReplicas = newIsr
- zkVersion = newVersion
- trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
- } else {
- replicaManager.failedIsrUpdatesRate.mark()
- info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
- }
+ private def shrinkIsr(newIsr: Set[Replica]): Unit = {
+ val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
+ val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
+ maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
}
- /**
- * remove deleted log metrics
- */
- def removePartitionMetrics() {
- removeMetric("UnderReplicated", tags)
- removeMetric("UnderMinIsr", tags)
- removeMetric("InSyncReplicasCount", tags)
- removeMetric("ReplicasCount", tags)
- removeMetric("LastStableOffsetLag", tags)
- removeMetric("AtMinIsr", tags)
+ private def maybeUpdateIsrAndVersion(isr: Set[Replica], zkVersionOpt: Option[Int]): Unit = {
+ zkVersionOpt match {
+ case Some(newVersion) =>
+ inSyncReplicas = isr
+ zkVersion = newVersion
+ info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion))
+
+ case None =>
+ info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, skip updating ISR")
+ }
}
override def equals(that: Any): Boolean = that match {
- case other: Partition => partitionId == other.partitionId && topic == other.topic && isOffline == other.isOffline
+ case other: Partition => partitionId == other.partitionId && topic == other.topic
case _ => false
}
override def hashCode: Int =
- 31 + topic.hashCode + 17 * partitionId + (if (isOffline) 1 else 0)
+ 31 + topic.hashCode + 17 * partitionId
override def toString(): String = {
val partitionString = new StringBuilder
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index d424700..85d272c 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -60,7 +60,7 @@ class AdminManager(val config: KafkaConfig,
private val alterConfigPolicy =
Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
- def hasDelayedTopicOperations = topicPurgatory.delayed != 0
+ def hasDelayedTopicOperations = topicPurgatory.numDelayed != 0
/**
* Try to complete delayed topic operations with the request key
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index a977d9a..dac9f79 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,6 +20,7 @@ package kafka.server
import java.util.concurrent.TimeUnit
+import kafka.cluster.Partition
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.TopicPartition
@@ -73,19 +74,19 @@ class DelayedDeleteRecords(delayMs: Long,
// skip those partitions that have already been satisfied
if (status.acksPending) {
val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
- case Some(partition) =>
- if (partition eq ReplicaManager.OfflinePartition) {
- (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
- } else {
- partition.leaderReplicaIfLocal match {
- case Some(_) =>
- val leaderLW = partition.lowWatermarkIfLeader
- (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
- case None =>
- (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
- }
+ case partition: Partition =>
+ partition.leaderReplicaIfLocal match {
+ case Some(_) =>
+ val leaderLW = partition.lowWatermarkIfLeader
+ (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+ case None =>
+ (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
}
- case None =>
+
+ case HostedPartition.Offline =>
+ (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+
+ case HostedPartition.None =>
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
}
if (error != Errors.NONE || lowWatermarkReached) {
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index eb20e6d..33187bb 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -44,7 +44,8 @@ import scala.collection.mutable.ListBuffer
* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
*/
abstract class DelayedOperation(override val delayMs: Long,
- lockOpt: Option[Lock] = None) extends TimerTask with Logging {
+ lockOpt: Option[Lock] = None)
+ extends TimerTask with Logging {
private val completed = new AtomicBoolean(false)
private val tryCompletePending = new AtomicBoolean(false)
@@ -209,7 +210,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
newGauge(
"NumDelayedOperations",
new Gauge[Int] {
- def value: Int = delayed
+ def value: Int = numDelayed
},
metricsTags
)
@@ -288,10 +289,12 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
def checkAndComplete(key: Any): Int = {
val wl = watcherList(key)
val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
- if(watchers == null)
+ val numCompleted = if (watchers == null)
0
else
watchers.tryCompleteWatched()
+ debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
+ numCompleted
}
/**
@@ -306,7 +309,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
/**
* Return the number of delayed operations in the expiry queue
*/
- def delayed: Int = timeoutTimer.size
+ def numDelayed: Int = timeoutTimer.size
/**
* Cancel watching on any delayed operations for the given key. Note the operation will not be completed
@@ -435,11 +438,11 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
// Trigger a purge if the number of completed but still being watched operations is larger than
// the purge threshold. That number is computed by the difference btw the estimated total number of
// operations and the number of pending delayed operations.
- if (estimatedTotalOperations.get - delayed > purgeInterval) {
+ if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
// now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
// clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
// a little overestimated total number of operations.
- estimatedTotalOperations.getAndSet(delayed)
+ estimatedTotalOperations.getAndSet(numDelayed)
debug("Begin purging watch lists")
val purged = watcherLists.foldLeft(0) {
case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index dbecba4..1570d4b 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Meter
+import kafka.cluster.Partition
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Pool
-
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -88,12 +88,13 @@ class DelayedProduce(delayMs: Long,
// skip those partitions that have already been satisfied
if (status.acksPending) {
val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
- case Some(partition) =>
- if (partition eq ReplicaManager.OfflinePartition)
- (false, Errors.KAFKA_STORAGE_ERROR)
- else
- partition.checkEnoughReplicasReachOffset(status.requiredOffset)
- case None =>
+ case partition: Partition =>
+ partition.checkEnoughReplicasReachOffset(status.requiredOffset)
+
+ case HostedPartition.Offline =>
+ (false, Errors.KAFKA_STORAGE_ERROR)
+
+ case HostedPartition.None =>
// Case A
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d362d64..9cedb01 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -263,8 +263,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
if (replicaManager.hasDelayedElectionOperations) {
- updateMetadataRequest.partitionStates.asScala.foreach { case (tp, ps) =>
- replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp.topic(), tp.partition()))
+ updateMetadataRequest.partitionStates.asScala.foreach { case (tp, _) =>
+ replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp))
}
}
sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 4312a92..0622b30 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThread(name: String,
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
- val futureReplica = replicaMgr.futureLocalReplicaOrException(topicPartition)
- val partition = replicaMgr.getPartition(topicPartition).get
+ val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+ val futureReplica = partition.futureLocalReplicaOrException
val records = toMemoryRecords(partitionData.records)
if (fetchOffset != futureReplica.logEndOffset)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ab5be6e..b1b5dd0 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -143,8 +143,8 @@ class ReplicaFetcherThread(name: String,
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
partitionData: FetchData): Option[LogAppendInfo] = {
- val replica = replicaMgr.localReplicaOrException(topicPartition)
- val partition = replicaMgr.getPartition(topicPartition).get
+ val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+ val replica = partition.localReplicaOrException
val records = toMemoryRecords(partitionData.records)
maybeWarnIfOversizedRecords(records, topicPartition)
@@ -277,8 +277,9 @@ class ReplicaFetcherThread(name: String,
* The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState
*/
override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
- val replica = replicaMgr.localReplicaOrException(tp)
- val partition = replicaMgr.getPartition(tp).get
+ val partition = replicaMgr.nonOfflinePartition(tp).get
+ val replica = partition.localReplicaOrException
+
partition.truncateTo(offsetTruncationState.offset, isFuture = false)
if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
@@ -292,7 +293,7 @@ class ReplicaFetcherThread(name: String,
}
override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
- val partition = replicaMgr.getPartition(topicPartition).get
+ val partition = replicaMgr.nonOfflinePartition(topicPartition).get
partition.truncateFullyAndStartAt(offset, isFuture = false)
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2023a97..54d35ef 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.Lock
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Meter}
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition, Replica}
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.server.checkpoints.OffsetCheckpointFile
+import kafka.server.checkpoints.{OffsetCheckpointFile, OffsetCheckpoints, SimpleOffsetCheckpoints}
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._
@@ -118,19 +118,30 @@ object LogReadResult {
lastStableOffset = None)
}
+/**
+ * Trait to represent the state of hosted partitions. We create a concrete (active) Partition
+ * instance when the broker receives a LeaderAndIsr request from the controller indicating
+ * that it should be either a leader or follower of a partition.
+ */
+trait HostedPartition
+
+object HostedPartition {
+ /**
+ * This broker does not have any state for this partition locally.
+ */
+ object None extends HostedPartition
+
+ /**
+ * This broker hosts the partition, but it is in an offline log directory.
+ */
+ object Offline extends HostedPartition
+}
+
+
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
val IsrChangePropagationBlackOut = 5000L
val IsrChangePropagationInterval = 60000L
- val OfflinePartition: Partition = new Partition(new TopicPartition("", -1),
- isOffline = true,
- replicaLagTimeMaxMs = 0L,
- interBrokerProtocolVersion = ApiVersion.latestVersion,
- localBrokerId = -1,
- time = null,
- replicaManager = null,
- logManager = null,
- zkClient = null)
}
class ReplicaManager(val config: KafkaConfig,
@@ -181,7 +192,7 @@ class ReplicaManager(val config: KafkaConfig,
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
private val localBrokerId = config.brokerId
- private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
+ private val allPartitions = new Pool[TopicPartition, HostedPartition](valueFactory = Some(tp =>
Partition(tp, time, this)))
private val replicaStateChangeLock = new Object
val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
@@ -226,7 +237,7 @@ class ReplicaManager(val config: KafkaConfig,
val offlineReplicaCount = newGauge(
"OfflineReplicaCount",
new Gauge[Int] {
- def value = offlinePartitionsIterator.size
+ def value = offlinePartitionCount
}
)
val underReplicatedPartitions = newGauge(
@@ -248,9 +259,9 @@ class ReplicaManager(val config: KafkaConfig,
}
)
- val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
- val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
- val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
+ val isrExpandRate: Meter = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
+ val isrShrinkRate: Meter = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
+ val failedIsrUpdatesRate: Meter = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)
@@ -295,40 +306,7 @@ class ReplicaManager(val config: KafkaConfig,
def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition)
- /**
- * Try to complete some delayed produce requests with the request key;
- * this can be triggered when:
- *
- * 1. The partition HW has changed (for acks = -1)
- * 2. A follower replica's fetch operation is received (for acks > 1)
- */
- def tryCompleteDelayedProduce(key: DelayedOperationKey) {
- val completed = delayedProducePurgatory.checkAndComplete(key)
- debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
- }
-
- /**
- * Try to complete some delayed fetch requests with the request key;
- * this can be triggered when:
- *
- * 1. The partition HW has changed (for regular fetch)
- * 2. A new message set is appended to the local log (for follower fetch)
- */
- def tryCompleteDelayedFetch(key: DelayedOperationKey) {
- val completed = delayedFetchPurgatory.checkAndComplete(key)
- debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
- }
-
- /**
- * Try to complete some delayed DeleteRecordsRequest with the request key;
- * this needs to be triggered when the partition low watermark has changed
- */
- def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) {
- val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key)
- debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed))
- }
-
- def hasDelayedElectionOperations = delayedElectPreferredLeaderPurgatory.delayed != 0
+ def hasDelayedElectionOperations: Boolean = delayedElectPreferredLeaderPurgatory.numDelayed != 0
def tryCompleteElection(key: DelayedOperationKey): Unit = {
val completed = delayedElectPreferredLeaderPurgatory.checkAndComplete(key)
@@ -350,24 +328,33 @@ class ReplicaManager(val config: KafkaConfig,
logDirFailureHandler.start()
}
+ private def maybeRemoveTopicMetrics(topic: String): Unit = {
+ val topicHasOnlinePartition = allPartitions.values.exists {
+ case partition: Partition => topic == partition.topic
+ case _ => false
+ }
+ if (!topicHasOnlinePartition)
+ brokerTopicStats.removeMetrics(topic)
+ }
+
def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean) = {
stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition")
if (deletePartition) {
- val removedPartition = allPartitions.remove(topicPartition)
- if (removedPartition eq ReplicaManager.OfflinePartition) {
- allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
- throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
- }
+ getPartition(topicPartition) match {
+ case HostedPartition.Offline =>
+ throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
+
+ case removedPartition: Partition =>
+ if (allPartitions.remove(topicPartition, removedPartition)) {
+ maybeRemoveTopicMetrics(topicPartition.topic)
+ // this will delete the local log. This call may throw exception if the log is on offline directory
+ removedPartition.delete()
+ }
- if (removedPartition != null) {
- val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
- if (!topicHasPartitions)
- brokerTopicStats.removeMetrics(topicPartition.topic)
- // this will delete the local log. This call may throw exception if the log is on offline directory
- removedPartition.delete()
- } else {
- stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker")
+ case HostedPartition.None =>
+ stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition " +
+ s"$topicPartition as replica doesn't exist on broker")
}
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
@@ -409,35 +396,46 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- def getOrCreatePartition(topicPartition: TopicPartition): Partition =
- allPartitions.getAndMaybePut(topicPartition)
+ def getPartition(topicPartition: TopicPartition): HostedPartition = {
+ Option(allPartitions.get(topicPartition)).getOrElse(HostedPartition.None)
+ }
- def getPartition(topicPartition: TopicPartition): Option[Partition] =
- Option(allPartitions.get(topicPartition))
+ // Visible for testing
+ def createPartition(topicPartition: TopicPartition): Partition = {
+ val partition = Partition(topicPartition, time, this)
+ allPartitions.put(topicPartition, partition)
+ partition
+ }
- def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
- getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
+ def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = {
+ getPartition(topicPartition) match {
+ case partition: Partition => Some(partition)
+ case _ => None
+ }
+ }
// An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after
// the iterator has been constructed could still be returned by this iterator.
- private def nonOfflinePartitionsIterator: Iterator[Partition] =
- allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
-
- // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the
- // iterator has been constructed may not be visible.
- private def offlinePartitionsIterator: Iterator[Partition] =
- allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
+ private def nonOfflinePartitionsIterator: Iterator[Partition] = {
+ allPartitions.values.iterator.flatMap {
+ case p: Partition => Some(p)
+ case _ => None
+ }
+ }
+ private def offlinePartitionCount: Int = {
+ allPartitions.values.iterator.count(_ == HostedPartition.Offline)
+ }
def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = {
getPartition(topicPartition) match {
- case Some(partition) =>
- if (partition eq ReplicaManager.OfflinePartition)
- throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
- else
- partition
+ case partition: Partition =>
+ partition
- case None if metadataCache.contains(topicPartition) =>
+ case HostedPartition.Offline =>
+ throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
+
+ case HostedPartition.None if metadataCache.contains(topicPartition) =>
if (expectLeader) {
// The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which
// forces clients to refresh metadata to find the new location. This can happen, for example,
@@ -448,7 +446,7 @@ class ReplicaManager(val config: KafkaConfig,
throw new ReplicaNotAvailableException(s"Partition $topicPartition is not available")
}
- case None =>
+ case HostedPartition.None =>
throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist")
}
}
@@ -583,15 +581,17 @@ class ReplicaManager(val config: KafkaConfig,
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is offline")
- getPartition(topicPartition).foreach { partition =>
- if (partition eq ReplicaManager.OfflinePartition)
+ getPartition(topicPartition) match {
+ case partition: Partition =>
+ // Stop current replica movement if the destinationDir is different from the existing destination log directory
+ if (partition.futureReplicaDirChanged(destinationDir)) {
+ replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
+ partition.removeFutureLocalReplica()
+ }
+ case HostedPartition.Offline =>
throw new KafkaStorageException(s"Partition $topicPartition is offline")
- // Stop current replica movement if the destinationDir is different from the existing destination log directory
- if (partition.futureReplicaDirChanged(destinationDir)) {
- replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
- partition.removeFutureLocalReplica()
- }
+ case _ => // Do nothing
}
// If the log for this partition has not been created yet:
@@ -609,7 +609,8 @@ class ReplicaManager(val config: KafkaConfig,
// start ReplicaAlterDirThread to move data of this partition from the current log to the future log
// - Otherwise, return KafkaStorageException. We do not create the future log while there is offline log directory
// so that we can avoid creating future log for the same partition in multiple log directories.
- if (partition.maybeCreateFutureReplica(destinationDir)) {
+ val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
+ if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
val futureReplica = futureLocalReplicaOrException(topicPartition)
logManager.abortAndPauseCleaning(topicPartition)
@@ -779,10 +780,8 @@ class ReplicaManager(val config: KafkaConfig,
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
case t: Throwable =>
val logStartOffset = getPartition(topicPartition) match {
- case Some(partition) =>
- partition.logStartOffset
- case _ =>
- -1
+ case partition: Partition => partition.logStartOffset
+ case _ => -1L
}
brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
@@ -1060,43 +1059,52 @@ class ReplicaManager(val config: KafkaConfig,
val newPartitions = new mutable.HashSet[Partition]
leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
- val partition = getPartition(topicPartition).getOrElse {
- val createdPartition = getOrCreatePartition(topicPartition)
- newPartitions.add(createdPartition)
- createdPartition
+ val partitionOpt = getPartition(topicPartition) match {
+ case HostedPartition.Offline =>
+ stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+ s"controller $controllerId with correlation id $correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
+ "partition is in an offline log directory")
+ responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+ None
+
+ case partition: Partition => Some(partition)
+
+ case HostedPartition.None =>
+ val partition = Partition(topicPartition, time, this)
+ allPartitions.putIfNotExists(topicPartition, partition)
+ newPartitions.add(partition)
+ Some(partition)
}
- val currentLeaderEpoch = partition.getLeaderEpoch
- val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
- if (partition eq ReplicaManager.OfflinePartition) {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
- s"controller $controllerId with correlation id $correlationId " +
- s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
- "partition is in an offline log directory")
- responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
- } else if (requestLeaderEpoch > currentLeaderEpoch) {
- // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
- // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
- if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
- partitionState.put(partition, stateInfo)
- else {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
- s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
- s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
- responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+
+ partitionOpt.foreach { partition =>
+ val currentLeaderEpoch = partition.getLeaderEpoch
+ val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
+ if (requestLeaderEpoch > currentLeaderEpoch) {
+ // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+ // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
+ if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
+ partitionState.put(partition, stateInfo)
+ else {
+ stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
+ s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
+ s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
+ responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ }
+ } else if (requestLeaderEpoch < currentLeaderEpoch) {
+ stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+ s"controller $controllerId with correlation id $correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+ s"leader epoch $requestLeaderEpoch is smaller than the current " +
+ s"leader epoch $currentLeaderEpoch")
+ responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+ } else {
+ stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
+ s"controller $controllerId with correlation id $correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+ s"leader epoch $requestLeaderEpoch matches the current leader epoch")
+ responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
}
- } else if (requestLeaderEpoch < currentLeaderEpoch) {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
- s"controller $controllerId with correlation id $correlationId " +
- s"epoch $controllerEpoch for partition $topicPartition since its associated " +
- s"leader epoch $requestLeaderEpoch is smaller than the current " +
- s"leader epoch $currentLeaderEpoch")
- responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
- } else {
- stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
- s"controller $controllerId with correlation id $correlationId " +
- s"epoch $controllerEpoch for partition $topicPartition since its associated " +
- s"leader epoch $requestLeaderEpoch matches the current leader epoch")
- responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
}
}
@@ -1105,12 +1113,15 @@ class ReplicaManager(val config: KafkaConfig,
}
val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
+ val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints)
val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
- makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
+ makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
+ highWatermarkCheckpoints)
else
Set.empty[Partition]
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
- makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
+ makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
+ highWatermarkCheckpoints)
else
Set.empty[Partition]
@@ -1121,10 +1132,11 @@ class ReplicaManager(val config: KafkaConfig,
* In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
* we need to map this topic-partition to OfflinePartition instead.
*/
- if (localReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
- allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
+ if (localReplica(topicPartition).isEmpty)
+ markPartitionOffline(topicPartition)
}
+
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing there by avoiding weird race conditions
if (!hwThreadInitialized) {
@@ -1140,7 +1152,7 @@ class ReplicaManager(val config: KafkaConfig,
val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
// Add future replica to partition's map
- partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false)
+ partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false, highWatermarkCheckpoints)
// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
// replica from source dir to destination dir
@@ -1175,13 +1187,14 @@ class ReplicaManager(val config: KafkaConfig,
* TODO: the above may need to be fixed later
*/
private def makeLeaders(controllerId: Int,
- epoch: Int,
+ controllerEpoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
- responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
+ responseMap: mutable.Map[TopicPartition, Errors],
+ highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
partitionState.keys.foreach { partition =>
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
- s"controller $controllerId epoch $epoch starting the become-leader transition for " +
+ s"controller $controllerId epoch $controllerEpoch starting the become-leader transition for " +
s"partition ${partition.topicPartition}")
}
@@ -1196,20 +1209,20 @@ class ReplicaManager(val config: KafkaConfig,
// Update the partition information to be the leader
partitionState.foreach{ case (partition, partitionStateInfo) =>
try {
- if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
+ if (partition.makeLeader(controllerId, partitionStateInfo, correlationId, highWatermarkCheckpoints)) {
partitionsToMakeLeaders += partition
stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
- s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " +
+ s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})")
} else
stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
- s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " +
+ s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
s"since it is already the leader for the partition.")
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Skipped the become-leader state change with " +
- s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+ s"correlation id $correlationId from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " +
s"the replica for the partition is offline due to disk error $e")
val dirOpt = getLogDir(partition.topicPartition)
@@ -1222,7 +1235,7 @@ class ReplicaManager(val config: KafkaConfig,
case e: Throwable =>
partitionState.keys.foreach { partition =>
stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
- s"from controller $controllerId epoch $epoch for partition ${partition.topicPartition}", e)
+ s"from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition}", e)
}
// Re-throw the exception for it to be caught in KafkaApis
throw e
@@ -1230,7 +1243,7 @@ class ReplicaManager(val config: KafkaConfig,
partitionState.keys.foreach { partition =>
stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
- s"epoch $epoch for the become-leader transition for partition ${partition.topicPartition}")
+ s"epoch $controllerEpoch for the become-leader transition for partition ${partition.topicPartition}")
}
partitionsToMakeLeaders
@@ -1255,13 +1268,14 @@ class ReplicaManager(val config: KafkaConfig,
* return the set of partitions that are made follower due to this method
*/
private def makeFollowers(controllerId: Int,
- epoch: Int,
+ controllerEpoch: Int,
partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
- responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
+ responseMap: mutable.Map[TopicPartition, Errors],
+ highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = {
partitionStates.foreach { case (partition, partitionState) =>
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
- s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
+ s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
s"${partitionState.basePartitionState.leader}")
}
@@ -1269,7 +1283,6 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put(partition.topicPartition, Errors.NONE)
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
-
try {
// TODO: Delete leaders from LeaderAndIsrRequest
partitionStates.foreach { case (partition, partitionStateInfo) =>
@@ -1278,11 +1291,11 @@ class ReplicaManager(val config: KafkaConfig,
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
case Some(_) =>
- if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
+ if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, highWatermarkCheckpoints))
partitionsToMakeFollower += partition
else
stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
- s"follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
+ s"follower with correlation id $correlationId from controller $controllerId epoch $controllerEpoch " +
s"for partition ${partition.topicPartition} (last update " +
s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
s"since the new leader $newLeaderBrokerId is the same as the old leader")
@@ -1290,17 +1303,17 @@ class ReplicaManager(val config: KafkaConfig,
// The leader broker should always be present in the metadata cache.
// If not, we should record the error message and abort the transition process for this partition
stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
- s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+ s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
// Create the local replica even if the leader is unavailable. This is required to ensure that we include
// the partition's high watermark in the checkpoint file (see KAFKA-1647)
- partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew)
+ partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew, highWatermarkCheckpoints)
}
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
- s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+ s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) with leader " +
s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e")
val dirOpt = getLogDir(partition.topicPartition)
@@ -1313,57 +1326,56 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
- s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
+ s"epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
s"${partitionStates(partition).basePartitionState.leader}")
}
partitionsToMakeFollower.foreach { partition =>
val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
- tryCompleteDelayedProduce(topicPartitionOperationKey)
- tryCompleteDelayedFetch(topicPartitionOperationKey)
+ delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
+ delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
}
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
- s"controller $controllerId epoch $epoch with leader ${partitionStates(partition).basePartitionState.leader}")
+ s"controller $controllerId epoch $controllerEpoch with leader ${partitionStates(partition).basePartitionState.leader}")
}
if (isShuttingDown.get()) {
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
- s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
+ s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader} " +
"since it is shutting down")
}
- }
- else {
+ } else {
// we do not need to check if the leader exists again since this has been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
.brokerEndPoint(config.interBrokerListenerName)
val fetchOffset = partition.localReplicaOrException.highWatermark.messageOffset
partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
- }.toMap
- replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
+ }.toMap
- partitionsToMakeFollower.foreach { partition =>
+ replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
+ partitionsToMakeFollowerWithLeaderAndOffset.foreach { case (partition, initialFetchState) =>
stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
- s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
- s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader}")
+ s"request from controller $controllerId epoch $controllerEpoch with correlation id $correlationId for " +
+ s"partition $partition with leader ${initialFetchState.leader}")
}
}
} catch {
case e: Throwable =>
stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
- s"received from controller $controllerId epoch $epoch", e)
+ s"received from controller $controllerId epoch $controllerEpoch", e)
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
partitionStates.keys.foreach { partition =>
stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
- s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
+ s"epoch $controllerEpoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
s"${partitionStates(partition).basePartitionState.leader}")
}
@@ -1439,8 +1451,9 @@ class ReplicaManager(val config: KafkaConfig,
}
// Used only by test
- def markPartitionOffline(tp: TopicPartition) {
- allPartitions.put(tp, ReplicaManager.OfflinePartition)
+ def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
+ allPartitions.put(tp, HostedPartition.Offline)
+ Partition.removeMetrics(tp)
}
// logDir should be an absolute path
@@ -1467,13 +1480,10 @@ class ReplicaManager(val config: KafkaConfig,
partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false))
newOfflinePartitions.foreach { topicPartition =>
- val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
- partition.removePartitionMetrics()
+ markPartitionOffline(topicPartition)
}
newOfflinePartitions.map(_.topic).foreach { topic: String =>
- val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic)
- if (!topicHasPartitions)
- brokerTopicStats.removeMetrics(topic)
+ maybeRemoveTopicMetrics(topic)
}
highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir)
@@ -1524,17 +1534,17 @@ class ReplicaManager(val config: KafkaConfig,
def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = {
requestedEpochInfo.map { case (tp, partitionData) =>
val epochEndOffset = getPartition(tp) match {
- case Some(partition) =>
- if (partition eq ReplicaManager.OfflinePartition)
- new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
- else
- partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch,
- fetchOnlyFromLeader = true)
+ case partition: Partition =>
+ partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch,
+ fetchOnlyFromLeader = true)
+
+ case HostedPartition.Offline =>
+ new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
- case None if metadataCache.contains(tp) =>
+ case HostedPartition.None if metadataCache.contains(tp) =>
new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
- case None =>
+ case HostedPartition.None =>
new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
}
tp -> epochEndOffset
@@ -1552,7 +1562,7 @@ class ReplicaManager(val config: KafkaConfig,
results: Map[TopicPartition, ApiError]): Unit = {
if (expectedLeaders.nonEmpty) {
val watchKeys = expectedLeaders.map{
- case (tp, leader) => new TopicPartitionOperationKey(tp.topic, tp.partition)
+ case (tp, _) => new TopicPartitionOperationKey(tp)
}.toSeq
delayedElectPreferredLeaderPurgatory.tryCompleteElseWatch(
new DelayedElectPreferredLeader(deadline - time.milliseconds(), expectedLeaders, results,
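
Note on the sentinel removal visible in lastOffsetForLeaderEpoch above: the patch replaces the ReplicaManager.OfflinePartition object with explicit HostedPartition cases. The following is a minimal, hypothetical Scala sketch; PartitionLookup stands in for ReplicaManager and the Partition class here is only an illustrative stand-in. It shows how a sealed hierarchy supports both the direct `case partition: Partition` match and the nonOfflinePartition helper that the test changes below rely on.

    import org.apache.kafka.common.TopicPartition

    // Sketch only: a sealed hierarchy makes "no replica" and "offline replica"
    // explicit states, so callers pattern-match instead of comparing against
    // a shared sentinel object.
    sealed trait HostedPartition

    object HostedPartition {
      // The broker hosts no replica for this topic partition.
      case object None extends HostedPartition

      // A replica exists, but its log directory has failed.
      case object Offline extends HostedPartition
    }

    // Illustrative stand-in: the real kafka.cluster.Partition mixes in the
    // trait, which is why `case partition: Partition =>` matches directly.
    class Partition(val topicPartition: TopicPartition) extends HostedPartition

    // Hypothetical lookup standing in for ReplicaManager.
    class PartitionLookup(partitions: Map[TopicPartition, HostedPartition]) {
      def getPartition(tp: TopicPartition): HostedPartition =
        partitions.getOrElse(tp, HostedPartition.None)

      // Derivation of the nonOfflinePartition helper used in the tests below:
      // online partitions are wrapped in Some, both sentinels map to None.
      def nonOfflinePartition(tp: TopicPartition): Option[Partition] =
        getPartition(tp) match {
          case partition: Partition => Some(partition)
          case _ => scala.None
        }
    }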
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 2769cb4..715f42f 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -61,3 +61,18 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def read(): Map[TopicPartition, Long] = checkpoint.read().toMap
}
+
+trait OffsetCheckpoints {
+ def fetch(logDir: String, topicPartition: TopicPartition): Option[Long]
+}
+
+class SimpleOffsetCheckpoints(checkpointFilesByLogDir: Map[String, OffsetCheckpointFile])
+ extends OffsetCheckpoints {
+
+ override def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] = {
+ val checkpoint = checkpointFilesByLogDir(logDir)
+ val offsetMap = checkpoint.read()
+ offsetMap.get(topicPartition)
+ }
+
+}
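
A short usage sketch of the OffsetCheckpoints trait introduced above. The InMemoryOffsetCheckpoints class is hypothetical (SimpleOffsetCheckpoints above is the real file-backed implementation); the fall-back to 0L mirrors how PartitionTest stubs fetch(...) to return None for partitions without a checkpointed high watermark.

    import kafka.server.checkpoints.OffsetCheckpoints
    import org.apache.kafka.common.TopicPartition

    // Hypothetical in-memory implementation, convenient in unit tests where
    // the file-backed OffsetCheckpointFile would be overkill.
    class InMemoryOffsetCheckpoints(entries: Map[(String, TopicPartition), Long])
      extends OffsetCheckpoints {

      override def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] =
        entries.get((logDir, topicPartition))
    }

    object OffsetCheckpointsExample extends App {
      val tp = new TopicPartition("test-topic", 0)
      val checkpoints: OffsetCheckpoints =
        new InMemoryOffsetCheckpoints(Map(("/kafka-logs", tp) -> 4L))

      // A replica's high watermark starts from the checkpointed value when
      // one exists, and from 0L otherwise.
      val initialHighWatermark = checkpoints.fetch("/kafka-logs", tp).getOrElse(0L)
      assert(initialHighWatermark == 4L)
    }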
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 33de22b..e2733b8 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -39,13 +39,13 @@ object ReplicationUtils extends Logging {
try {
val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path)
val expectedLeaderOpt = TopicPartitionStateZNode.decode(expectedLeaderAndIsrInfo, writtenStat)
- val succeeded = writtenLeaderOpt.map { writtenData =>
+ val succeeded = writtenLeaderOpt.exists { writtenData =>
val writtenLeaderOpt = TopicPartitionStateZNode.decode(writtenData, writtenStat)
(expectedLeaderOpt, writtenLeaderOpt) match {
case (Some(expectedLeader), Some(writtenLeader)) if expectedLeader == writtenLeader => true
case _ => false
}
- }.getOrElse(false)
+ }
if (succeeded) (true, writtenStat.getVersion)
else (false, -1)
} catch {
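
The ReplicationUtils change above is a pure idiom cleanup: for any Option and predicate, `opt.map(p).getOrElse(false)` and `opt.exists(p)` always agree, as this quick sketch shows.

    // Both forms are true only when the Option is defined and the predicate holds.
    val p: Int => Boolean = _ > 2
    assert(Some(3).map(p).getOrElse(false) == Some(3).exists(p))                      // both true
    assert(Some(1).map(p).getOrElse(false) == Some(1).exists(p))                      // both false
    assert(Option.empty[Int].map(p).getOrElse(false) == Option.empty[Int].exists(p)) // both false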
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 654a92e..c5763ad 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -99,7 +99,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val newLeaderServer = servers.find(_.config.brokerId == 101).get
TestUtils.waitUntilTrue (
- () => newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
+ () => newLeaderServer.replicaManager.nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
"broker 101 should be the new leader", pause = 1L
)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index e3f7b4d..b3e5ade 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -22,16 +22,16 @@ import java.util.{Optional, Properties}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.api.{ApiVersion, Request}
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Metric
+import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
-import kafka.server.QuotaFactory.QuotaManagers
import kafka.server._
+import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
-import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException}
-import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.utils.Utils
@@ -39,26 +39,30 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, ListOffsetRequest}
import org.junit.{After, Before, Test}
import org.junit.Assert._
+import org.mockito.Mockito.{doNothing, mock, when}
import org.scalatest.Assertions.assertThrows
-import org.easymock.{Capture, EasyMock, IAnswer}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.ArgumentMatchers
import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
class PartitionTest {
val brokerId = 101
val topicPartition = new TopicPartition("test-topic", 0)
val time = new MockTime()
- val brokerTopicStats = new BrokerTopicStats
- val metrics = new Metrics
-
var tmpDir: File = _
var logDir1: File = _
var logDir2: File = _
- var replicaManager: ReplicaManager = _
var logManager: LogManager = _
var logConfig: LogConfig = _
- var quotaManagers: QuotaManagers = _
+ val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
+ val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
+ val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+ val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+ var partition: Partition = _
@Before
def setup(): Unit = {
@@ -72,20 +76,19 @@ class PartitionTest {
logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
logManager.startup()
- val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
- brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
- val brokerConfig = KafkaConfig.fromProps(brokerProps)
- val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
- quotaManagers = QuotaFactory.instantiate(brokerConfig, metrics, time, "")
- replicaManager = new ReplicaManager(
- config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
- logManager, new AtomicBoolean(false), quotaManagers,
- brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
-
- EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(logProps).anyTimes()
- EasyMock.expect(kafkaZkClient.conditionalUpdatePath(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
- .andReturn((true, 0)).anyTimes()
- EasyMock.replay(kafkaZkClient)
+ partition = new Partition(topicPartition,
+ replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+ interBrokerProtocolVersion = ApiVersion.latestVersion,
+ localBrokerId = brokerId,
+ time,
+ stateStore,
+ delayedOperations,
+ metadataCache,
+ logManager)
+
+ when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
+ when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
+ .thenReturn(None)
}
private def createLogProperties(overrides: Map[String, String]): Properties = {
@@ -99,14 +102,8 @@ class PartitionTest {
@After
def tearDown(): Unit = {
- brokerTopicStats.close()
- metrics.close()
-
logManager.shutdown()
Utils.delete(tmpDir)
- logManager.liveLogDirs.foreach(Utils.delete)
- replicaManager.shutdown(checkpointHW = false)
- quotaManagers.shutdown()
}
@Test
@@ -168,17 +165,9 @@ class PartitionTest {
val latch = new CountDownLatch(1)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
- val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+ partition.getOrCreateReplica(brokerId, isNew = true, offsetCheckpoints)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
- val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
- val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
- val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
- val partition = Partition(topicPartition, time, replicaManager)
-
- partition.addReplicaIfNotExists(futureReplica)
- partition.addReplicaIfNotExists(currentReplica)
- assertEquals(Some(currentReplica), partition.localReplica)
- assertEquals(Some(futureReplica), partition.futureLocalReplica)
+ partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
val thread1 = new Thread {
override def run(): Unit = {
@@ -207,10 +196,15 @@ class PartitionTest {
// Verify that replacement works when the replicas have the same log end offset but different base offsets in the
// active segment
def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
- // Write records with duplicate keys to current replica and roll at offset 6
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
- val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
- log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+ val currentReplica = partition.getOrCreateReplica(brokerId, isNew = true, offsetCheckpoints)
+ logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+ partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
+ val futureReplica = partition.futureLocalReplicaOrException
+
+ // Write records with duplicate keys to current replica and roll at offset 6
+ val currentLog = currentReplica.log.get
+ currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k1".getBytes, "v2".getBytes),
new SimpleRecord("k1".getBytes, "v3".getBytes),
@@ -218,15 +212,14 @@ class PartitionTest {
new SimpleRecord("k2".getBytes, "v5".getBytes),
new SimpleRecord("k2".getBytes, "v6".getBytes)
), leaderEpoch = 0)
- log1.roll()
- log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+ currentLog.roll()
+ currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
new SimpleRecord("k3".getBytes, "v7".getBytes),
new SimpleRecord("k4".getBytes, "v8".getBytes)
), leaderEpoch = 0)
// Write to the future replica as if the log had been compacted, and do not roll the segment
- logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
- val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+
val buffer = ByteBuffer.allocate(1024)
val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0)
@@ -235,16 +228,7 @@ class PartitionTest {
builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
- log2.appendAsFollower(builder.build())
-
- val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
- val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
- val partition = Partition(topicPartition, time, replicaManager)
-
- partition.addReplicaIfNotExists(futureReplica)
- partition.addReplicaIfNotExists(currentReplica)
- assertEquals(Some(currentReplica), partition.localReplica)
- assertEquals(Some(futureReplica), partition.futureLocalReplica)
+ futureReplica.log.get.appendAsFollower(builder.build())
assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
}
@@ -491,9 +475,10 @@ class PartitionTest {
new SimpleRecord(20,"k4".getBytes, "v2".getBytes),
new SimpleRecord(21,"k5".getBytes, "v3".getBytes)))
- val partition = Partition(topicPartition, time, replicaManager)
+ val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true)
+
assertTrue("Expected first makeLeader() to return 'leader changed'",
- partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0))
+ partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
@@ -532,13 +517,16 @@ class PartitionTest {
}
}
+ when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch,
+ List(leader, follower2, follower1), 1)))
+ .thenReturn(Some(2))
+
// Update follower 1
partition.updateReplicaLogReadResult(
follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
partition.updateReplicaLogReadResult(
follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(2), batch2), leaderReplica))
- // Update follower 2
partition.updateReplicaLogReadResult(
follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
partition.updateReplicaLogReadResult(
@@ -565,12 +553,14 @@ class PartitionTest {
assertEquals(Right(None), fetchOffsetsForTimestamp(30, Some(IsolationLevel.READ_UNCOMMITTED)))
// Make into a follower
- assertTrue(partition.makeFollower(controllerId,
- new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1))
+ val followerState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2,
+ leaderEpoch + 1, isr, 4, replicas, false)
+ assertTrue(partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints))
// Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
- assertTrue(partition.makeLeader(controllerId,
- new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2))
+ val newLeaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 5,
+ replicas, false)
+ assertTrue(partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints))
// Try to get offsets as a client
fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
@@ -611,6 +601,9 @@ class PartitionTest {
case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException, saw $e")
}
+ when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2,
+ List(leader, follower2, follower1), 5)))
+ .thenReturn(Some(2))
// Next fetch from replicas, HW is moved up to 5 (ahead of the LEO)
partition.updateReplicaLogReadResult(
@@ -629,27 +622,10 @@ class PartitionTest {
assertEquals(Right(None), fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED)))
}
-
private def setupPartitionWithMocks(leaderEpoch: Int,
isLeader: Boolean,
log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
- val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
- val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
- val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
-
- val partition = new Partition(topicPartition,
- isOffline = false,
- replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
- interBrokerProtocolVersion = ApiVersion.latestVersion,
- localBrokerId = brokerId,
- time,
- replicaManager,
- logManager,
- zkClient)
-
- EasyMock.replay(replicaManager, zkClient)
-
- partition.addReplicaIfNotExists(replica)
+ val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
val controllerId = 0
val controllerEpoch = 0
@@ -659,13 +635,13 @@ class PartitionTest {
if (isLeader) {
assertTrue("Expected become leader transition to succeed",
partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
- leaderEpoch, isr, 1, replicas, true), 0))
+ leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(Some(replica), partition.leaderReplicaIfLocal)
} else {
assertTrue("Expected become follower transition to succeed",
partition.makeFollower(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId + 1,
- leaderEpoch, isr, 1, replicas, true), 0))
+ leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(None, partition.leaderReplicaIfLocal)
}
@@ -675,12 +651,7 @@ class PartitionTest {
@Test
def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
- val log = logManager.getOrCreateLog(topicPartition, logConfig)
- val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
- val partition = Partition(topicPartition, time, replicaManager)
- partition.addReplicaIfNotExists(replica)
- assertEquals(Some(replica), partition.localReplica)
-
+ val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
val initialLogStartOffset = 5L
partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:",
@@ -728,37 +699,19 @@ class PartitionTest {
@Test
def testListOffsetIsolationLevels(): Unit = {
- val log = logManager.getOrCreateLog(topicPartition, logConfig)
- val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
- val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
- val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
-
- val partition = new Partition(topicPartition,
- isOffline = false,
- replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
- interBrokerProtocolVersion = ApiVersion.latestVersion,
- localBrokerId = brokerId,
- time,
- replicaManager,
- logManager,
- zkClient)
-
val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicas
- EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.anyObject[TopicPartitionOperationKey]))
- .andVoid()
-
- EasyMock.replay(replicaManager, zkClient)
+ doNothing().when(delayedOperations).checkAndCompleteFetch()
- partition.addReplicaIfNotExists(replica)
+ val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",
partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
- leaderEpoch, isr, 1, replicas, true), 0))
+ leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(Some(replica), partition.leaderReplicaIfLocal)
@@ -804,23 +757,18 @@ class PartitionTest {
@Test
def testGetReplica(): Unit = {
- val log = logManager.getOrCreateLog(topicPartition, logConfig)
- val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
- val partition = Partition(topicPartition, time, replicaManager)
-
assertEquals(None, partition.localReplica)
assertThrows[ReplicaNotAvailableException] {
partition.localReplicaOrException
}
- partition.addReplicaIfNotExists(replica)
+ val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints)
assertEquals(Some(replica), partition.localReplica)
assertEquals(replica, partition.localReplicaOrException)
}
@Test
def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
- val partition = Partition(topicPartition, time, replicaManager)
assertThrows[ReplicaNotAvailableException] {
partition.appendRecordsToFollowerOrFutureReplica(
createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
@@ -829,22 +777,20 @@ class PartitionTest {
@Test
def testMakeFollowerWithNoLeaderIdChange(): Unit = {
- val partition = Partition(topicPartition, time, replicaManager)
-
// Start off as follower
var partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 1,
List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
- partition.makeFollower(0, partitionStateInfo, 0)
+ partition.makeFollower(0, partitionStateInfo, 0, offsetCheckpoints)
// Request with same leader and epoch increases by only 1, do become-follower steps
partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4,
List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
- assertTrue(partition.makeFollower(0, partitionStateInfo, 2))
+ assertTrue(partition.makeFollower(0, partitionStateInfo, 2, offsetCheckpoints))
// Request with same leader and same epoch, skip become-follower steps
partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4,
List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
- assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
+ assertFalse(partition.makeFollower(0, partitionStateInfo, 2, offsetCheckpoints))
}
@Test
@@ -865,9 +811,9 @@ class PartitionTest {
val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes),
new SimpleRecord("k7".getBytes, "v2".getBytes)))
- val partition = Partition(topicPartition, time, replicaManager)
+ val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true)
assertTrue("Expected first makeLeader() to return 'leader changed'",
- partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0))
+ partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
@@ -899,11 +845,14 @@ class PartitionTest {
assertEquals("Expected leader's HW", lastOffsetOfFirstBatch, leaderReplica.highWatermark.messageOffset)
// current leader becomes follower and then leader again (without any new records appended)
- partition.makeFollower(
- controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1)
+ val followerState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1,
+ replicas, false)
+ partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints)
+
+ val newLeaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 1,
+ replicas, false)
assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
- partition.makeLeader(controllerEpoch, new LeaderAndIsrRequest.PartitionState(
- controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2))
+ partition.makeLeader(controllerEpoch, newLeaderState, 2, offsetCheckpoints))
val currentLeaderEpochStartOffset = leaderReplica.logEndOffset
// append records with the latest leader epoch
@@ -918,8 +867,10 @@ class PartitionTest {
// fetch from the follower not in ISR from start offset of the current leader epoch should
// add this follower to ISR
+ when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2,
+ List(leader, follower2, follower1), 1))).thenReturn(Some(2))
partition.updateReplicaLogReadResult(follower1Replica,
- readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
+ readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
}
@@ -932,8 +883,6 @@ class PartitionTest {
*/
@Test
def testDelayedFetchAfterAppendRecords(): Unit = {
- val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
- val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
val controllerId = 0
val controllerEpoch = 0
val leaderEpoch = 5
@@ -944,35 +893,38 @@ class PartitionTest {
val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) }
val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) }
- val partitions = replicas.map { replica =>
+ val partitions = ListBuffer.empty[Partition]
+
+ replicas.foreach { replica =>
val tp = replica.topicPartition
+ val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val partition = new Partition(tp,
- isOffline = false,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
- replicaManager,
- logManager,
- zkClient)
+ stateStore,
+ delayedOperations,
+ metadataCache,
+ logManager)
+
+ when(delayedOperations.checkAndCompleteFetch())
+ .thenAnswer(new Answer[Unit] {
+ override def answer(invocation: InvocationOnMock): Unit = {
+ // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
+ val anotherPartition = (tp.partition + 1) % topicPartitions.size
+ val partition = partitions(anotherPartition)
+ partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
+ }
+ })
+
partition.addReplicaIfNotExists(replica)
- partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
- leaderEpoch, isr, 1, replicaIds, true), 0)
- partition
+ val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
+ leaderEpoch, isr, 1, replicaIds, true)
+ partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+ partitions += partition
}
- // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
- val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture()
- EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey)))
- .andAnswer(new IAnswer[Unit] {
- override def answer(): Unit = {
- val anotherPartition = (tpKey.getValue.partition + 1) % topicPartitions.size
- val partition = partitions(anotherPartition)
- partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
- }
- }).anyTimes()
- EasyMock.replay(replicaManager, zkClient)
-
def createRecords(baseOffset: Long): MemoryRecords = {
val records = List(
new SimpleRecord("k1".getBytes, "v1".getBytes),
@@ -1050,10 +1002,60 @@ class PartitionTest {
val isr = List[Integer](leader).asJava
val leaderEpoch = 8
- val partition = Partition(topicPartition, time, replicaManager)
assertFalse(partition.isAtMinIsr)
// Make isr set to only have leader to trigger AtMinIsr (default min isr config is 1)
- partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0)
+ val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true)
+ partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
assertTrue(partition.isAtMinIsr)
}
+
+ @Test
+ def testUseCheckpointToInitializeHighWatermark(): Unit = {
+ val log = logManager.getOrCreateLog(topicPartition, logConfig)
+ log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes),
+ new SimpleRecord("k3".getBytes, "v3".getBytes),
+ new SimpleRecord("k4".getBytes, "v4".getBytes)
+ ), leaderEpoch = 0)
+ log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 5,
+ new SimpleRecord("k5".getBytes, "v5".getBytes),
+ new SimpleRecord("k5".getBytes, "v5".getBytes)
+ ), leaderEpoch = 5)
+
+ when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition))
+ .thenReturn(Some(4L))
+
+ val controllerId = 0
+ val controllerEpoch = 3
+ val replicas = List[Integer](brokerId, brokerId + 1).asJava
+ val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
+ 6, replicas, 1, replicas, false)
+ partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+ assertEquals(4, partition.localReplicaOrException.highWatermark.messageOffset)
+ }
+
+ @Test
+ def testAddAndRemoveMetrics(): Unit = {
+ val metricsToCheck = List(
+ "UnderReplicated",
+ "UnderMinIsr",
+ "InSyncReplicasCount",
+ "ReplicasCount",
+ "LastStableOffsetLag",
+ "AtMinIsr")
+
+ def getMetric(metric: String): Option[Metric] = {
+ Metrics.defaultRegistry().allMetrics().asScala.filterKeys { metricName =>
+ metricName.getName == metric && metricName.getType == "Partition"
+ }.headOption.map(_._2)
+ }
+
+ assertTrue(metricsToCheck.forall(getMetric(_).isDefined))
+
+ Partition.removeMetrics(topicPartition)
+
+ assertEquals(Set(), Metrics.defaultRegistry().allMetrics().asScala.keySet.filter(_.getType == "Partition"))
+ }
+
}
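
The PartitionTest rewrite above moves from EasyMock's IAnswer to Mockito's Answer. Below is a minimal standalone sketch of that stubbing idiom; the Completer trait is a hypothetical stand-in for DelayedOperations, and a counter replaces the cross-partition lock acquisition the real test performs to prove the append path cannot deadlock with delayed-fetch completion.

    import org.mockito.Mockito.{mock, when}
    import org.mockito.invocation.InvocationOnMock
    import org.mockito.stubbing.Answer

    // Hypothetical stand-in for DelayedOperations.
    trait Completer {
      def checkAndCompleteFetch(): Unit
    }

    object AnswerExample extends App {
      val completer = mock(classOf[Completer])
      var completions = 0

      // The Answer body runs on every invocation of the stubbed method, which
      // is what the test uses to exercise another partition's leaderIsrUpdate
      // lock from inside the completion callback.
      when(completer.checkAndCompleteFetch()).thenAnswer(new Answer[Unit] {
        override def answer(invocation: InvocationOnMock): Unit = completions += 1
      })

      completer.checkAndCompleteFetch()
      assert(completions == 1)
    }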
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 280fc8e..770868c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
import java.util.Optional
import kafka.common.OffsetAndMetadata
-import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
+import kafka.server.{DelayedOperationPurgatory, KafkaConfig, HostedPartition, ReplicaManager}
import kafka.utils._
import kafka.utils.timer.MockTimer
import org.apache.kafka.common.TopicPartition
@@ -1092,7 +1092,8 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, syncGroupError)
EasyMock.reset(replicaManager)
- EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
+ EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
+ .andReturn(HostedPartition.None)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
@@ -1837,7 +1838,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
- EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+ EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
@@ -2334,7 +2335,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
- EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+ EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
@@ -2375,7 +2376,7 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
- EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+ EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
@@ -2687,7 +2688,8 @@ class GroupCoordinatorTest {
private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
val (responseFuture, responseCallback) = setupHeartbeatCallback
- EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
+ EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
+ .andReturn(HostedPartition.None)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index dab2d72..0487178 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -2025,7 +2025,7 @@ class GroupMetadataManagerTest {
}
private def mockGetPartition(): Unit = {
- EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+ EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition)
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index 23ac2dc..6c74ce3 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -191,10 +191,9 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
if (isEpochInRequestStale) {
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
- }
- else {
+ } else {
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
- assertTrue(broker2.replicaManager.getPartition(tp).isEmpty)
+ assertEquals(HostedPartition.None, broker2.replicaManager.getPartition(tp))
}
}
} finally {
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 3b077a0..c1e0b9f 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -86,17 +86,17 @@ class DelayedOperationTest {
purgatory.tryCompleteElseWatch(r2, Array("test1", "test2"))
purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3"))
- assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed)
+ assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.numDelayed)
assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched)
// complete the operations, it should immediately be purged from the delayed operation
r2.completable = true
r2.tryComplete()
- assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed, 2, purgatory.delayed)
+ assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed, 2, purgatory.numDelayed)
r3.completable = true
r3.tryComplete()
- assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed, 1, purgatory.delayed)
+ assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.numDelayed, 1, purgatory.numDelayed)
// checking a watch should purge the watch list
purgatory.checkAndComplete("test1")
@@ -117,7 +117,7 @@ class DelayedOperationTest {
val cancelledOperations = purgatory.cancelForKey("key")
assertEquals(2, cancelledOperations.size)
- assertEquals(1, purgatory.delayed)
+ assertEquals(1, purgatory.numDelayed)
assertEquals(1, purgatory.watched)
}
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 61cbd2c..3da22bb 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -72,7 +72,7 @@ class HighwatermarkPersistenceTest {
var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
assertEquals(0L, fooPartition0Hw)
val tp0 = new TopicPartition(topic, 0)
- val partition0 = replicaManager.getOrCreatePartition(tp0)
+ val partition0 = replicaManager.createPartition(tp0)
// create leader and follower replicas
val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), LogConfig())
val leaderReplicaPartition0 = new Replica(configs.head.brokerId, tp0, time, 0, Some(log0))
@@ -117,7 +117,7 @@ class HighwatermarkPersistenceTest {
var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(0L, topic1Partition0Hw)
val t1p0 = new TopicPartition(topic1, 0)
- val topic1Partition0 = replicaManager.getOrCreatePartition(t1p0)
+ val topic1Partition0 = replicaManager.createPartition(t1p0)
// create leader log
val topic1Log0 = logManagers.head.getOrCreateLog(t1p0, LogConfig())
// create a local replica for topic1
@@ -134,7 +134,7 @@ class HighwatermarkPersistenceTest {
assertEquals(5L, topic1Partition0Hw)
// add another partition and set highwatermark
val t2p0 = new TopicPartition(topic2, 0)
- val topic2Partition0 = replicaManager.getOrCreatePartition(t2p0)
+ val topic2Partition0 = replicaManager.createPartition(t2p0)
// create leader log
val topic2Log0 = logManagers.head.getOrCreateLog(t2p0, LogConfig())
// create a local replica for topic2
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 006067e..1dd4b24 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -227,7 +227,7 @@ class IsrExpirationTest {
localLog: Log): Partition = {
val leaderId = config.brokerId
val tp = new TopicPartition(topic, partitionId)
- val partition = replicaManager.getOrCreatePartition(tp)
+ val partition = replicaManager.createPartition(tp)
val leaderReplica = new Replica(leaderId, tp, time, 0, Some(localLog))
val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 0fd289c..6b69c41 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.server.LogDirFailureTest._
import kafka.api.IntegrationTestHarness
+import kafka.cluster.Partition
import kafka.controller.{OfflineReplica, PartitionAndReplica}
import kafka.utils.{CoreUtils, Exit, TestUtils}
import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -112,14 +113,16 @@ class LogDirFailureTest extends IntegrationTestHarness {
// Send a message to another partition whose leader is the same as partition 0
// so that ReplicaFetcherThread on the follower will get response from leader immediately
val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i =>
- leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined
+ leaderServer.replicaManager.nonOfflinePartition(new TopicPartition(topic, i))
+ .flatMap(_.leaderReplicaIfLocal).isDefined
}.get
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes)
// When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower
// has fetched from the leader and attempts to append to the offline replica.
producer.send(record).get
- assertEquals(brokerCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
+ assertEquals(brokerCount, leaderServer.replicaManager.nonOfflinePartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader))
+ .get.inSyncReplicas.size)
followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete)
}
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 15f9a9b..780d189 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -23,6 +23,7 @@ import TestUtils._
import kafka.zk.ZooKeeperTestHarness
import java.io.File
+import kafka.cluster.Partition
import kafka.server.checkpoints.OffsetCheckpointFile
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
@@ -147,7 +148,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
* is that server1 has caught up on the topicPartition, and has joined the ISR.
* In the line below, we wait until the condition is met before shutting down server2
*/
- waitUntilTrue(() => server2.replicaManager.getPartition(topicPartition).get.inSyncReplicas.size == 2,
+ waitUntilTrue(() => server2.replicaManager.nonOfflinePartition(topicPartition).get.inSyncReplicas.size == 2,
"Server 1 is not able to join the ISR after restart")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 4049504..d149b8a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -614,12 +614,12 @@ class ReplicaAlterLogDirsThreadTest {
expect(replicaManager.futureLocalReplica(t1p0)).andReturn(Some(futureReplica)).anyTimes()
expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replicaT1p0).anyTimes()
expect(replicaManager.futureLocalReplicaOrException(t1p0)).andReturn(futureReplica).anyTimes()
- expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
+ expect(replicaManager.nonOfflinePartition(t1p0)).andReturn(Some(partition)).anyTimes()
expect(replicaManager.localReplica(t1p1)).andReturn(Some(replicaT1p1)).anyTimes()
expect(replicaManager.futureLocalReplica(t1p1)).andReturn(Some(futureReplica)).anyTimes()
expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replicaT1p1).anyTimes()
expect(replicaManager.futureLocalReplicaOrException(t1p1)).andReturn(futureReplica).anyTimes()
- expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
+ expect(replicaManager.nonOfflinePartition(t1p1)).andReturn(Some(partition)).anyTimes()
}
def stubWithFetchMessages(replicaT1p0: Replica, replicaT1p1: Replica, futureReplica: Replica,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index d6ebdd6..a51641a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -98,9 +98,9 @@ class ReplicaFetcherThreadTest {
stub(replica, partition, replicaManager)
//Expectations
- expect(partition.truncateTo(anyLong(), anyBoolean())).once
+ expect(partition.truncateTo(anyLong(), anyBoolean())).times(3)
- replay(replicaManager, logManager, quota, replica)
+ replay(replicaManager, logManager, quota, replica, partition)
//Define the offsets for the OffsetsForLeaderEpochResponse
val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1),
@@ -227,9 +227,9 @@ class ReplicaFetcherThreadTest {
stub(replica, partition, replicaManager)
//Expectations
- expect(partition.truncateTo(anyLong(), anyBoolean())).once
+ expect(partition.truncateTo(anyLong(), anyBoolean())).times(2)
- replay(replicaManager, logManager, replica)
+ replay(replicaManager, logManager, replica, partition)
//Define the offsets for the OffsetsForLeaderEpochResponse
val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava
@@ -609,6 +609,7 @@ class ReplicaFetcherThreadTest {
val leaderEpoch = 4
//Stub return values
+ expect(partition.truncateTo(0L, false)).times(2)
expect(replica.logEndOffset).andReturn(0).anyTimes()
expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes()
expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes()
@@ -618,7 +619,7 @@ class ReplicaFetcherThreadTest {
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager)
- replay(replicaManager, logManager, quota, replica)
+ replay(replicaManager, logManager, quota, replica, partition)
//Define the offsets for the OffsetsForLeaderEpochResponse
val offsetsReply = Map(
@@ -727,15 +728,13 @@ class ReplicaFetcherThreadTest {
verify(mockBlockingSend)
}
- def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager) = {
- expect(replicaManager.localReplica(t1p0)).andReturn(Some(replica)).anyTimes()
+ def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager): Unit = {
expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replica).anyTimes()
- expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
- expect(replicaManager.localReplica(t1p1)).andReturn(Some(replica)).anyTimes()
+ expect(replicaManager.nonOfflinePartition(t1p0)).andReturn(Some(partition)).anyTimes()
expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replica).anyTimes()
- expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
- expect(replicaManager.localReplica(t2p1)).andReturn(Some(replica)).anyTimes()
+ expect(replicaManager.nonOfflinePartition(t1p1)).andReturn(Some(partition)).anyTimes()
expect(replicaManager.localReplicaOrException(t2p1)).andReturn(replica).anyTimes()
- expect(replicaManager.getPartition(t2p1)).andReturn(Some(partition)).anyTimes()
+ expect(replicaManager.nonOfflinePartition(t2p1)).andReturn(Some(partition)).anyTimes()
+ expect(partition.localReplicaOrException).andReturn(replica).anyTimes()
}
}
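With truncation moved onto Partition itself, the partition mock carries the call-count expectation and has to be included in replay(...). A condensed EasyMock sketch of the pattern used in these tests; the count of two is illustrative:

    import org.easymock.EasyMock._
    import kafka.cluster.Partition

    val partition: Partition = createMock(classOf[Partition])

    // Record mode: require exactly two truncations (e.g. one per fetched partition).
    expect(partition.truncateTo(anyLong(), anyBoolean())).times(2)

    // Switch to replay mode. A mock omitted from replay(...) stays in record
    // mode and silently records calls instead of checking them, which is why
    // `partition` was added to the replay(...) calls above.
    replay(partition)

    // ... exercise the fetcher thread under test ...

    verify(partition) // fails if truncateTo ran fewer than two times; a third call throws earlier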
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 5b2f2ae..c2d92df 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -237,7 +237,7 @@ class ReplicaManagerQuotasTest {
//create the two replicas
for ((p, _) <- fetchInfo) {
- val partition = replicaManager.getOrCreatePartition(p)
+ val partition = replicaManager.createPartition(p)
val leaderReplica = new Replica(configs.head.brokerId, p, time, 0, Some(log))
leaderReplica.highWatermark = new LogOffsetMetadata(5)
partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
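The getOrCreatePartition call sites become createPartition: these tests always set up a fresh partition, and a get-or-create at this point could silently reuse stale state. A sketch of the semantic difference, assuming a simple map-backed registry (illustrative, not ReplicaManager's actual bookkeeping):

    import org.apache.kafka.common.TopicPartition
    import scala.collection.concurrent.TrieMap

    class Registry[P](newPartition: TopicPartition => P) {
      private val all = TrieMap.empty[TopicPartition, P]

      // Old style: returns an existing entry if present, hiding double creation.
      def getOrCreatePartition(tp: TopicPartition): P =
        all.getOrElseUpdate(tp, newPartition(tp))

      // New style: unconditionally installs a fresh entry, so test setup is
      // explicit about creating, never accidentally reusing, a partition.
      def createPartition(tp: TopicPartition): P = {
        val p = newPartition(tp)
        all.put(tp, p)
        p
      }
    }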
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1c1cbd6..b7239e0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,6 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import TestUtils.createBroker
import kafka.cluster.BrokerEndPoint
+import kafka.server.checkpoints.SimpleOffsetCheckpoints
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient
@@ -85,8 +86,8 @@ class ReplicaManagerTest {
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
- val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
- partition.getOrCreateReplica(1)
+ val partition = rm.createPartition(new TopicPartition(topic, 1))
+ partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks()
} finally {
// shutdown the replica manager upon test completion
@@ -104,8 +105,8 @@ class ReplicaManagerTest {
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
- val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
- partition.getOrCreateReplica(1)
+ val partition = rm.createPartition(new TopicPartition(topic, 1))
+ partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks()
} finally {
// shutdown the replica manager upon test completion
@@ -158,8 +159,8 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
- val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0)
+ val partition = rm.createPartition(new TopicPartition(topic, 0))
+ partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) ->
@@ -202,8 +203,8 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
- val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0)
+ val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
+ partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -253,8 +254,8 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
- val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0)
+ val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
+ partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -349,8 +350,8 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
- val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0)
+ val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
+ partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -415,8 +416,8 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1, 2).asJava
- val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
- partition.getOrCreateReplica(0)
+ val partition = rm.createPartition(new TopicPartition(topic, 0))
+ partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -465,8 +466,9 @@ class ReplicaManagerTest {
// Create 2 partitions, assign replica 0 as the leader for both and a different follower (1 and 2) for each
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
- replicaManager.getOrCreatePartition(tp0).getOrCreateReplica(0)
- replicaManager.getOrCreatePartition(tp1).getOrCreateReplica(0)
+ val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ replicaManager.createPartition(tp0).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
+ replicaManager.createPartition(tp1).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](0, 2).asJava
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -556,11 +558,12 @@ class ReplicaManagerTest {
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
// Initialize partition state to follower, with leader = 1, leaderEpoch = 1
- val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, topicPartition))
- partition.getOrCreateReplica(followerBrokerId)
+ val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
+ val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ partition.getOrCreateReplica(followerBrokerId, isNew = false, offsetCheckpoints)
partition.makeFollower(controllerId,
leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
- correlationId)
+ correlationId, offsetCheckpoints)
// Make local partition a follower - because epoch increased by more than 1, truncation should
// trigger even though leader does not change
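The recurring new arguments are the point of the refactor: getOrCreateReplica and makeFollower now receive the checkpoint source as a parameter instead of reaching back into ReplicaManager for it, which is what makes Partition testable in isolation. A hedged sketch of the dependency's shape; the trait body is inferred from how these tests use it, and initialHighWatermark is a made-up helper name:

    import java.io.File
    import org.apache.kafka.common.TopicPartition

    // Inferred shape: answers "what high watermark was last checkpointed for
    // this partition in this log directory?", if any.
    trait OffsetCheckpoints {
      def fetch(logDir: String, topicPartition: TopicPartition): Option[Long]
    }

    // Hypothetical helper: a replica being created recovers its high watermark
    // from the checkpoint, defaulting to 0L for a never-checkpointed
    // (for example, brand new) partition.
    def initialHighWatermark(checkpoints: OffsetCheckpoints,
                             logDir: File,
                             tp: TopicPartition): Long =
      checkpoints.fetch(logDir.getAbsolutePath, tp).getOrElse(0L)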
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 94f9a16..41c6b3e 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -117,7 +117,7 @@ class SimpleFetchTest {
new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
// add the partition with two replicas, both in ISR
- val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
+ val partition = replicaManager.createPartition(new TopicPartition(topic, partitionId))
// create the leader replica with the local log
val leaderReplica = new Replica(configs.head.brokerId, partition.topicPartition, time, 0, Some(log))
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index ac6dedc..8349541 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -444,7 +444,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
private def awaitISR(tp: TopicPartition): Unit = {
TestUtils.waitUntilTrue(() => {
- leader.replicaManager.getPartition(tp).get.inSyncReplicas.map(_.brokerId).size == 2
+ leader.replicaManager.nonOfflinePartition(tp).get.inSyncReplicas.map(_.brokerId).size == 2
}, "Timed out waiting for replicas to join ISR")
}
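awaitISR leans on the standard poll-until-true helper from TestUtils; roughly, that helper is a deadline loop of this shape (defaults and failure handling simplified):

    def waitUntilTrue(condition: () => Boolean, msg: => String,
                      waitTimeMs: Long = 15000L, pauseMs: Long = 100L): Unit = {
      val deadline = System.currentTimeMillis() + waitTimeMs
      while (!condition()) {
        if (System.currentTimeMillis() > deadline)
          throw new AssertionError(msg) // the lazy message is only built on failure
        Thread.sleep(pauseMs)
      }
    }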
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 3d3b342..eba4167 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -57,7 +57,7 @@ class OffsetsForLeaderEpochTest {
val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
- val partition = replicaManager.getOrCreatePartition(tp)
+ val partition = replicaManager.createPartition(tp)
val leaderReplica = new Replica(config.brokerId, partition.topicPartition, time, 0, Some(mockLog))
partition.addReplicaIfNotExists(leaderReplica)
partition.leaderReplicaIdOpt = Some(config.brokerId)
@@ -79,7 +79,7 @@ class OffsetsForLeaderEpochTest {
val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
- replicaManager.getOrCreatePartition(tp)
+ replicaManager.createPartition(tp)
//Given
val epochRequested: Integer = 5
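Both hunks above install partition state directly rather than through a LeaderAndIsr request. The manual-leadership pattern they use boils down to the following, with scaffolding such as replicaManager, mockLog, time and config assumed from the surrounding test:

    // Create the partition, attach a local replica backed by the (mock) log,
    // and mark this broker as the leader, skipping the LeaderAndIsrRequest
    // round trip entirely.
    val partition = replicaManager.createPartition(tp)
    val leaderReplica = new Replica(config.brokerId, partition.topicPartition, time, 0, Some(mockLog))
    partition.addReplicaIfNotExists(leaderReplica)
    partition.leaderReplicaIdOpt = Some(config.brokerId)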
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 59ee426..c7f5c24 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -822,14 +822,14 @@ object TestUtils extends Logging {
}
def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
- server.replicaManager.getPartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
+ server.replicaManager.nonOfflinePartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
}
def findLeaderEpoch(brokerId: Int,
topicPartition: TopicPartition,
servers: Iterable[KafkaServer]): Int = {
val leaderServer = servers.find(_.config.brokerId == brokerId)
- val leaderPartition = leaderServer.flatMap(_.replicaManager.getPartition(topicPartition))
+ val leaderPartition = leaderServer.flatMap(_.replicaManager.nonOfflinePartition(topicPartition))
.getOrElse(fail(s"Failed to find expected replica on broker $brokerId"))
leaderPartition.getLeaderEpoch
}
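These TestUtils helpers (here and in the hunks that follow) rely on nonOfflinePartition returning Option[Partition], so leadership checks compose with exists and flatMap instead of sentinel comparisons. A small usage sketch of the idiom (isLocalLeader is a hypothetical name):

    import org.apache.kafka.common.TopicPartition
    import kafka.cluster.Partition

    // Safe whether the partition is absent, offline, or a follower:
    // None.exists(...) is simply false.
    def isLocalLeader(lookup: TopicPartition => Option[Partition],
                      tp: TopicPartition): Boolean =
      lookup(tp).exists(_.leaderReplicaIfLocal.isDefined)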
@@ -837,7 +837,7 @@ object TestUtils extends Logging {
def findFollowerId(topicPartition: TopicPartition,
servers: Iterable[KafkaServer]): Int = {
val followerOpt = servers.find { server =>
- server.replicaManager.getPartition(topicPartition) match {
+ server.replicaManager.nonOfflinePartition(topicPartition) match {
case Some(partition) => !partition.leaderReplicaIdOpt.contains(server.config.brokerId)
case None => false
}
@@ -903,7 +903,7 @@ object TestUtils extends Logging {
def newLeaderExists: Option[Int] = {
servers.find { server =>
server.config.brokerId != oldLeader &&
- server.replicaManager.getPartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+ server.replicaManager.nonOfflinePartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
}.map(_.config.brokerId)
}
@@ -918,7 +918,7 @@ object TestUtils extends Logging {
timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
def leaderIfExists: Option[Int] = {
servers.find { server =>
- server.replicaManager.getPartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+ server.replicaManager.nonOfflinePartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
}.map(_.config.brokerId)
}
@@ -1056,7 +1056,7 @@ object TestUtils extends Logging {
"Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic))
// ensure that the topic-partition has been deleted from all brokers' replica managers
TestUtils.waitUntilTrue(() =>
- servers.forall(server => topicPartitions.forall(tp => server.replicaManager.getPartition(tp).isEmpty)),
+ servers.forall(server => topicPartitions.forall(tp => server.replicaManager.nonOfflinePartition(tp).isEmpty)),
"Replica manager's should have deleted all of this topic's partitions")
// ensure that logs from all replicas are deleted if delete topic is marked successful in ZooKeeper
assertTrue("Replica logs not deleted after delete topic is complete",