Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/06 21:51:30 UTC
[2/3] kafka git commit: KIP-101: Alter Replication Protocol to use Leader Epoch rather than High Watermark for Truncation
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index cce59ce..94ed66c 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -17,30 +17,27 @@
package kafka.server
-import java.net.SocketTimeoutException
import java.util
import kafka.admin.AdminUtils
-import kafka.cluster.BrokerEndPoint
-import kafka.log.LogConfig
import kafka.api.{FetchRequest => _, _}
+import kafka.cluster.{BrokerEndPoint, Replica}
import kafka.common.KafkaStorageException
-import ReplicaFetcherThread._
+import kafka.log.LogConfig
+import kafka.server.ReplicaFetcherThread._
+import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.requests.EpochEndOffset._
import kafka.utils.Exit
-import org.apache.kafka.clients._
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
-import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
-import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
-import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest}
import org.apache.kafka.common.utils.Time
-import scala.collection.Map
import scala.collection.JavaConverters._
+import scala.collection.{Map, mutable}
class ReplicaFetcherThread(name: String,
fetcherId: Int,
@@ -49,16 +46,20 @@ class ReplicaFetcherThread(name: String,
replicaMgr: ReplicaManager,
metrics: Metrics,
time: Time,
- quota: ReplicationQuotaManager)
+ quota: ReplicationQuotaManager,
+ leaderEndpointBlockingSend: Option[BlockingSend] = None)
extends AbstractFetcherThread(name = name,
clientId = name,
sourceBroker = sourceBroker,
fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
- isInterruptible = false) {
+ isInterruptible = false,
+ includeLogTruncation = true) {
type REQ = FetchRequest
type PD = PartitionData
+ private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(
+ new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId, s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId"))
private val fetchRequestVersion: Short =
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
@@ -66,57 +67,18 @@ class ReplicaFetcherThread(name: String,
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
else 0
- private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
private val replicaId = brokerConfig.brokerId
private val maxWait = brokerConfig.replicaFetchWaitMaxMs
private val minBytes = brokerConfig.replicaFetchMinBytes
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
+ private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
- private def clientId = name
-
- private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
-
- // we need to include both the broker id and the fetcher id
- // as the metrics tag to avoid metric name conflicts with
- // more than one fetcher thread to the same broker
- private val networkClient = {
- val channelBuilder = ChannelBuilders.clientChannelBuilder(
- brokerConfig.interBrokerSecurityProtocol,
- JaasContext.Type.SERVER,
- brokerConfig,
- brokerConfig.interBrokerListenerName,
- brokerConfig.saslMechanismInterBrokerProtocol,
- brokerConfig.saslInterBrokerHandshakeRequestEnable
- )
- val selector = new Selector(
- NetworkReceive.UNLIMITED,
- brokerConfig.connectionsMaxIdleMs,
- metrics,
- time,
- "replica-fetcher",
- Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
- false,
- channelBuilder
- )
- new NetworkClient(
- selector,
- new ManualMetadataUpdater(),
- clientId,
- 1,
- 0,
- Selectable.USE_DEFAULT_BUFFER_SIZE,
- brokerConfig.replicaSocketReceiveBufferBytes,
- brokerConfig.requestTimeoutMs,
- time,
- false,
- new ApiVersions
- )
- }
+ private def epochCache(tp: TopicPartition): LeaderEpochCache = replicaMgr.getReplica(tp).get.epochs.get
override def shutdown(): Unit = {
super.shutdown()
- networkClient.close()
+ leaderEndpoint.close()
}
// process fetched data
@@ -132,7 +94,10 @@ class ReplicaFetcherThread(name: String,
if (logger.isTraceEnabled)
trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
+
+ // Append the leader's messages to the log
replica.log.get.append(records, assignOffsets = false)
+
if (logger.isTraceEnabled)
trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
@@ -179,8 +144,7 @@ class ReplicaFetcherThread(name: String,
*
* There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
*/
- val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP,
- brokerConfig.brokerId)
+ val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
@@ -222,8 +186,7 @@ class ReplicaFetcherThread(name: String,
* and the current leader's log start offset.
*
*/
- val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
- brokerConfig.brokerId)
+ val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
.format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
@@ -240,32 +203,14 @@ class ReplicaFetcherThread(name: String,
}
protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
- val clientResponse = sendRequest(fetchRequest.underlying)
+ val clientResponse = leaderEndpoint.sendRequest(fetchRequest.underlying)
val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
fetchResponse.responseData.asScala.toSeq.map { case (key, value) =>
key -> new PartitionData(value)
}
}
- private def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse = {
- try {
- if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
- throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
- else {
- val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder,
- time.milliseconds(), true)
- NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
- }
- }
- catch {
- case e: Throwable =>
- networkClient.close(sourceBroker.id.toString)
- throw e
- }
-
- }
-
- private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, replicaId: Int): Long = {
+ private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long): Long = {
val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long))
ListOffsetRequest.Builder.forReplica(1, replicaId).setTargetTimes(partitions.asJava)
@@ -273,7 +218,7 @@ class ReplicaFetcherThread(name: String,
val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1))
ListOffsetRequest.Builder.forReplica(0, replicaId).setOffsetData(partitions.asJava)
}
- val clientResponse = sendRequest(requestBuilder)
+ val clientResponse = leaderEndpoint.sendRequest(requestBuilder)
val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse]
val partitionData = response.responseData.get(topicPartition)
partitionData.error match {
@@ -286,12 +231,12 @@ class ReplicaFetcherThread(name: String,
}
}
- protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
+ override def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
// We will not include a replica in the fetch request if it should be throttled.
- if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition)) {
+ if (partitionFetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
}
@@ -303,6 +248,91 @@ class ReplicaFetcherThread(name: String,
}
/**
+ * - Truncate the log to the leader's offset for each partition's epoch.
+ * - If the leader's offset is greater, we stick with the Log End Offset
+ * otherwise we truncate to the leader's offset.
+ * - If the leader replied with undefined epoch offset we must use the high watermark
+ */
+ override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = {
+ val truncationPoints = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
+ val partitionsWithError = mutable.Set[TopicPartition]()
+
+ fetchedEpochs.foreach { case (tp, epochOffset) =>
+ val replica = replicaMgr.getReplica(tp).get
+
+ if (epochOffset.hasError) {
+ info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}")
+ partitionsWithError += tp
+ } else {
+ val truncationOffset =
+ if (epochOffset.endOffset() == UNDEFINED_EPOCH_OFFSET)
+ highWatermark(replica, epochOffset)
+ else if (epochOffset.endOffset() >= replica.logEndOffset.messageOffset)
+ logEndOffset(replica, epochOffset)
+ else
+ epochOffset.endOffset
+
+ truncationPoints.put(tp, truncationOffset)
+ }
+ }
+ replicaMgr.logManager.truncateTo(truncationPoints)
+
+ // For partitions that encountered an error, delay them a bit before retrying the leader epoch request
+ delayPartitions(partitionsWithError, brokerConfig.replicaFetchBackoffMs.toLong)
+
+ truncationPoints
+ }
+
+ override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = {
+ val result = allPartitions
+ .filter { case (_, state) => state.isTruncatingLog }
+ .map { case (tp, _) => tp -> epochCache(tp).latestUsedEpoch }.toMap
+
+ debug(s"Build leaderEpoch request $result for broker $sourceBroker")
+
+ result
+ }
+
+ override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
+ var result: Map[TopicPartition, EpochEndOffset] = null
+ if (shouldSendLeaderEpochRequest) {
+ val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
+ val epochRequest = new OffsetsForLeaderEpochRequest.Builder(partitionsAsJava)
+ try {
+ val response = leaderEndpoint.sendRequest(epochRequest)
+ result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
+ debug(s"Receive leaderEpoch response $result from broker $sourceBroker")
+ } catch {
+ case t: Throwable =>
+ warn(s"Error when sending leader epoch request for $partitions", t)
+
+ // if we get any unexpected exception, mark all partitions with an error
+ result = partitions.map { case (tp, _) =>
+ tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH_OFFSET)
+ }
+ }
+ } else {
+ // just generate a response with no error but UNDEFINED_OFFSET so that we can fall back to truncating using
+ // high watermark in maybeTruncate()
+ result = partitions.map { case (tp, _) =>
+ tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET)
+ }
+ }
+ result
+ }
+
+ private def logEndOffset(replica: Replica, epochOffset: EpochEndOffset): Long = {
+ val logEndOffset = replica.logEndOffset.messageOffset
+ info(s"Based on follower's leader epoch, leader replied with an offset ${epochOffset.endOffset()} >= the follower's log end offset $logEndOffset in ${replica.topicPartition}. No truncation needed.")
+ logEndOffset
+ }
+
+ private def highWatermark(replica: Replica, epochOffset: EpochEndOffset): Long = {
+ warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. High watermark ${replica.highWatermark.messageOffset} will be used for truncation.")
+ replica.highWatermark.messageOffset
+ }
+
+ /**
* To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync.
*/
@@ -336,7 +366,5 @@ object ReplicaFetcherThread {
case Errors.NONE => None
case e => Some(e.exception)
}
-
}
-
-}
+}
\ No newline at end of file
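
For reference, the decision documented on maybeTruncate above reduces to a three-way choice per partition. A minimal sketch of that rule (a standalone function with illustrative parameter names, not the committed API):

    // Sketch of the per-partition truncation decision used by the follower:
    // - leader returned an undefined epoch offset (old leader)  -> fall back to the high watermark
    // - leader's epoch end offset >= follower's log end offset  -> keep the log end offset (no truncation)
    // - otherwise                                                -> truncate to the leader's epoch end offset
    def truncationOffset(leaderEpochEndOffset: Long,
                         followerLogEndOffset: Long,
                         highWatermark: Long,
                         undefinedEpochOffset: Long = -1L): Long =
      if (leaderEpochEndOffset == undefinedEpochOffset) highWatermark
      else if (leaderEpochEndOffset >= followerLogEndOffset) followerLogEndOffset
      else leaderEpochEndOffset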
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8f67425..b063b9e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -20,28 +20,34 @@ import java.io.{File, IOException}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import org.apache.kafka.common.errors._
import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.{Partition, Replica}
-import kafka.common._
import kafka.controller.KafkaController
import kafka.log.{Log, LogAppendInfo, LogManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, PolicyViolationException}
-import org.apache.kafka.common.errors.{NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, DeleteRecordsRequest, DeleteRecordsResponse}
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import scala.collection._
import scala.collection.JavaConverters._
+import java.util.{Map => JMap}
+
+import kafka.common.{KafkaStorageException, Topic}
+import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.requests.EpochEndOffset._
/*
* Result metadata of a log append operation on the log
@@ -132,7 +138,7 @@ class ReplicaManager(val config: KafkaConfig,
private val replicaStateChangeLock = new Object
val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
- val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
+ val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
private var hwThreadInitialized = false
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
@@ -316,9 +322,11 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- def getReplica(topicPartition: TopicPartition, replicaId: Int = localBrokerId): Option[Replica] =
+ def getReplica(topicPartition: TopicPartition, replicaId: Int): Option[Replica] =
getPartition(topicPartition).flatMap(_.getReplica(replicaId))
+ def getReplica(tp: TopicPartition): Option[Replica] = getReplica(tp, localBrokerId)
+
/**
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied
@@ -974,9 +982,6 @@ class ReplicaManager(val config: KafkaConfig,
.format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
}
- logManager.truncateTo(partitionsToMakeFollower.map { partition =>
- (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
- }.toMap)
partitionsToMakeFollower.foreach { partition =>
val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
tryCompleteDelayedProduce(topicPartitionOperationKey)
@@ -1089,4 +1094,22 @@ class ReplicaManager(val config: KafkaConfig,
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
}
+ def getResponseFor(requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = {
+ OffsetsForLeaderEpoch.getResponseFor(this, requestedEpochInfo)
+ }
}
+
+object OffsetsForLeaderEpoch extends Logging {
+ def getResponseFor(replicaManager: ReplicaManager, requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = {
+ debug(s"Processing OffsetForEpochRequest: $requestedEpochInfo")
+ requestedEpochInfo.asScala.map { case (tp, epoch) =>
+ val offset = try {
+ new EpochEndOffset(NONE, replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch))
+ } catch {
+ case e: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+ case e: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+ }
+ (tp, offset)
+ }.toMap.asJava
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
new file mode 100644
index 0000000..890dde0
--- /dev/null
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.checkpoints
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileSystems, Paths}
+import kafka.utils.{Exit, Logging}
+import org.apache.kafka.common.utils.Utils
+import scala.collection.{Seq, mutable}
+
+trait CheckpointFileFormatter[T]{
+ def toLine(entry: T): String
+
+ def fromLine(line: String): Option[T]
+}
+
+class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileFormatter[T]) extends Logging {
+ private val path = file.toPath.toAbsolutePath
+ private val tempPath = Paths.get(path.toString + ".tmp")
+ private val lock = new Object()
+ file.createNewFile()
+
+ def write(entries: Seq[T]) {
+ lock synchronized {
+ // write to temp file and then swap with the existing file
+ val fileOutputStream = new FileOutputStream(tempPath.toFile)
+ val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
+ try {
+ writer.write(version.toString)
+ writer.newLine()
+
+ writer.write(entries.size.toString)
+ writer.newLine()
+
+ entries.foreach { entry =>
+ writer.write(formatter.toLine(entry))
+ writer.newLine()
+ }
+
+ writer.flush()
+ fileOutputStream.getFD().sync()
+ } catch {
+ case e: FileNotFoundException =>
+ if (FileSystems.getDefault.isReadOnly) {
+ fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
+ Exit.halt(1)
+ }
+ throw e
+ } finally {
+ writer.close()
+ }
+
+ Utils.atomicMoveWithFallback(tempPath, path)
+ }
+ }
+
+ def read(): Seq[T] = {
+ def malformedLineException(line: String) =
+ new IOException(s"Malformed line in offset checkpoint file: $line'")
+
+ lock synchronized {
+ val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
+ var line: String = null
+ try {
+ line = reader.readLine()
+ if (line == null)
+ return Seq.empty
+ val fileVersion = line.toInt
+ fileVersion match {
+ case `version` =>
+ line = reader.readLine()
+ if (line == null)
+ return Seq.empty
+ val expectedSize = line.toInt
+ val entries = mutable.Buffer[T]()
+ line = reader.readLine()
+ while (line != null) {
+ val entry = formatter.fromLine(line)
+ entry match {
+ case Some(e) =>
+ entries += e
+ line = reader.readLine()
+ case _ => throw malformedLineException(line)
+ }
+ }
+ if (entries.size != expectedSize)
+ throw new IOException(s"Expected $expectedSize entries but found only ${entries.size}")
+ entries
+ case _ =>
+ throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
+ }
+ } catch {
+ case _: NumberFormatException => throw malformedLineException(line)
+ } finally {
+ reader.close()
+ }
+ }
+ }
+}
\ No newline at end of file
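
The on-disk layout produced by CheckpointFile.write above is a version line, an entry-count line, and then one formatted line per entry. A hypothetical round-trip sketch with a trivial formatter (the pair type and temp file are illustrative, not part of the commit):

    import java.io.File
    import kafka.server.checkpoints.{CheckpointFile, CheckpointFileFormatter}

    object PairFormatter extends CheckpointFileFormatter[(String, Long)] {
      override def toLine(entry: (String, Long)): String = s"${entry._1} ${entry._2}"
      override def fromLine(line: String): Option[(String, Long)] = line.split(" ") match {
        case Array(name, offset) => Some(name -> offset.toLong)
        case _ => None
      }
    }

    val checkpoint = new CheckpointFile[(String, Long)](File.createTempFile("checkpoint", null), 0, PairFormatter)
    checkpoint.write(Seq("my-topic-0" -> 42L, "my-topic-1" -> 7L))
    // The file now holds four lines: "0" (version), "2" (entry count), "my-topic-0 42", "my-topic-1 7"
    val restored = checkpoint.read()   // Seq(("my-topic-0", 42), ("my-topic-1", 7))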
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
new file mode 100644
index 0000000..9de7564
--- /dev/null
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.checkpoints
+
+import java.io._
+import java.util.regex.Pattern
+
+import kafka.server.checkpoints.LeaderEpochCheckpointConstants.{CurrentVersion, WhiteSpacesPattern}
+import kafka.server.epoch.EpochEntry
+
+import scala.collection._
+
+trait LeaderEpochCheckpoint {
+ def write(epochs: Seq[EpochEntry])
+ def read(): Seq[EpochEntry]
+}
+
+object LeaderEpochFile {
+ private val LeaderEpochCheckpointFilename = "leader-epoch-checkpoint"
+ def newFile(dir: File) = {new File(dir, LeaderEpochCheckpointFilename)}
+}
+
+private object LeaderEpochCheckpointConstants {
+ val WhiteSpacesPattern = Pattern.compile("\\s+")
+ val CurrentVersion = 0
+}
+
+/**
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
+ */
+class LeaderEpochCheckpointFile(val file: File) extends CheckpointFileFormatter[EpochEntry] with LeaderEpochCheckpoint {
+ val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, this)
+
+ override def toLine(entry: EpochEntry): String = {
+ s"${entry.epoch} ${entry.startOffset}"
+ }
+
+ override def fromLine(line: String): Option[EpochEntry] = {
+ WhiteSpacesPattern.split(line) match {
+ case Array(epoch, offset) =>
+ Some(EpochEntry(epoch.toInt, offset.toLong))
+ case _ => None
+ }
+ }
+
+ def write(epochs: Seq[EpochEntry]) = {
+ checkpoint.write(epochs)
+ }
+
+ def read(): Seq[EpochEntry] = {
+ checkpoint.read()
+ }
+}
\ No newline at end of file
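
Putting the pieces together, the leader-epoch-checkpoint file in a partition directory holds one "epoch startOffset" line per entry, using the generic layout above. A hypothetical usage sketch (the temp directory stands in for a real partition directory):

    import java.nio.file.Files
    import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
    import kafka.server.epoch.EpochEntry

    val partitionDir = Files.createTempDirectory("my-topic-0").toFile
    val checkpoint = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(partitionDir))
    checkpoint.write(Seq(EpochEntry(0, 0L), EpochEntry(1, 90L)))
    // partitionDir now contains "leader-epoch-checkpoint" with entry lines "0 0" and "1 90"
    val entries: Seq[EpochEntry] = checkpoint.read()   // Seq(EpochEntry(0,0), EpochEntry(1,90))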
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
new file mode 100644
index 0000000..12ec986
--- /dev/null
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.checkpoints
+
+import java.io._
+import java.util.regex.Pattern
+import kafka.server.epoch.EpochEntry
+import org.apache.kafka.common.TopicPartition
+import scala.collection._
+
+private object OffsetCheckpointConstants {
+ val WhiteSpacesPattern = Pattern.compile("\\s+")
+ val CurrentVersion = 0
+}
+
+trait OffsetCheckpoint {
+ def write(epochs: Seq[EpochEntry])
+ def read(): Seq[EpochEntry]
+}
+
+/**
+ * This class persists a map of (Partition => Offsets) to a file (for a certain replica)
+ */
+class OffsetCheckpointFile(val f: File) extends CheckpointFileFormatter[(TopicPartition, Long)] {
+ val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointConstants.CurrentVersion, this)
+
+ override def toLine(entry: (TopicPartition, Long)): String = {
+ s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
+ }
+
+ override def fromLine(line: String): Option[(TopicPartition, Long)] = {
+ OffsetCheckpointConstants.WhiteSpacesPattern.split(line) match {
+ case Array(topic, partition, offset) =>
+ Some(new TopicPartition(topic, partition.toInt), offset.toLong)
+ case _ => None
+ }
+ }
+
+ def write(offsets: Map[TopicPartition, Long]) = {
+ checkpoint.write(offsets.toSeq)
+ }
+
+ def read(): Map[TopicPartition, Long] = {
+ checkpoint.read().toMap
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
new file mode 100644
index 0000000..4a4727e
--- /dev/null
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.epoch
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.server.LogOffsetMetadata
+import kafka.server.checkpoints.LeaderEpochCheckpoint
+import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.EpochEndOffset
+
+import scala.collection.mutable.ListBuffer
+
+trait LeaderEpochCache {
+ def cacheLatestEpoch(leaderEpoch: Int)
+ def maybeAssignLatestCachedEpochToLeo()
+ def assign(leaderEpoch: Int, offset: Long)
+ def latestUsedEpoch(): Int
+ def endOffsetFor(epoch: Int): Long
+ def clearLatest(offset: Long)
+ def clearEarliest(offset: Long)
+ def clear()
+}
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
+ *
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ *
+ * @param leo a function that determines the log end offset
+ * @param checkpoint the checkpoint file
+ */
+class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
+ private val lock = new ReentrantReadWriteLock()
+ private var epochs: ListBuffer[EpochEntry] = lock synchronized { ListBuffer(checkpoint.read(): _*) }
+ private var cachedLatestEpoch: Option[Int] = None //epoch which has yet to be assigned to a message.
+
+ /**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ *
+ * @param epoch
+ * @param offset
+ */
+ override def assign(epoch: Int, offset: Long): Unit = {
+ inWriteLock(lock) {
+ if (epoch >= 0 && epoch > latestUsedEpoch && offset >= latestOffset) {
+ info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.")
+ epochs += EpochEntry(epoch, offset)
+ flush()
+ } else {
+ maybeWarn(epoch, offset)
+ }
+ }
+ }
+
+ /**
+ * Returns the current Leader Epoch. This is the latest epoch
+ * which has messages assigned to it.
+ *
+ * @return
+ */
+ override def latestUsedEpoch(): Int = {
+ inReadLock(lock) {
+ if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
+ }
+ }
+
+ /**
+ * Returns the End Offset for a requested Leader Epoch.
+ *
+ * This is defined as the start offset of the first Leader Epoch larger than the
+ * Leader Epoch requested, or else the Log End Offset if the latest epoch was requested.
+ *
+ * @param requestedEpoch
+ * @return offset
+ */
+ override def endOffsetFor(requestedEpoch: Int): Long = {
+ inReadLock(lock) {
+ val offset =
+ if (requestedEpoch == latestUsedEpoch) {
+ leo().messageOffset
+ }
+ else {
+ val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch)
+ if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
+ UNDEFINED_EPOCH_OFFSET
+ else
+ subsequentEpochs.head.startOffset
+ }
+ debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning offset $offset from epoch list of size ${epochs.size}")
+ offset
+ }
+ }
+
+ /**
+ * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
+ *
+ * @param offset
+ */
+ override def clearLatest(offset: Long): Unit = {
+ inWriteLock(lock) {
+ val before = epochs
+ if (offset >= 0 && offset <= latestOffset()) {
+ epochs = epochs.filter(entry => entry.startOffset < offset)
+ flush()
+ info(s"Cleared latest ${before.toSet.filterNot(epochs.toSet)} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+ }
+ }
+ }
+
+ /**
+ * Clears old epoch entries. This method searches for the oldest epoch < offset, updates the saved epoch offset to
+ * be offset, then clears any previous epoch entries.
+ *
+ * This method is exclusive: so clearEarliest(6) will retain an entry at offset 6.
+ *
+ * @param offset the offset to clear up to
+ */
+ override def clearEarliest(offset: Long): Unit = {
+ inWriteLock(lock) {
+ val before = epochs
+ if (offset >= 0 && earliestOffset() < offset) {
+ val earliest = epochs.filter(entry => entry.startOffset < offset)
+ if (earliest.size > 0) {
+ epochs = epochs --= earliest
+ //If the offset is less than the earliest offset remaining, add previous epoch back, but with an updated offset
+ if (offset < earliestOffset() || epochs.isEmpty)
+ new EpochEntry(earliest.last.epoch, offset) +=: epochs
+ flush()
+ info(s"Cleared earliest ${before.toSet.filterNot(epochs.toSet).size} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+ }
+ }
+ }
+ }
+
+ /**
+ * Delete all entries.
+ */
+ override def clear() = {
+ inWriteLock(lock) {
+ epochs.clear()
+ flush()
+ }
+ }
+
+ def epochEntries(): ListBuffer[EpochEntry] = {
+ epochs
+ }
+
+ private def earliestOffset(): Long = {
+ if (epochs.isEmpty) -1 else epochs.head.startOffset
+ }
+
+ private def latestOffset(): Long = {
+ if (epochs.isEmpty) -1 else epochs.last.startOffset
+ }
+
+ private def flush(): Unit = {
+ checkpoint.write(epochs)
+ }
+
+ def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Latest: {epoch:$latestUsedEpoch, offset:$latestOffset} for Partition: $topicPartition"
+
+ def maybeWarn(epoch: Int, offset: Long) = {
+ if (epoch < latestUsedEpoch())
+ warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " +
+ s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
+ else if (epoch < 0)
+ warn(s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
+ else if (offset < latestOffset() && epoch >= 0)
+ warn(s"Received a PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " +
+ s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
+ }
+
+ /**
+ * Registers a PartitionLeaderEpoch (typically in response to a leadership change).
+ * This will be cached until {@link #maybeAssignLatestCachedEpochToLeo} is called.
+ *
+ * This allows us to register an epoch in response to a leadership change, but not persist
+ * that epoch until a message arrives and is stamped. This aligns the assignment of leadership
+ * on leader and follower, which eases debuggability.
+ *
+ * @param epoch
+ */
+ override def cacheLatestEpoch(epoch: Int) = {
+ inWriteLock(lock) {
+ cachedLatestEpoch = Some(epoch)
+ }
+ }
+
+ /**
+ * If there is a cached epoch, associate its start offset with the current log end offset if it's not in the epoch list yet.
+ */
+ override def maybeAssignLatestCachedEpochToLeo() = {
+ inWriteLock(lock) {
+ if (cachedLatestEpoch == None) error("Attempt to assign log end offset to epoch before epoch has been set. This should never happen.")
+ cachedLatestEpoch.foreach { epoch =>
+ assign(epoch, leo().messageOffset)
+ }
+ }
+ }
+}
+
+// Mapping of epoch to the first offset of the subsequent epoch
+case class EpochEntry(epoch: Int, startOffset: Long)
\ No newline at end of file
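
To make the endOffsetFor contract above concrete: with entries for epochs 0 (starting at offset 0) and 2 (starting at offset 50) and a log end offset of 120, requesting the latest epoch returns the LEO, an intermediate epoch returns the start offset of the next larger epoch, and an epoch with no larger entry returns UNDEFINED_EPOCH_OFFSET. A hypothetical sketch using an in-memory checkpoint stub (and assuming LogOffsetMetadata can be built from a bare message offset):

    import kafka.server.LogOffsetMetadata
    import kafka.server.checkpoints.LeaderEpochCheckpoint
    import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH_OFFSET

    val stub = new LeaderEpochCheckpoint {
      var entries: Seq[EpochEntry] = Seq.empty
      override def write(epochs: Seq[EpochEntry]): Unit = entries = epochs
      override def read(): Seq[EpochEntry] = entries
    }
    val cache = new LeaderEpochFileCache(new TopicPartition("my-topic", 0), () => LogOffsetMetadata(120L), stub)
    cache.assign(0, 0L)
    cache.assign(2, 50L)

    assert(cache.endOffsetFor(2) == 120L)                     // latest used epoch -> log end offset
    assert(cache.endOffsetFor(1) == 50L)                      // start offset of the first epoch larger than 1
    assert(cache.endOffsetFor(5) == UNDEFINED_EPOCH_OFFSET)   // no larger epoch known -> undefined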
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e4cece9..ec3eb88 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -102,7 +102,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse],
ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse],
ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse],
- ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse]
+ ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse],
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse]
)
val RequestKeyToError = Map[ApiKeys, (Nothing) => Errors](
@@ -122,7 +123,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error),
ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error),
- ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2)
+ ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2),
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.asScala.get(tp).get.error())
)
val RequestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -142,7 +144,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.STOP_REPLICA -> ClusterAcl,
ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ClusterAcl,
ApiKeys.CREATE_TOPICS -> ClusterCreateAcl,
- ApiKeys.DELETE_TOPICS -> TopicDeleteAcl
+ ApiKeys.DELETE_TOPICS -> TopicDeleteAcl,
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ClusterAcl
)
@Before
@@ -201,6 +204,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
build()
}
+ private def offsetsForLeaderEpochRequest = {
+ new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build()
+}
+
private def createOffsetFetchRequest = {
new requests.OffsetFetchRequest.Builder(group, List(tp).asJava).build()
}
@@ -285,7 +292,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.STOP_REPLICA -> createStopReplicaRequest,
ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest,
ApiKeys.CREATE_TOPICS -> createTopicsRequest,
- ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
+ ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
)
for ((key, request) <- requestKeyToRequest) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index d885d9b..6a4c552 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -112,6 +112,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
@Before
override def setUp() {
super.setUp
+ MockDeserializer.resetStaticVariables
// create the consumer offset topic
TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
}
@@ -163,10 +164,10 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
isValidClusterId(MockConsumerInterceptor.CLUSTER_META.get.clusterId)
assertEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId)
- assertNotEquals(MockDeserializer.CLUSTER_ID_BEFORE_DESERIALIZE, MockDeserializer.NO_CLUSTER_ID)
- assertNotNull(MockDeserializer.CLUSTER_META)
- isValidClusterId(MockDeserializer.CLUSTER_META.get.clusterId)
- assertEquals(MockDeserializer.CLUSTER_ID_BEFORE_DESERIALIZE.get.clusterId, MockDeserializer.CLUSTER_META.get.clusterId)
+ assertNotEquals(MockDeserializer.clusterIdBeforeDeserialize, MockDeserializer.noClusterId)
+ assertNotNull(MockDeserializer.clusterMeta)
+ isValidClusterId(MockDeserializer.clusterMeta.get.clusterId)
+ assertEquals(MockDeserializer.clusterIdBeforeDeserialize.get.clusterId, MockDeserializer.clusterMeta.get.clusterId)
assertNotNull(MockConsumerMetricsReporter.CLUSTER_META)
isValidClusterId(MockConsumerMetricsReporter.CLUSTER_META.get.clusterId)
@@ -175,7 +176,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockSerializer.CLUSTER_META.get.clusterId)
assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockProducerMetricsReporter.CLUSTER_META.get.clusterId)
assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId)
- assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockDeserializer.CLUSTER_META.get.clusterId)
+ assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockDeserializer.clusterMeta.get.clusterId)
assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerMetricsReporter.CLUSTER_META.get.clusterId)
assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockBrokerMetricsReporter.CLUSTER_META.get.clusterId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 874637b..b165918 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -17,15 +17,14 @@
package kafka.api.test
-import java.util.{Properties, Collection, ArrayList}
+import java.util.{ArrayList, Collection, Properties}
import org.junit.runners.Parameterized
import org.junit.runner.RunWith
import org.junit.runners.Parameterized.Parameters
import org.junit.{After, Before, Test}
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.junit.Assert._
-
import kafka.api.FetchRequestBuilder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.consumer.SimpleConsumer
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 2104842..5aeea89 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -20,8 +20,9 @@ package kafka.log
import java.io.File
import java.util.Properties
-import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0, KAFKA_0_11_0_IV0}
-import kafka.server.OffsetCheckpoint
+import kafka.api.KAFKA_0_11_0_IV0
+import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
+import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record._
@@ -87,7 +88,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
// and make sure its gone from checkpoint file
cleaner.logs.remove(topics(0))
cleaner.updateCheckpoints(logDir)
- val checkpoints = new OffsetCheckpoint(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
+ val checkpoints = new OffsetCheckpointFile(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
// we expect partition 0 to be gone
assertFalse(checkpoints.contains(topics(0)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 1400615..1248d1a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,7 +21,7 @@ import java.io._
import java.util.Properties
import kafka.common._
-import kafka.server.OffsetCheckpoint
+import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.OffsetOutOfRangeException
@@ -102,7 +102,9 @@ class LogManagerTest {
time.sleep(maxLogAgeMs + 1)
assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
time.sleep(log.config.fileDeleteDelayMs + 1)
- assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
+
+ //There should be a log file, two indexes, the leader epoch checkpoint and the pid snapshot dir
+ assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes)
try {
@@ -148,7 +150,9 @@ class LogManagerTest {
time.sleep(logManager.InitialTaskDelayMs)
assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
time.sleep(log.config.fileDeleteDelayMs + 1)
- assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
+
+ //There should be a log file, two indexes, the leader epoch checkpoint and the pid snapshot dir
+ assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes)
try {
log.read(0, 1024)
@@ -288,7 +292,7 @@ class LogManagerTest {
})
logManager.checkpointRecoveryPointOffsets()
- val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
+ val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
topicPartitions.zip(logs).foreach {
case(tp, log) => {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 25b3480..3f531d9 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -346,4 +346,29 @@ class LogSegmentTest {
assertEquals(oldSize, size)
assertEquals(size, fileSize)
}
+
+ @Test
+ def shouldTruncateEvenIfOffsetPointsToAGapInTheLog() {
+ val seg = createSegment(40)
+ val offset = 40
+
+ def records(offset: Long, record: String): MemoryRecords =
+ MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset, CompressionType.NONE, TimestampType.CREATE_TIME,
+ new SimpleRecord(offset * 1000, record.getBytes))
+
+ //Given two messages with a gap between them (e.g. mid offset compacted away)
+ val ms1 = records(offset, "first message")
+ seg.append(offset, offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
+ val ms2 = records(offset + 3, "message after gap")
+ seg.append(offset + 3, offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+
+ // When we truncate to an offset without a corresponding log entry
+ seg.truncateTo(offset + 1)
+
+ //Then we should still truncate the record that was present (i.e. offset + 3 is gone)
+ val log = seg.read(offset, None, 10000)
+ assertEquals(offset, log.records.batches.iterator.next().baseOffset())
+ assertEquals(1, log.records.batches.asScala.size)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a7af24e..4fcf1c3 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -28,10 +28,14 @@ import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils._
import kafka.server.KafkaConfig
-import org.apache.kafka.common.record._
+import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
+import org.apache.kafka.common.record.{RecordBatch, _}
import org.apache.kafka.common.utils.Utils
+import org.easymock.EasyMock
+import org.easymock.EasyMock._
import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
class LogTest extends JUnitSuite {
@@ -1235,7 +1239,7 @@ class LogTest extends JUnitSuite {
val config = LogConfig(logProps)
val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
val recoveryPoint = 50L
- for (_ <- 0 until 50) {
+ for (_ <- 0 until 10) {
// create a log and write some messages to it
logDir.mkdirs()
var log = new Log(logDir,
@@ -1441,9 +1445,14 @@ class LogTest extends JUnitSuite {
for (_ <- 0 until 100)
log.append(set)
+ log.leaderEpochCache.assign(0, 40)
+ log.leaderEpochCache.assign(1, 90)
+
// expire all segments
log.deleteOldSegments()
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+ assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+ assertEquals("Epoch entry should be the latest epoch and the leo.", new EpochEntry(1, 100), epochCache(log).epochEntries().head)
// append some messages to create some segments
for (_ <- 0 until 100)
@@ -1452,6 +1461,8 @@ class LogTest extends JUnitSuite {
log.delete()
assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
+ assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+
}
@Test
@@ -1480,6 +1491,10 @@ class LogTest extends JUnitSuite {
assertEquals(log.logStartOffset, 15)
}
+ def epochCache(log: Log): LeaderEpochFileCache = {
+ log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache]
+ }
+
@Test
def shouldDeleteSizeBasedSegments() {
val set = TestUtils.singletonRecords("test".getBytes)
@@ -1566,6 +1581,205 @@ class LogTest extends JUnitSuite {
assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
}
+ @Test
+ def shouldApplyEpochToMessageOnAppendIfLeader() {
+ val messageIds = (0 until 50).toArray
+ val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
+
+ //Given this partition is on leader epoch 72
+ val epoch = 72
+ val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+
+ //When appending messages as a leader (i.e. assignOffsets = true)
+ for (i <- records.indices)
+ log.append(
+ MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)),
+ leaderEpochCache = mockCache(epoch),
+ assignOffsets = true
+ )
+
+ //Then leader epoch should be set on messages
+ for (i <- records.indices) {
+ val read = log.read(i, 100, Some(i+1)).records.batches().iterator.next()
+ assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch())
+ }
+ }
+
+ @Test
+ def followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() {
+ val messageIds = (0 until 50).toArray
+ val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
+
+ val cache = createMock(classOf[LeaderEpochCache])
+
+ //Given each message has an offset & epoch, as msgs from leader would
+ def recordsForEpoch(i: Int): MemoryRecords = {
+ val recs = MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i))
+ recs.batches().asScala.foreach{record =>
+ record.setPartitionLeaderEpoch(42)
+ record.setLastOffset(i)
+ }
+ recs
+ }
+
+ //Verify we save the epoch to the cache.
+ expect(cache.assign(EasyMock.eq(42), anyInt())).times(records.size)
+ replay(cache)
+
+ val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+
+ //When appending as follower (assignOffsets = false)
+ for (i <- records.indices)
+ log.append(recordsForEpoch(i), assignOffsets = false, leaderEpochCache = cache)
+
+ verify(cache)
+ }
+
+ @Test
+ def shouldTruncateLeaderEpochsWhenDeletingSegments() {
+ val set = TestUtils.singletonRecords("test".getBytes)
+ val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+ val cache = epochCache(log)
+
+ // Given three segments of 5 messages each
+ for (e <- 0 until 15) {
+ log.append(set)
+ }
+
+ //Given epochs
+ cache.assign(0, 0)
+ cache.assign(1, 5)
+ cache.assign(2, 10)
+
+ //When first segment is removed
+ log.deleteOldSegments
+
+ //The oldest epoch entry should have been removed
+ assertEquals(ListBuffer(EpochEntry(1, 5), EpochEntry(2, 10)), cache.epochEntries)
+ }
+
+ @Test
+ def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() {
+ val set = TestUtils.singletonRecords("test".getBytes)
+ val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+ val cache = epochCache(log)
+
+ // Given three segments of 5 messages each
+ for (e <- 0 until 15) {
+ log.append(set)
+ }
+
+ //Given epochs
+ cache.assign(0, 0)
+ cache.assign(1, 7)
+ cache.assign(2, 10)
+
+ //When first segment removed (up to offset 5)
+ log.deleteOldSegments
+
+ //The first entry should have gone from (0,0) => (0,5)
+ assertEquals(ListBuffer(EpochEntry(0, 5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
+ }
+
+ @Test
+ def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
+ val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * set.sizeInBytes).toString)
+ val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+ val cache = epochCache(log)
+
+ //Given 2 segments, 10 messages per segment
+ for (epoch <- 1 to 20)
+ log.append(set)
+
+ //Simulate some leader changes at specific offsets
+ cache.assign(0, 0)
+ cache.assign(1, 10)
+ cache.assign(2, 16)
+
+ assertEquals(2, log.numberOfSegments)
+ assertEquals(20, log.logEndOffset)
+
+ //When truncate to LEO (no op)
+ log.truncateTo(log.logEndOffset)
+
+ //Then no change
+ assertEquals(3, cache.epochEntries().size)
+
+ //When truncate
+ log.truncateTo(11)
+
+ //Then the entry for epoch 2 (assigned at offset 16) should be removed
+ assertEquals(2, cache.epochEntries().size)
+
+ //When truncate
+ log.truncateTo(10)
+
+ //Then only the first entry should remain
+ assertEquals(1, cache.epochEntries().size)
+
+ //When truncate all
+ log.truncateTo(0)
+
+ //Then the cache should be empty
+ assertEquals(0, cache.epochEntries().size)
+ }
+
+ /**
+ * Append a bunch of messages to a log and then re-open it with recovery and check that the leader epochs are recovered properly.
+ */
+ @Test
+ def testLogRecoversForLeaderEpoch() {
+ val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+ val leaderEpochCache = epochCache(log)
+ val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0)
+ log.append(records = firstBatch, assignOffsets = false)
+
+ val secondBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 2, offset = 1)
+ log.append(records = secondBatch, assignOffsets = false)
+
+ val thirdBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 2, offset = 2)
+ log.append(records = thirdBatch, assignOffsets = false)
+
+ val fourthBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 3, offset = 3)
+ log.append(records = fourthBatch, assignOffsets = false)
+
+ assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
+
+ // deliberately remove some of the epoch entries
+ leaderEpochCache.clearLatest(2)
+ assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
+ log.close()
+
+ // reopen the log and recover from the beginning
+ val recoveredLog = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+ val recoveredLeaderEpochCache = epochCache(recoveredLog)
+
+ // epoch entries should be recovered
+ assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries)
+ recoveredLog.close()
+ }
+
+ /**
+ * Wrap a single record in a memory-records buffer carrying the given leader epoch.
+ */
+ private def singletonRecordsWithLeaderEpoch(value: Array[Byte],
+ key: Array[Byte] = null,
+ leaderEpoch: Int,
+ offset: Long,
+ codec: CompressionType = CompressionType.NONE,
+ timestamp: Long = RecordBatch.NO_TIMESTAMP,
+ magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): MemoryRecords = {
+ val records = Seq(new SimpleRecord(timestamp, key, value))
+
+ val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+ val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, offset,
+ System.currentTimeMillis, leaderEpoch)
+ records.foreach(builder.append)
+ builder.build()
+ }
+
+
def createLog(messageSizeInBytes: Int, retentionMs: Int = -1,
retentionBytes: Int = -1, cleanupPolicy: String = "delete"): Log = {
val logProps = new Properties()
@@ -1583,4 +1797,11 @@ class LogTest extends JUnitSuite {
time = time)
log
}
+
+ private def mockCache(epoch: Int) = {
+ val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
+ EasyMock.expect(cache.latestUsedEpoch()).andReturn(epoch).anyTimes()
+ EasyMock.replay(cache)
+ cache
+ }
}
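
Note: the LogTest additions above all assert the same (epoch, startOffset) bookkeeping: deleting old segments drops entries that fall wholly before the new log start offset (clamping the last surviving predecessor up to it), while truncating the log drops entries at or beyond the truncation offset. A minimal standalone sketch of that rule follows, for readers tracing the assertions; the ToyEpochCache class and its method names are illustrative only and are not the broker's LeaderEpochCache API.

    // Illustrative model only; not the broker's LeaderEpochCache implementation.
    import scala.collection.mutable.ListBuffer

    case class Entry(epoch: Int, startOffset: Long)

    class ToyEpochCache {
      val entries = ListBuffer[Entry]()

      def assign(epoch: Int, startOffset: Long): Unit = entries += Entry(epoch, startOffset)

      // Segment deletion: keep the latest entry at or before the new start offset,
      // clamped up to it, and drop everything earlier.
      def onSegmentDeletion(newLogStartOffset: Long): Unit = {
        val (before, after) = entries.partition(_.startOffset <= newLogStartOffset)
        entries.clear()
        before.lastOption.foreach(e => entries += e.copy(startOffset = newLogStartOffset max e.startOffset))
        entries ++= after
      }

      // Log truncation: drop entries whose start offset is at or beyond the truncation point.
      def onTruncation(truncationOffset: Long): Unit = {
        val kept = entries.filter(_.startOffset < truncationOffset)
        entries.clear()
        entries ++= kept
      }
    }

With epochs assigned at offsets 0, 7 and 10, onSegmentDeletion(5) yields (0,5), (1,7), (2,10), matching shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments; with entries at 0, 10 and 16, onTruncation(11) leaves two entries, matching shouldTruncateLeaderEpochFileWhenTruncatingLog.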
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 3babfc8..00cda21 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -24,11 +24,12 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.EpochEndOffset
import org.junit.Assert.{assertFalse, assertTrue}
import org.junit.{Before, Test}
import scala.collection.JavaConverters._
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, Set, mutable}
class AbstractFetcherThreadTest {
@@ -104,7 +105,7 @@ class AbstractFetcherThreadTest {
clientId: String,
sourceBroker: BrokerEndPoint,
fetchBackOffMs: Int = 0)
- extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
+ extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs, isInterruptible = true, includeLogTruncation = false) {
type REQ = DummyFetchRequest
type PD = PartitionData
@@ -122,6 +123,12 @@ class AbstractFetcherThreadTest {
override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap)
+
+ override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = { Map() }
+
+ override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
+
+ override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() }
}
@@ -137,7 +144,7 @@ class AbstractFetcherThreadTest {
fetcherThread.addPartitions(Map(partition -> 0L))
// Wait until fetcherThread finishes the work
- TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed waiting for fetcherThread tp finish the work")
+ TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed waiting for fetcherThread to finish the work")
fetcherThread.shutdown()
@@ -198,7 +205,7 @@ class AbstractFetcherThreadTest {
val requestMap = new mutable.HashMap[TopicPartition, Long]
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
// Add backoff delay check
- if (partitionFetchState.isActive)
+ if (partitionFetchState.isReadyForFetch)
requestMap.put(topicPartition, partitionFetchState.fetchOffset)
}
new DummyFetchRequest(requestMap)
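
Note: the isActive -> isReadyForFetch change above reflects the new per-partition truncation phase this patch introduces: a partition whose log is still being reconciled against the leader's epoch offsets must not be fetched yet. A simplified sketch of that state object follows; the field and method names are illustrative, and the real PartitionFetchState tracks its back-off via a delay object rather than a plain millisecond value.

    // Illustrative sketch only; not the actual PartitionFetchState definition.
    case class FetchState(fetchOffset: Long,
                          delayMs: Long = 0,
                          truncatingLog: Boolean = false) {
      // Fetch only once truncation has completed and any back-off has expired.
      def isReadyForFetch: Boolean = !truncatingLog && delayMs == 0
    }

The dummy fetcher in this test never exercises truncation, which is why its three new overrides simply return empty maps.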
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 3898d2b..0c62a50 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.{Partition, Replica}
import kafka.log.Log
+import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
+import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
@@ -205,6 +207,9 @@ class IsrExpirationTest {
private def logMock: Log = {
val log = EasyMock.createMock(classOf[kafka.log.Log])
+ val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
+ EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
+ EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
EasyMock.replay(log)
log
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 40ac7ec..54cee6b 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -23,6 +23,7 @@ import TestUtils._
import kafka.zk.ZooKeeperTestHarness
import java.io.File
+import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
@@ -56,8 +57,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
val message = "hello"
var producer: KafkaProducer[Integer, String] = null
- def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
- def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
+ def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
+ def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
var servers = Seq.empty[KafkaServer]
// Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
new file mode 100644
index 0000000..d4118c1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.log.LogManager
+import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.requests.EpochEndOffset._
+import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.requests.EpochEndOffset
+import org.apache.kafka.common.utils.SystemTime
+import org.easymock.EasyMock._
+import org.easymock.{Capture, CaptureType}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.{Map, mutable}
+
+class ReplicaFetcherThreadTest {
+
+ private val t1p0 = new TopicPartition("topic1", 0)
+ private val t1p1 = new TopicPartition("topic1", 1)
+ private val t2p1 = new TopicPartition("topic2", 1)
+
+ @Test
+ def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = {
+ val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
+ props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
+ val config = KafkaConfig.fromProps(props)
+ val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+ val thread = new ReplicaFetcherThread(
+ name = "bob",
+ fetcherId = 0,
+ sourceBroker = endPoint,
+ brokerConfig = config,
+ replicaMgr = null,
+ metrics = new Metrics(),
+ time = new SystemTime(),
+ quota = null,
+ leaderEndpointBlockingSend = None)
+
+ val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
+
+ val expected = Map(
+ t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET),
+ t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET)
+ )
+
+ assertEquals("results from leader epoch request should have undefined offset", expected, result)
+ }
+
+ @Test
+ def shouldHandleExceptionFromBlockingSend(): Unit = {
+ val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+ val config = KafkaConfig.fromProps(props)
+ val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+ val mockBlockingSend = createMock(classOf[BlockingSend])
+
+ expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once()
+ replay(mockBlockingSend)
+
+ val thread = new ReplicaFetcherThread(
+ name = "bob",
+ fetcherId = 0,
+ sourceBroker = endPoint,
+ brokerConfig = config,
+ replicaMgr = null,
+ metrics = new Metrics(),
+ time = new SystemTime(),
+ quota = null,
+ leaderEndpointBlockingSend = Some(mockBlockingSend))
+
+ val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
+
+ val expected = Map(
+ t1p0 -> new EpochEndOffset(Errors.UNKNOWN, UNDEFINED_EPOCH_OFFSET),
+ t1p1 -> new EpochEndOffset(Errors.UNKNOWN, UNDEFINED_EPOCH_OFFSET)
+ )
+
+ assertEquals("results from leader epoch request should have undefined offset", expected, result)
+ verify(mockBlockingSend)
+ }
+
+ @Test
+ def shouldFetchLeaderEpochOnFirstFetchOnly(): Unit = {
+ val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+ //Setup all dependencies
+ val quota = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+ val logManager = createMock(classOf[LogManager])
+ val replica = createNiceMock(classOf[Replica])
+ val replicaManager = createMock(classOf[ReplicaManager])
+
+ //Stubs
+ expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+ expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
+ expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+ expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+ stub(replica, replicaManager)
+
+
+ //Expectations
+ expect(logManager.truncateTo(anyObject())).once
+
+ replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+ //Define the offsets for the OffsetsForLeaderEpochResponse
+ val offsets = Map(t1p0 -> new EpochEndOffset(1), t1p1 -> new EpochEndOffset(1)).asJava
+
+ //Create the fetcher thread
+ val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+ val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, endPoint, new SystemTime())
+ val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+ thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+ //Loop 1
+ thread.doWork()
+ assertEquals(1, mockNetwork.epochFetchCount)
+ assertEquals(1, mockNetwork.fetchCount)
+
+ //Loop 2: we should not fetch epochs again
+ thread.doWork()
+ assertEquals(1, mockNetwork.epochFetchCount)
+ assertEquals(2, mockNetwork.fetchCount)
+
+ //Loop 3: we should not fetch epochs again
+ thread.doWork()
+ assertEquals(1, mockNetwork.epochFetchCount)
+ assertEquals(3, mockNetwork.fetchCount)
+
+ //Assert that truncateTo is called exactly once (despite three fetch loops)
+ verify(logManager)
+ }
+
+ @Test
+ def shouldTruncateToOffsetSpecifiedInEpochOffsetResponse(): Unit = {
+
+ //Create a capture to track what partitions/offsets are truncated
+ val truncateToCapture: Capture[Map[TopicPartition, Long]] = newCapture(CaptureType.ALL)
+
+ // Setup all the dependencies
+ val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
+ val quota = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs = createMock(classOf[LeaderEpochCache])
+ val logManager = createMock(classOf[LogManager])
+ val replica = createNiceMock(classOf[Replica])
+ val replicaManager = createMock(classOf[ReplicaManager])
+
+ val initialLEO = 200
+
+ //Stubs
+ expect(logManager.truncateTo(capture(truncateToCapture))).once
+ expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+ expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
+ expect(leaderEpochs.latestUsedEpoch).andReturn(5).anyTimes()
+ expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+ stub(replica, replicaManager)
+
+
+ replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+ //Define the offsets for the OffsetsForLeaderEpochResponse; these are used for truncation
+ val offsetsReply = Map(t1p0 -> new EpochEndOffset(156), t2p1 -> new EpochEndOffset(172)).asJava
+
+ //Create the thread
+ val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+ val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+ val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+ thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
+
+ //Run it
+ thread.doWork()
+
+ //We should have truncated to the offsets in the response
+ assertEquals(156, truncateToCapture.getValue.get(t1p0).get)
+ assertEquals(172, truncateToCapture.getValue.get(t2p1).get)
+ }
+
+ @Test
+ def shouldTruncateToHighWatermarkIfLeaderReturnsUndefinedOffset(): Unit = {
+
+ //Create a capture to track what partitions/offsets are truncated
+ val truncated: Capture[Map[TopicPartition, Long]] = newCapture(CaptureType.ALL)
+
+ // Setup all the dependencies
+ val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
+ val quota = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+ val logManager = createMock(classOf[LogManager])
+ val replica = createNiceMock(classOf[Replica])
+ val replicaManager = createMock(classOf[ReplicaManager])
+
+ val highWaterMark = 100
+ val initialLeo = 300
+
+ //Stubs
+ expect(replica.highWatermark).andReturn(new LogOffsetMetadata(highWaterMark)).anyTimes()
+ expect(logManager.truncateTo(capture(truncated))).once
+ expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+ expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes()
+ expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+ expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+ stub(replica, replicaManager)
+ replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+ //Define the offsets for the OffsetsForLeaderEpochResponse; these are used for truncation
+ val offsetsReply = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava
+
+ //Create the thread
+ val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+ val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+ val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+ thread.addPartitions(Map(t1p0 -> 0))
+
+ //Run it
+ thread.doWork()
+
+ //We should have truncated to the high watermark for t1p0
+ assertEquals(highWaterMark, truncated.getValue.get(t1p0).get)
+ }
+
+ @Test
+ def shouldPollIndefinitelyIfLeaderReturnsAnyException(): Unit = {
+
+ //Create a capture to track what partitions/offsets are truncated
+ val truncated: Capture[Map[TopicPartition, Long]] = newCapture(CaptureType.ALL)
+
+ // Setup all the dependencies
+ val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
+ val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
+ val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+ val logManager = createMock(classOf[kafka.log.LogManager])
+ val replica = createNiceMock(classOf[Replica])
+ val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+
+ val highWaterMark = 100
+ val initialLeo = 300
+
+ //Stubs
+ expect(replica.highWatermark).andReturn(new LogOffsetMetadata(highWaterMark)).anyTimes()
+ expect(logManager.truncateTo(capture(truncated))).anyTimes()
+ expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+ expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes()
+ expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+ expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+ stub(replica, replicaManager)
+ replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+ //Define the offsets for the OffsetsForLeaderEpochResponse; these are used for truncation
+ val offsetsReply = mutable.Map(
+ t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
+ t1p1 -> new EpochEndOffset(UNKNOWN, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
+ ).asJava
+
+ //Create the thread
+ val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+ val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+ val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+ thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
+
+ //Run the thread 4 times
+ (0 to 3).foreach { _ =>
+ thread.doWork()
+ }
+
+ //Then it should keep retrying without truncating while there is no leader
+ for (invocation <- truncated.getValues().asScala)
+ assertEquals(0, invocation.size)
+
+ //New leader elected; it now replies with a valid offset
+ offsetsReply.put(t1p0, new EpochEndOffset(156))
+
+ thread.doWork()
+
+ //Now the final call should have actually done a truncation (to offset 156)
+ assertEquals(156, truncated.getValues.asScala.last.get(t1p0).get)
+ }
+
+ @Test
+ def shouldMovePartitionsOutOfTruncatingLogState(): Unit = {
+ val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+ //Setup all stubs
+ val quota = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+ val logManager = createNiceMock(classOf[LogManager])
+ val replica = createNiceMock(classOf[Replica])
+ val replicaManager = createNiceMock(classOf[ReplicaManager])
+
+ //Stub return values
+ expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+ expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
+ expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+ expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+ stub(replica, replicaManager)
+
+ replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+ //Define the offsets for the OffsetsForLeaderEpochResponse
+ val offsetsReply = Map(
+ t1p0 -> new EpochEndOffset(1), t1p1 -> new EpochEndOffset(1)
+ ).asJava
+
+ //Create the fetcher thread
+ val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+ val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+ val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+
+ //When
+ thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+ //Then all partitions should start in a TruncatingLog state
+ assertTrue(thread.partitionStates.partitionStates().asScala.forall(_.value().truncatingLog))
+
+ //When
+ thread.doWork()
+
+ //Then none should be TruncatingLog anymore
+ assertFalse(thread.partitionStates.partitionStates().asScala.exists(_.value().truncatingLog))
+ }
+
+ @Test
+ def shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest(): Unit = {
+ val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+ val truncateToCapture: Capture[Map[TopicPartition, Long]] = newCapture(CaptureType.ALL)
+ val initialLEO = 100
+
+ //Setup all stubs
+ val quota = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+ val logManager = createNiceMock(classOf[LogManager])
+ val replica = createNiceMock(classOf[Replica])
+ val replicaManager = createNiceMock(classOf[ReplicaManager])
+
+ //Stub return values
+ expect(logManager.truncateTo(capture(truncateToCapture))).once
+ expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+ expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
+ expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+ expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+ stub(replica, replicaManager)
+
+ replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+ //Define the offsets for the OffsetsForLeaderEpochResponse
+ val offsetsReply = Map(
+ t1p0 -> new EpochEndOffset(52), t1p1 -> new EpochEndOffset(49)
+ ).asJava
+
+ //Create the fetcher thread
+ val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+ val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime())
+ val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+
+ //When
+ thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+ //While the epoch request is outstanding, remove one of the partitions to simulate it becoming leader. We do this via a callback passed to the mock blocking send
+ val partitionThatBecameLeader = t1p0
+ mockNetwork.setEpochRequestCallback(() => {
+ thread.removePartitions(Set(partitionThatBecameLeader))
+ })
+
+ //When
+ thread.doWork()
+
+ //Then we should not have truncated the partition that became leader
+ assertEquals(None, truncateToCapture.getValue.get(partitionThatBecameLeader))
+ assertEquals(49, truncateToCapture.getValue.get(t1p1).get)
+ }
+
+ def stub(replica: Replica, replicaManager: ReplicaManager) = {
+ expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes()
+ expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes()
+ expect(replicaManager.getReplica(t1p1)).andReturn(Some(replica)).anyTimes()
+ expect(replicaManager.getReplicaOrException(t1p1)).andReturn(replica).anyTimes()
+ expect(replicaManager.getReplica(t2p1)).andReturn(Some(replica)).anyTimes()
+ expect(replicaManager.getReplicaOrException(t2p1)).andReturn(replica).anyTimes()
+ }
+}
\ No newline at end of file
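
Note: taken together, these ReplicaFetcherThread tests pin down the truncation decision a follower makes from each OffsetsForLeaderEpoch reply: on an error, retry later without truncating; on UNDEFINED_EPOCH_OFFSET (for example a pre-KIP-101 leader), fall back to the high watermark; otherwise truncate to the leader's epoch end offset, capped at the local log end offset. A hedged sketch of that rule as a free-standing helper follows; decideTruncationOffset and its parameters are hypothetical and are not a method of ReplicaFetcherThread.

    // Illustrative sketch only; not the production truncation code path.
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH_OFFSET

    def decideTruncationOffset(error: Errors,
                               leaderEpochEndOffset: Long,
                               localLogEndOffset: Long,
                               highWatermark: Long): Option[Long] =
      if (error != Errors.NONE)
        None                                   // leader unavailable or unaware: retry on the next loop
      else if (leaderEpochEndOffset == UNDEFINED_EPOCH_OFFSET)
        Some(highWatermark)                    // no usable epoch offset: behave as before KIP-101
      else
        Some(math.min(leaderEpochEndOffset, localLogEndOffset))

With the inputs from shouldTruncateToHighWatermarkIfLeaderReturnsUndefinedOffset (no error, UNDEFINED_EPOCH_OFFSET, LEO 300, HW 100) this yields Some(100), matching the captured truncation; with the replies 156 and 172 against LEO 200 it yields 156 and 172, as asserted above.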
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a720a6a..d6b1649 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -146,7 +146,7 @@ class ReplicaManagerQuotasTest {
val scheduler = createNiceMock(classOf[KafkaScheduler])
//Create log which handles both a regular read and a 0 bytes read
- val log = createMock(classOf[Log])
+ val log = createNiceMock(classOf[Log])
expect(log.logStartOffset).andReturn(0L).anyTimes()
expect(log.logEndOffset).andReturn(20L).anyTimes()
expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()