You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/21 00:41:33 UTC
[3/3] kafka git commit: MINOR: Replace TopicAndPartition with
TopicPartition in `Log` and `ReplicaManager`
MINOR: Replace TopicAndPartition with TopicPartition in `Log` and `ReplicaManager`
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2268 from ijuma/topicpartition-vs-topicandpartition
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68f204e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68f204e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68f204e0
Branch: refs/heads/trunk
Commit: 68f204e01bb6fd8b9b6c56dff7a2b0daa448764d
Parents: 0f86dbe
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Dec 20 16:36:32 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Dec 20 16:36:32 2016 -0800
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 28 ++-
.../scala/kafka/common/TopicAndPartition.scala | 7 +-
.../coordinator/GroupMetadataManager.scala | 48 ++---
core/src/main/scala/kafka/log/Log.scala | 52 +++---
core/src/main/scala/kafka/log/LogCleaner.scala | 53 +++---
.../scala/kafka/log/LogCleanerManager.scala | 97 +++++-----
core/src/main/scala/kafka/log/LogManager.scala | 70 ++++----
.../kafka/server/AbstractFetcherManager.scala | 8 +-
.../main/scala/kafka/server/DelayedFetch.scala | 21 ++-
.../scala/kafka/server/DelayedProduce.scala | 6 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 19 +-
.../scala/kafka/server/OffsetCheckpoint.scala | 10 +-
.../main/scala/kafka/server/QuotaFactory.scala | 6 +-
.../kafka/server/ReplicaFetcherThread.scala | 23 +--
.../scala/kafka/server/ReplicaManager.scala | 178 +++++++++----------
.../kafka/server/ReplicationQuotaManager.scala | 8 +-
.../kafka/tools/ReplicaVerificationTool.scala | 2 +-
core/src/main/scala/kafka/utils/Pool.scala | 2 +-
.../scala/kafka/utils/ReplicationUtils.scala | 5 +-
.../kafka/api/ConsumerBounceTest.scala | 2 +-
.../api/GroupCoordinatorIntegrationTest.scala | 4 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 11 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 63 +++----
.../GroupCoordinatorResponseTest.scala | 4 +-
.../coordinator/GroupMetadataManagerTest.scala | 10 +-
.../kafka/integration/PrimitiveApiTest.scala | 30 ++--
.../kafka/log/LogCleanerIntegrationTest.scala | 13 +-
.../log/LogCleanerLagIntegrationTest.scala | 12 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 24 +--
.../scala/unit/kafka/log/LogCleanerTest.scala | 35 ++--
.../scala/unit/kafka/log/LogManagerTest.scala | 27 +--
.../src/test/scala/unit/kafka/log/LogTest.scala | 6 +-
.../server/AbstractFetcherThreadTest.scala | 10 +-
.../kafka/server/BaseReplicaFetchTest.scala | 7 +-
.../kafka/server/DynamicConfigChangeTest.scala | 3 +-
.../server/HighwatermarkPersistenceTest.scala | 18 +-
.../unit/kafka/server/ISRExpirationTest.scala | 3 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 19 +-
.../unit/kafka/server/LogRecoveryTest.scala | 53 +++---
.../kafka/server/ReplicaManagerQuotasTest.scala | 26 ++-
.../unit/kafka/server/ReplicaManagerTest.scala | 18 +-
.../server/ReplicationQuotaManagerTest.scala | 13 +-
.../kafka/server/ReplicationQuotasTest.scala | 8 +-
.../unit/kafka/server/SimpleFetchTest.scala | 12 +-
.../unit/kafka/utils/ReplicationUtilsTest.scala | 6 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 29 +--
46 files changed, 546 insertions(+), 563 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9eb92cd..d555b73 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors
import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.PartitionState
import org.apache.kafka.common.utils.Time
@@ -44,6 +45,8 @@ class Partition(val topic: String,
val partitionId: Int,
time: Time,
replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
+ val topicPartition = new TopicPartition(topic, partitionId)
+
private val localBrokerId = replicaManager.config.brokerId
private val logManager = replicaManager.logManager
private val zkUtils = replicaManager.zkUtils
@@ -106,12 +109,12 @@ class Partition(val topic: String,
if (isReplicaLocal(replicaId)) {
val config = LogConfig.fromProps(logManager.defaultConfig.originals,
AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
- val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
+ val log = logManager.createLog(topicPartition, config)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read
- if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
+ if (!offsetMap.contains(topicPartition))
info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
- val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
+ val offset = offsetMap.getOrElse(topicPartition, 0L).min(log.logEndOffset)
val localReplica = new Replica(replicaId, this, time, offset, Some(log))
addReplicaIfNotExists(localReplica)
} else {
@@ -122,13 +125,7 @@ class Partition(val topic: String,
}
}
- def getReplica(replicaId: Int = localBrokerId): Option[Replica] = {
- val replica = assignedReplicaMap.get(replicaId)
- if (replica == null)
- None
- else
- Some(replica)
- }
+ def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(assignedReplicaMap.get(replicaId))
def leaderReplicaIfLocal(): Option[Replica] = {
leaderReplicaIdOpt match {
@@ -159,7 +156,6 @@ class Partition(val topic: String,
assignedReplicaMap.clear()
inSyncReplicas = Set.empty[Replica]
leaderReplicaIdOpt = None
- val topicPartition = TopicAndPartition(topic, partitionId)
try {
logManager.asyncDelete(topicPartition)
removePartitionMetrics()
@@ -258,9 +254,7 @@ class Partition(val topic: String,
maybeExpandIsr(replicaId)
debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
- .format(replicaId,
- logReadResult.info.fetchOffsetMetadata.messageOffset,
- TopicAndPartition(topic, partitionId)))
+ .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
case None =>
throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
" is not recognized to be one of the assigned replicas %s for partition %s.")
@@ -268,7 +262,7 @@ class Partition(val topic: String,
replicaId,
logReadResult.info.fetchOffsetMetadata.messageOffset,
assignedReplicas().map(_.brokerId).mkString(","),
- TopicAndPartition(topic, partitionId)))
+ topicPartition))
}
}
@@ -435,7 +429,7 @@ class Partition(val topic: String,
val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
if (laggingReplicas.nonEmpty)
- debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
+ debug("Lagging replicas for partition %s are %s".format(topicPartition, laggingReplicas.map(_.brokerId).mkString(",")))
laggingReplicas
}
@@ -480,7 +474,7 @@ class Partition(val topic: String,
newLeaderAndIsr, controllerEpoch, zkVersion)
if(updateSucceeded) {
- replicaManager.recordIsrChange(TopicAndPartition(topic, partitionId))
+ replicaManager.recordIsrChange(topicPartition)
inSyncReplicas = newIsr
zkVersion = newVersion
trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 95db1dc..cf892c4 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,7 +1,8 @@
package kafka.common
-import kafka.cluster.{Replica, Partition}
+import kafka.cluster.{Partition, Replica}
import kafka.utils.Json
+import org.apache.kafka.common.TopicPartition
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -31,7 +32,9 @@ case class TopicAndPartition(topic: String, partition: Int) {
def this(replica: Replica) = this(replica.topic, replica.partitionId)
+ def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
+
def asTuple = (topic, partition)
override def toString = "[%s,%d]".format(topic, partition)
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 0eb52bb..e649946 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
-import kafka.common.{MessageFormatter, TopicAndPartition, _}
+import kafka.common.{MessageFormatter, _}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock
@@ -234,9 +234,9 @@ class GroupMetadataManager(val brokerId: Int,
// construct the message set to append
getMagicAndTimestamp(partitionFor(group.groupId)) match {
case Some((magicValue, timestampType, timestamp)) =>
- val records = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+ val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
Record.create(magicValue, timestampType, timestamp,
- GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
+ GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition.topic, topicPartition.partition),
GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
}.toSeq
@@ -260,15 +260,15 @@ class GroupMetadataManager(val brokerId: Int,
group synchronized {
if (statusError == Errors.NONE) {
if (!group.is(Dead)) {
- filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
- group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+ filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
+ group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
}
}
Errors.NONE.code
} else {
if (!group.is(Dead)) {
- filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
- group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+ filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
+ group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
}
}
@@ -298,11 +298,11 @@ class GroupMetadataManager(val brokerId: Int,
}
// compute the final error codes for the commit response
- val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+ val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
- (topicAndPartition, responseCode)
+ (topicPartition, responseCode)
else
- (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
+ (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
}
// finally trigger the callback logic passed from the API layer
@@ -316,8 +316,8 @@ class GroupMetadataManager(val brokerId: Int,
Some(DelayedStore(entries, putCacheCallback))
case None =>
- val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
- (topicAndPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+ val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
+ (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
}
responseCallback(commitStatus)
None
@@ -366,7 +366,7 @@ class GroupMetadataManager(val brokerId: Int,
*/
def loadGroupsForPartition(offsetsPartition: Int,
onGroupLoaded: GroupMetadata => Unit) {
- val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+ val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
def loadGroupsAndOffsets() {
@@ -510,7 +510,7 @@ class GroupMetadataManager(val brokerId: Int,
*/
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {
- val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+ val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
def removeGroupsAndOffsets() {
@@ -532,11 +532,11 @@ class GroupMetadataManager(val brokerId: Int,
}
}
- if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
- .format(numOffsetsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
+ if (numOffsetsRemoved > 0)
+ info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.")
- if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
- .format(numGroupsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
+ if (numGroupsRemoved > 0)
+ info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.")
}
}
@@ -557,11 +557,11 @@ class GroupMetadataManager(val brokerId: Int,
}
val offsetsPartition = partitionFor(groupId)
+ val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
getMagicAndTimestamp(offsetsPartition) match {
case Some((magicValue, timestampType, timestamp)) =>
- val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+ val partitionOpt = replicaManager.getPartition(appendPartition)
partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
val tombstones = expiredOffsets.map { case (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
@@ -589,7 +589,7 @@ class GroupMetadataManager(val brokerId: Int,
} catch {
case t: Throwable =>
error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired offsets and/or metadata for group $groupId.", t)
- // ignore and continue
+ // ignore and continue
}
}
}
@@ -603,7 +603,7 @@ class GroupMetadataManager(val brokerId: Int,
}
private def getHighWatermark(partitionId: Int): Long = {
- val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, partitionId)
+ val partitionOpt = replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, partitionId))
val hw = partitionOpt.map { partition =>
partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -648,8 +648,8 @@ class GroupMetadataManager(val brokerId: Int,
* @return Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise
*/
private def getMagicAndTimestamp(partition: Int): Option[(Byte, TimestampType, Long)] = {
- val groupMetadataTopicAndPartition = TopicAndPartition(Topic.GroupMetadataTopicName, partition)
- replicaManager.getMagicAndTimestampType(groupMetadataTopicAndPartition).map { case (messageFormatVersion, timestampType) =>
+ val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partition)
+ replicaManager.getMagicAndTimestampType(groupMetadataTopicPartition).map { case (messageFormatVersion, timestampType) =>
val timestamp = if (messageFormatVersion == Record.MAGIC_VALUE_V0) Record.NO_TIMESTAMP else time.milliseconds()
(messageFormatVersion, timestampType, timestamp)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6cd7953..8dea5ca 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -36,6 +36,7 @@ import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
+import org.apache.kafka.common.TopicPartition
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Record.NO_TIMESTAMP, -1L, Record.NO_TIMESTAMP,
@@ -121,9 +122,9 @@ class Log(@volatile var dir: File,
.format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
}
- val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
+ val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
- private val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
+ private val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
newGauge("NumLogSegments",
new Gauge[Int] {
@@ -390,7 +391,7 @@ class Log(@volatile var dir: File,
if (logEntry.sizeInBytes > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
- BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+ BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(logEntry.sizeInBytes, config.maxMessageSize))
@@ -480,7 +481,7 @@ class Log(@volatile var dir: File,
// Check if the message sizes are valid.
val messageSize = entry.sizeInBytes
if(messageSize > config.maxMessageSize) {
- BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+ BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(messageSize, config.maxMessageSize))
@@ -611,7 +612,7 @@ class Log(@volatile var dir: File,
targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
- s"for partition $topicAndPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
+ s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
s"required version $KAFKA_0_10_0_IV0")
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
@@ -1120,34 +1121,31 @@ object Log {
/**
* Parse the topic and partition out of the directory name of a log
*/
- def parseTopicPartitionName(dir: File): TopicAndPartition = {
- val dirName = dir.getName
- if (dirName == null || dirName.isEmpty || !dirName.contains('-')) {
- throwException(dir)
+ def parseTopicPartitionName(dir: File): TopicPartition = {
+
+ def exception(dir: File): KafkaException = {
+ new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
+ "'" + dir.getName + "' is not in the form of topic-partition\n" +
+ "If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
+ "directory")
}
+ val dirName = dir.getName
+ if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
+ throw exception(dir)
+
val name: String =
- if (dirName.endsWith(DeleteDirSuffix)) {
- dirName.substring(0, dirName.indexOf('.'))
- } else {
- dirName
- }
+ if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.indexOf('.'))
+ else dirName
val index = name.lastIndexOf('-')
- val topic: String = name.substring(0, index)
- val partition: String = name.substring(index + 1)
- if (topic.length < 1 || partition.length < 1) {
- throwException(dir)
- }
- TopicAndPartition(topic, partition.toInt)
- }
+ val topic = name.substring(0, index)
+ val partition = name.substring(index + 1)
+ if (topic.length < 1 || partition.length < 1)
+ throw exception(dir)
-
- def throwException(dir: File) {
- throw new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
- "'" + dir.getName + "' is not in the form of topic-partition\n" +
- "If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
- "directory")
+ new TopicPartition(topic, partition.toInt)
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7abd1d8..620ae9b 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -29,6 +29,7 @@ import kafka.utils._
import org.apache.kafka.common.record.{FileRecords, LogEntry, MemoryRecords}
import org.apache.kafka.common.utils.Time
import MemoryRecords.LogEntryFilter
+import org.apache.kafka.common.TopicPartition
import scala.collection._
import JavaConverters._
@@ -70,7 +71,7 @@ import JavaConverters._
*/
class LogCleaner(val config: CleanerConfig,
val logDirs: Array[File],
- val logs: Pool[TopicAndPartition, Log],
+ val logs: Pool[TopicPartition, Log],
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
/* for managing the state of partitions being cleaned. package-private to allow access in tests */
@@ -127,8 +128,8 @@ class LogCleaner(val config: CleanerConfig,
* Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
* the partition is aborted.
*/
- def abortCleaning(topicAndPartition: TopicAndPartition) {
- cleanerManager.abortCleaning(topicAndPartition)
+ def abortCleaning(topicPartition: TopicPartition) {
+ cleanerManager.abortCleaning(topicPartition)
}
/**
@@ -141,38 +142,37 @@ class LogCleaner(val config: CleanerConfig,
/**
* Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset
*/
- def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
- cleanerManager.maybeTruncateCheckpoint(dataDir, topicAndPartition, offset)
+ def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long) {
+ cleanerManager.maybeTruncateCheckpoint(dataDir, topicPartition, offset)
}
/**
* Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
* This call blocks until the cleaning of the partition is aborted and paused.
*/
- def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
- cleanerManager.abortAndPauseCleaning(topicAndPartition)
+ def abortAndPauseCleaning(topicPartition: TopicPartition) {
+ cleanerManager.abortAndPauseCleaning(topicPartition)
}
/**
* Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
*/
- def resumeCleaning(topicAndPartition: TopicAndPartition) {
- cleanerManager.resumeCleaning(topicAndPartition)
+ def resumeCleaning(topicPartition: TopicPartition) {
+ cleanerManager.resumeCleaning(topicPartition)
}
/**
* For testing, a way to know when work has completed. This method waits until the
* cleaner has processed up to the given offset on the specified topic/partition
*
- * @param topic The Topic to be cleaned
- * @param part The partition of the topic to be cleaned
+ * @param topicPartition The topic and partition to be cleaned
* @param offset The first dirty offset that the cleaner doesn't have to clean
* @param maxWaitMs The maximum time in ms to wait for cleaner
*
* @return A boolean indicating whether the work has completed before timeout
*/
- def awaitCleaned(topic: String, part: Int, offset: Long, maxWaitMs: Long = 60000L): Boolean = {
- def isCleaned = cleanerManager.allCleanerCheckpoints.get(TopicAndPartition(topic, part)).fold(false)(_ >= offset)
+ def awaitCleaned(topicPartition: TopicPartition, offset: Long, maxWaitMs: Long = 60000L): Boolean = {
+ def isCleaned = cleanerManager.allCleanerCheckpoints.get(topicPartition).fold(false)(_ >= offset)
var remainingWaitMs = maxWaitMs
while (!isCleaned && remainingWaitMs > 0) {
val sleepTime = math.min(100, remainingWaitMs)
@@ -207,10 +207,10 @@ class LogCleaner(val config: CleanerConfig,
@volatile var lastStats: CleanerStats = new CleanerStats()
private val backOffWaitLatch = new CountDownLatch(1)
- private def checkDone(topicAndPartition: TopicAndPartition) {
+ private def checkDone(topicPartition: TopicPartition) {
if (!isRunning.get())
throw new ThreadShutdownException
- cleanerManager.checkCleaningAborted(topicAndPartition)
+ cleanerManager.checkCleaningAborted(topicPartition)
}
/**
@@ -248,7 +248,7 @@ class LogCleaner(val config: CleanerConfig,
}
true
}
- val deletable: Iterable[(TopicAndPartition, Log)] = cleanerManager.deletableLogs()
+ val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
deletable.foreach{
case (topicPartition, log) =>
try {
@@ -311,7 +311,7 @@ private[log] class Cleaner(val id: Int,
dupBufferLoadFactor: Double,
throttler: Throttler,
time: Time,
- checkDone: (TopicAndPartition) => Unit) extends Logging {
+ checkDone: (TopicPartition) => Unit) extends Logging {
override val loggerName = classOf[LogCleaner].getName
@@ -402,7 +402,7 @@ private[log] class Cleaner(val id: Int,
val retainDeletes = old.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
.format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
- cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
+ cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
}
// trim excess index
@@ -435,7 +435,7 @@ private[log] class Cleaner(val id: Int,
* Clean the given source log segment into the destination segment using the key=>offset mapping
* provided
*
- * @param topicAndPartition The topic and partition of the log segment to clean
+ * @param topicPartition The topic and partition of the log segment to clean
* @param source The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
@@ -443,13 +443,14 @@ private[log] class Cleaner(val id: Int,
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param stats Collector for cleaning statistics
*/
- private[log] def cleanInto(topicAndPartition: TopicAndPartition,
+ private[log] def cleanInto(topicPartition: TopicPartition,
source: LogSegment,
dest: LogSegment,
map: OffsetMap,
retainDeletes: Boolean,
maxLogMessageSize: Int,
stats: CleanerStats) {
+
def shouldRetainEntry(logEntry: LogEntry): Boolean =
shouldRetainMessage(source, map, retainDeletes, logEntry, stats)
@@ -459,7 +460,7 @@ private[log] class Cleaner(val id: Int,
var position = 0
while (position < source.log.sizeInBytes) {
- checkDone(topicAndPartition)
+ checkDone(topicPartition)
// read a chunk of messages and copy any that are to be retained to the write buffer to be written out
readBuffer.clear()
writeBuffer.clear()
@@ -598,9 +599,9 @@ private[log] class Cleaner(val id: Int,
// but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
var full = false
for (segment <- dirty if !full) {
- checkDone(log.topicAndPartition)
+ checkDone(log.topicPartition)
- full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start, log.config.maxMessageSize, stats)
+ full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize, stats)
if (full)
debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
}
@@ -616,7 +617,7 @@ private[log] class Cleaner(val id: Int,
*
* @return If the map was filled whilst loading from this segment
*/
- private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition,
+ private def buildOffsetMapForSegment(topicPartition: TopicPartition,
segment: LogSegment,
map: OffsetMap,
start: Long,
@@ -625,7 +626,7 @@ private[log] class Cleaner(val id: Int,
var position = segment.index.lookup(start).position
val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
while (position < segment.log.sizeInBytes) {
- checkDone(topicAndPartition)
+ checkDone(topicPartition)
readBuffer.clear()
segment.log.readInto(readBuffer, position)
val records = MemoryRecords.readableRecords(readBuffer)
@@ -710,7 +711,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
/**
* Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
*/
-private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
+private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
private[this] val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
val firstUncleanableOffset = firstUncleanableSegment.baseOffset
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 681042e..1290ada 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -22,11 +22,12 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
-import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
+import kafka.common.LogCleaningAbortedException
import kafka.metrics.KafkaMetricsGroup
import kafka.server.OffsetCheckpoint
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
import scala.collection.{immutable, mutable}
@@ -44,7 +45,7 @@ private[log] case object LogCleaningPaused extends LogCleaningState
* While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
* requested to be resumed.
*/
-private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
+private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicPartition, Log]) extends Logging with KafkaMetricsGroup {
import LogCleanerManager._
@@ -57,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
/* the set of logs currently being cleaned */
- private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
+ private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()
/* a global lock used to control all access to the in-progress set and the offset checkpoints */
private val lock = new ReentrantLock
@@ -72,7 +73,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
/**
* @return the position processed for all logs.
*/
- def allCleanerCheckpoints: Map[TopicAndPartition, Long] =
+ def allCleanerCheckpoints: Map[TopicPartition, Long] =
checkpoints.values.flatMap(_.read()).toMap
/**
@@ -87,12 +88,12 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
val dirtyLogs = logs.filter {
case (_, log) => log.config.compact // match logs that are marked as compacted
}.filterNot {
- case (topicAndPartition, _) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
+ case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
}.map {
- case (topicAndPartition, log) => // create a LogToClean instance for each
- val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicAndPartition,
+ case (topicPartition, log) => // create a LogToClean instance for each
+ val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
lastClean, now)
- LogToClean(topicAndPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
+ LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
@@ -111,10 +112,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
/**
* Find any logs that have compact and delete enabled
*/
- def deletableLogs(): Iterable[(TopicAndPartition, Log)] = {
+ def deletableLogs(): Iterable[(TopicPartition, Log)] = {
inLock(lock) {
- val toClean = logs.filter { case (topicAndPartition, log) =>
- !inProgress.contains(topicAndPartition) && isCompactAndDelete(log)
+ val toClean = logs.filter { case (topicPartition, log) =>
+ !inProgress.contains(topicPartition) && isCompactAndDelete(log)
}
toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
toClean
@@ -127,12 +128,12 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
* the partition is aborted.
* This is implemented by first abortAndPausing and then resuming the cleaning of the partition.
*/
- def abortCleaning(topicAndPartition: TopicAndPartition) {
+ def abortCleaning(topicPartition: TopicPartition) {
inLock(lock) {
- abortAndPauseCleaning(topicAndPartition)
- resumeCleaning(topicAndPartition)
+ abortAndPauseCleaning(topicPartition)
+ resumeCleaning(topicPartition)
}
- info(s"The cleaning for partition $topicAndPartition is aborted")
+ info(s"The cleaning for partition $topicPartition is aborted")
}
/**
@@ -145,50 +146,50 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
* 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
* 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
*/
- def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
+ def abortAndPauseCleaning(topicPartition: TopicPartition) {
inLock(lock) {
- inProgress.get(topicAndPartition) match {
+ inProgress.get(topicPartition) match {
case None =>
- inProgress.put(topicAndPartition, LogCleaningPaused)
+ inProgress.put(topicPartition, LogCleaningPaused)
case Some(state) =>
state match {
case LogCleaningInProgress =>
- inProgress.put(topicAndPartition, LogCleaningAborted)
+ inProgress.put(topicPartition, LogCleaningAborted)
case s =>
- throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be aborted and paused since it is in $s state.")
+ throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
}
}
- while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
+ while (!isCleaningInState(topicPartition, LogCleaningPaused))
pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
}
- info(s"The cleaning for partition $topicAndPartition is aborted and paused")
+ info(s"The cleaning for partition $topicPartition is aborted and paused")
}
/**
* Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
*/
- def resumeCleaning(topicAndPartition: TopicAndPartition) {
+ def resumeCleaning(topicPartition: TopicPartition) {
inLock(lock) {
- inProgress.get(topicAndPartition) match {
+ inProgress.get(topicPartition) match {
case None =>
- throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is not paused.")
+ throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
case Some(state) =>
state match {
case LogCleaningPaused =>
- inProgress.remove(topicAndPartition)
+ inProgress.remove(topicPartition)
case s =>
- throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is in $s state.")
+ throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
}
}
}
- info(s"Compaction for partition $topicAndPartition is resumed")
+ info(s"Compaction for partition $topicPartition is resumed")
}
/**
* Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
*/
- private def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
- inProgress.get(topicAndPartition) match {
+ private def isCleaningInState(topicPartition: TopicPartition, expectedState: LogCleaningState): Boolean = {
+ inProgress.get(topicPartition) match {
case None => false
case Some(state) =>
if (state == expectedState)
@@ -201,14 +202,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
/**
* Check if the cleaning for a partition is aborted. If so, throw an exception.
*/
- def checkCleaningAborted(topicAndPartition: TopicAndPartition) {
+ def checkCleaningAborted(topicPartition: TopicPartition) {
inLock(lock) {
- if (isCleaningInState(topicAndPartition, LogCleaningAborted))
+ if (isCleaningInState(topicPartition, LogCleaningAborted))
throw new LogCleaningAbortedException()
}
}
- def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
+ def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]) {
inLock(lock) {
val checkpoint = checkpoints(dataDir)
val existing = checkpoint.read().filterKeys(logs.keys) ++ update
@@ -216,14 +217,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
}
}
- def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
+ def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long) {
inLock(lock) {
- if (logs.get(topicAndPartition).config.compact) {
+ if (logs.get(topicPartition).config.compact) {
val checkpoint = checkpoints(dataDir)
val existing = checkpoint.read()
- if (existing.getOrElse(topicAndPartition, 0L) > offset)
- checkpoint.write(existing + (topicAndPartition -> offset))
+ if (existing.getOrElse(topicPartition, 0L) > offset)
+ checkpoint.write(existing + (topicPartition -> offset))
}
}
}
@@ -231,24 +232,24 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
/**
* Save out the endOffset and remove the given log from the in-progress set, if not aborted.
*/
- def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
+ def doneCleaning(topicPartition: TopicPartition, dataDir: File, endOffset: Long) {
inLock(lock) {
- inProgress(topicAndPartition) match {
+ inProgress(topicPartition) match {
case LogCleaningInProgress =>
- updateCheckpoints(dataDir,Option(topicAndPartition, endOffset))
- inProgress.remove(topicAndPartition)
+ updateCheckpoints(dataDir,Option(topicPartition, endOffset))
+ inProgress.remove(topicPartition)
case LogCleaningAborted =>
- inProgress.put(topicAndPartition, LogCleaningPaused)
+ inProgress.put(topicPartition, LogCleaningPaused)
pausedCleaningCond.signalAll()
case s =>
- throw new IllegalStateException(s"In-progress partition $topicAndPartition cannot be in $s state.")
+ throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
}
}
}
- def doneDeleting(topicAndPartition: TopicAndPartition): Unit = {
+ def doneDeleting(topicPartition: TopicPartition): Unit = {
inLock(lock) {
- inProgress.remove(topicAndPartition)
+ inProgress.remove(topicPartition)
}
}
}
@@ -268,10 +269,10 @@ private[log] object LogCleanerManager extends Logging {
* @param now the current time in milliseconds of the cleaning operation
* @return the lower (inclusive) and upper (exclusive) offsets
*/
- def cleanableOffsets(log: Log, topicAndPartition: TopicAndPartition, lastClean: immutable.Map[TopicAndPartition, Long], now: Long): (Long, Long) = {
+ def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = {
// the checkpointed offset, ie., the first offset of the next dirty segment
- val lastCleanOffset: Option[Long] = lastClean.get(topicAndPartition)
+ val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)
// If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
// reset to the log starting offset and log the error
@@ -312,7 +313,7 @@ private[log] object LogCleanerManager extends Logging {
} else None
).flatten.min
- debug(s"Finding range of cleanable offsets for log=${log.name} topicAndPartition=$topicAndPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")
+ debug(s"Finding range of cleanable offsets for log=${log.name} topicPartition=$topicPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")
(firstDirtyOffset, firstUncleanableDirtyOffset)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 953fca4..8cd9b34 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -24,9 +24,11 @@ import kafka.utils._
import scala.collection._
import scala.collection.JavaConverters._
-import kafka.common.{KafkaStorageException, KafkaException, TopicAndPartition}
+import kafka.common.{KafkaException, KafkaStorageException}
import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
+
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
/**
@@ -56,7 +58,7 @@ class LogManager(val logDirs: Array[File],
val InitialTaskDelayMs = 30*1000
private val logCreationOrDeletionLock = new Object
- private val logs = new Pool[TopicAndPartition, Log]()
+ private val logs = new Pool[TopicPartition, Log]()
private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
createAndValidateLogDirs(logDirs)
@@ -132,7 +134,7 @@ class LogManager(val logDirs: Array[File],
brokerState.newState(RecoveringFromUncleanShutdown)
}
- var recoveryPoints = Map[TopicAndPartition, Long]()
+ var recoveryPoints = Map[TopicPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
@@ -285,21 +287,21 @@ class LogManager(val logDirs: Array[File],
/**
* Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
*
- * @param partitionAndOffsets Partition logs that need to be truncated
+ * @param partitionOffsets Partition logs that need to be truncated
*/
- def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]) {
- for ((topicAndPartition, truncateOffset) <- partitionAndOffsets) {
- val log = logs.get(topicAndPartition)
+ def truncateTo(partitionOffsets: Map[TopicPartition, Long]) {
+ for ((topicPartition, truncateOffset) <- partitionOffsets) {
+ val log = logs.get(topicPartition)
// If the log does not exist, skip it
if (log != null) {
//May need to abort and pause the cleaning of the log, and resume after truncation is done.
val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
if (needToStopCleaner && cleaner != null)
- cleaner.abortAndPauseCleaning(topicAndPartition)
+ cleaner.abortAndPauseCleaning(topicPartition)
log.truncateTo(truncateOffset)
if (needToStopCleaner && cleaner != null) {
- cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset)
- cleaner.resumeCleaning(topicAndPartition)
+ cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+ cleaner.resumeCleaning(topicPartition)
}
}
}
@@ -310,17 +312,17 @@ class LogManager(val logDirs: Array[File],
* Delete all data in a partition and start the log at the new offset
* @param newOffset The new offset to start the log with
*/
- def truncateFullyAndStartAt(topicAndPartition: TopicAndPartition, newOffset: Long) {
- val log = logs.get(topicAndPartition)
+ def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long) {
+ val log = logs.get(topicPartition)
// If the log does not exist, skip it
if (log != null) {
//Abort and pause the cleaning of the log, and resume after truncation is done.
if (cleaner != null)
- cleaner.abortAndPauseCleaning(topicAndPartition)
+ cleaner.abortAndPauseCleaning(topicPartition)
log.truncateFullyAndStartAt(newOffset)
if (cleaner != null) {
- cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset)
- cleaner.resumeCleaning(topicAndPartition)
+ cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+ cleaner.resumeCleaning(topicPartition)
}
}
checkpointRecoveryPointOffsets()
@@ -347,8 +349,8 @@ class LogManager(val logDirs: Array[File],
/**
* Get the log if it exists, otherwise return None
*/
- def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
- val log = logs.get(topicAndPartition)
+ def getLog(topicPartition: TopicPartition): Option[Log] = {
+ val log = logs.get(topicPartition)
if (log == null)
None
else
@@ -359,9 +361,9 @@ class LogManager(val logDirs: Array[File],
* Create a log for the given topic and the given partition
* If the log already exists, just return a copy of the existing log
*/
- def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
+ def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
logCreationOrDeletionLock synchronized {
- var log = logs.get(topicAndPartition)
+ var log = logs.get(topicPartition)
// check if the log has already been created in another thread
if(log != null)
@@ -369,17 +371,17 @@ class LogManager(val logDirs: Array[File],
// if not, create it
val dataDir = nextLogDir()
- val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
+ val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
dir.mkdirs()
log = new Log(dir,
config,
recoveryPoint = 0L,
scheduler,
time)
- logs.put(topicAndPartition, log)
+ logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
- .format(topicAndPartition.topic,
- topicAndPartition.partition,
+ .format(topicPartition.topic,
+ topicPartition.partition,
dataDir.getAbsolutePath,
config.originals.asScala.mkString(", ")))
log
@@ -397,7 +399,7 @@ class LogManager(val logDirs: Array[File],
if (removedLog != null) {
try {
removedLog.delete()
- info(s"Deleted log for partition ${removedLog.topicAndPartition} in ${removedLog.dir.getAbsolutePath}.")
+ info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
case e: Throwable =>
error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
@@ -415,16 +417,16 @@ class LogManager(val logDirs: Array[File],
/**
* Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
* add it in the queue for deletion.
- * @param topicAndPartition TopicPartition that needs to be deleted
+ * @param topicPartition TopicPartition that needs to be deleted
*/
- def asyncDelete(topicAndPartition: TopicAndPartition) = {
+ def asyncDelete(topicPartition: TopicPartition) = {
val removedLog: Log = logCreationOrDeletionLock synchronized {
- logs.remove(topicAndPartition)
+ logs.remove(topicPartition)
}
if (removedLog != null) {
//We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null) {
- cleaner.abortCleaning(topicAndPartition)
+ cleaner.abortCleaning(topicPartition)
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
// renaming the directory to topic-partition.uniqueId-delete
@@ -446,7 +448,7 @@ class LogManager(val logDirs: Array[File],
logsToBeDeleted.add(removedLog)
removedLog.removeLogMetrics()
- info(s"Log for partition ${removedLog.topicAndPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+ info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
} else {
throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
}
@@ -495,9 +497,9 @@ class LogManager(val logDirs: Array[File],
def allLogs(): Iterable[Log] = logs.values
/**
- * Get a map of TopicAndPartition => Log
+ * Get a map of TopicPartition => Log
*/
- def logsByTopicPartition: Map[TopicAndPartition, Log] = logs.toMap
+ def logsByTopicPartition: Map[TopicPartition, Log] = logs.toMap
/**
* Map of log dir to logs by topic and partitions in that dir
@@ -514,16 +516,16 @@ class LogManager(val logDirs: Array[File],
private def flushDirtyLogs() = {
debug("Checking for dirty logs to flush...")
- for ((topicAndPartition, log) <- logs) {
+ for ((topicPartition, log) <- logs) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
- debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs +
+ debug("Checking if flush is needed on " + topicPartition.topic + " flush interval " + log.config.flushMs +
" last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
} catch {
case e: Throwable =>
- error("Error flushing topic " + topicAndPartition.topic, e)
+ error("Error flushing topic " + topicPartition.topic, e)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 5e584ab..0a17f8e 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -73,8 +73,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
mapLock synchronized {
- val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
- BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
+ val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>
+ BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
var fetcherThread: AbstractFetcherThread = null
fetcherThreadMap.get(brokerAndFetcherId) match {
@@ -91,8 +91,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
}
}
- info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
- "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
+ info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicPartition, brokerAndInitialOffset) =>
+ "[" + topicPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}
def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 743c994..0a1884a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -19,7 +19,6 @@ package kafka.server
import java.util.concurrent.TimeUnit
-import kafka.common.TopicAndPartition
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
@@ -43,7 +42,7 @@ case class FetchMetadata(fetchMinBytes: Int,
fetchOnlyCommitted: Boolean,
isFromFollower: Boolean,
replicaId: Int,
- fetchPartitionStatus: Seq[(TopicAndPartition, FetchPartitionStatus)]) {
+ fetchPartitionStatus: Seq[(TopicPartition, FetchPartitionStatus)]) {
override def toString = "[minBytes: " + fetchMinBytes + ", " +
"onlyLeader:" + fetchOnlyLeader + ", "
@@ -58,7 +57,7 @@ class DelayedFetch(delayMs: Long,
fetchMetadata: FetchMetadata,
replicaManager: ReplicaManager,
quota: ReplicaQuota,
- responseCallback: Seq[(TopicAndPartition, FetchPartitionData)] => Unit)
+ responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(delayMs) {
/**
@@ -75,11 +74,11 @@ class DelayedFetch(delayMs: Long,
var accumulatedSize = 0
var accumulatedThrottledSize = 0
fetchMetadata.fetchPartitionStatus.foreach {
- case (topicAndPartition, fetchStatus) =>
+ case (topicPartition, fetchStatus) =>
val fetchOffset = fetchStatus.startOffsetMetadata
try {
if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
- val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+ val replica = replicaManager.getLeaderReplicaIfLocal(topicPartition)
val endOffset =
if (fetchMetadata.fetchOnlyCommitted)
replica.highWatermark
@@ -92,19 +91,19 @@ class DelayedFetch(delayMs: Long,
if (endOffset.messageOffset != fetchOffset.messageOffset) {
if (endOffset.onOlderSegment(fetchOffset)) {
// Case C, this can happen when the new fetch operation is on a truncated leader
- debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
+ debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicPartition))
return forceComplete()
} else if (fetchOffset.onOlderSegment(endOffset)) {
// Case C, this can happen when the fetch operation is falling behind the current segment
// or the partition has just rolled a new segment
debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
// We will not force complete the fetch request if a replica should be throttled.
- if (!replicaManager.shouldLeaderThrottle(quota, topicAndPartition, fetchMetadata.replicaId))
+ if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
- if (quota.isThrottled(topicAndPartition))
+ if (quota.isThrottled(topicPartition))
accumulatedThrottledSize += bytesAvailable
else
accumulatedSize += bytesAvailable
@@ -113,10 +112,10 @@ class DelayedFetch(delayMs: Long,
}
} catch {
case _: UnknownTopicOrPartitionException => // Case B
- debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
+ debug("Broker no longer know of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
return forceComplete()
case _: NotLeaderForPartitionException => // Case A
- debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
+ debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
return forceComplete()
}
}
@@ -146,7 +145,7 @@ class DelayedFetch(delayMs: Long,
readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
fetchMaxBytes = fetchMetadata.fetchMaxBytes,
hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
- readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => new TopicPartition(tp.topic, tp.partition) -> status.fetchInfo },
+ readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
quota = quota
)
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 5a59d3b..1af0bfb 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -81,11 +81,11 @@ class DelayedProduce(delayMs: Long,
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
- produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
- trace(s"Checking produce satisfaction for ${topicAndPartition}, current status $status")
+ produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
+ trace(s"Checking produce satisfaction for ${topicPartition}, current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
- val (hasEnough, error) = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
+ val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
case Some(partition) =>
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
case None =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c66f9e3..7fd8c2b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -461,9 +461,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// the callback for sending a fetch response
- def sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, FetchPartitionData)]) {
+ def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
val convertedPartitionData = {
- // Need to down-convert message when consumer only takes magic value 0.
responsePartitionData.map { case (tp, data) =>
// We only do down-conversion when:
@@ -480,7 +479,7 @@ class KafkaApis(val requestChannel: RequestChannel,
FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0))
} else data
- new TopicPartition(tp.topic, tp.partition) -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records)
+ tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records)
}
}
@@ -544,7 +543,7 @@ class KafkaApis(val requestChannel: RequestChannel,
quota: ReplicationQuotaManager): Int = {
val partitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
mergedPartitionData.foreach { case (tp, data) =>
- if (quota.isThrottled(TopicAndPartition(tp.topic(), tp.partition())))
+ if (quota.isThrottled(tp))
partitionData.put(tp, data)
}
FetchResponse.sizeOf(versionId, partitionData)
@@ -586,9 +585,9 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
// ensure leader exists
val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
- replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition)
+ replicaManager.getLeaderReplicaIfLocal(topicPartition)
else
- replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
+ replicaManager.getReplicaOrException(topicPartition)
val offsets = {
val allOffsets = fetchOffsets(replicaManager.logManager,
topicPartition,
@@ -648,9 +647,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// ensure leader exists
val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
- replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition)
+ replicaManager.getLeaderReplicaIfLocal(topicPartition)
else
- replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
+ replicaManager.getReplicaOrException(topicPartition)
val found = {
if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
@@ -690,7 +689,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def fetchOffsets(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
- logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition)) match {
+ logManager.getLog(topicPartition) match {
case Some(log) =>
fetchOffsetsBefore(log, timestamp, maxNumOffsets)
case None =>
@@ -702,7 +701,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long) : Option[TimestampOffset] = {
- logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition)) match {
+ logManager.getLog(topicPartition) match {
case Some(log) =>
log.fetchOffsetsByTimestamp(timestamp)
case None =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index a39fe49..c838e09 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -26,6 +26,8 @@ import kafka.utils.Logging
import kafka.common._
import java.io._
+import org.apache.kafka.common.TopicPartition
+
object OffsetCheckpoint {
private val WhiteSpacesPattern = Pattern.compile("\\s+")
private val CurrentVersion = 0
@@ -41,7 +43,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
private val lock = new Object()
file.createNewFile() // in case the file doesn't exist
- def write(offsets: Map[TopicAndPartition, Long]) {
+ def write(offsets: Map[TopicPartition, Long]) {
lock synchronized {
// write to temp file and then swap with the existing file
val fileOutputStream = new FileOutputStream(tempPath.toFile)
@@ -75,7 +77,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
}
}
- def read(): Map[TopicAndPartition, Long] = {
+ def read(): Map[TopicPartition, Long] = {
def malformedLineException(line: String) =
new IOException(s"Malformed line in offset checkpoint file: $line'")
@@ -94,12 +96,12 @@ class OffsetCheckpoint(val file: File) extends Logging {
if (line == null)
return Map.empty
val expectedSize = line.toInt
- val offsets = mutable.Map[TopicAndPartition, Long]()
+ val offsets = mutable.Map[TopicPartition, Long]()
line = reader.readLine()
while (line != null) {
WhiteSpacesPattern.split(line) match {
case Array(topic, partition, offset) =>
- offsets += TopicAndPartition(topic, partition.toInt) -> offset.toLong
+ offsets += new TopicPartition(topic, partition.toInt) -> offset.toLong
line = reader.readLine()
case _ => throw malformedLineException(line)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 65e7c9e..671ad63 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -16,8 +16,8 @@
*/
package kafka.server
-import kafka.common.TopicAndPartition
import kafka.server.QuotaType._
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
@@ -32,7 +32,7 @@ sealed trait QuotaType
object QuotaFactory {
object UnboundedQuota extends ReplicaQuota {
- override def isThrottled(topicAndPartition: TopicAndPartition): Boolean = false
+ override def isThrottled(topicPartition: TopicPartition): Boolean = false
override def isQuotaExceeded(): Boolean = false
}
@@ -71,4 +71,4 @@ object QuotaFactory {
numQuotaSamples = cfg.numReplicationQuotaSamples,
quotaWindowSizeSeconds = cfg.replicationQuotaWindowSizeSeconds
)
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 54a2e05..d5d7a13 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -24,7 +24,7 @@ import kafka.admin.AdminUtils
import kafka.cluster.BrokerEndPoint
import kafka.log.LogConfig
import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0}
-import kafka.common.{KafkaStorageException, TopicAndPartition}
+import kafka.common.KafkaStorageException
import ReplicaFetcherThread._
import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
@@ -115,9 +115,7 @@ class ReplicaFetcherThread(name: String,
// process fetched data
def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
try {
- val topic = topicPartition.topic
- val partitionId = topicPartition.partition
- val replica = replicaMgr.getReplica(topic, partitionId).get
+ val replica = replicaMgr.getReplica(topicPartition).get
val records = partitionData.toRecords
maybeWarnIfOversizedRecords(records, topicPartition)
@@ -137,9 +135,8 @@ class ReplicaFetcherThread(name: String,
// these values will be computed upon making the leader
replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
if (logger.isTraceEnabled)
- trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
- .format(replica.brokerId, topic, partitionId, followerHighWatermark))
- if (quota.isThrottled(TopicAndPartition(topic, partitionId)))
+ trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
+ if (quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
} catch {
case e: KafkaStorageException =>
@@ -161,8 +158,7 @@ class ReplicaFetcherThread(name: String,
* Handle a partition whose offset is out of range and return a new fetch offset.
*/
def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
- val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
- val replica = replicaMgr.getReplica(topicPartition.topic, topicPartition.partition).get
+ val replica = replicaMgr.getReplica(topicPartition).get
/**
* Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
@@ -192,7 +188,7 @@ class ReplicaFetcherThread(name: String,
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
.format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
- replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
+ replicaMgr.logManager.truncateTo(Map(topicPartition -> leaderEndOffset))
leaderEndOffset
} else {
/**
@@ -224,7 +220,7 @@ class ReplicaFetcherThread(name: String,
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
// Only truncate log when current leader's log start offset is greater than follower's log end offset.
if (leaderStartOffset > replica.logEndOffset.messageOffset)
- replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
+ replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset)
offsetToFetch
}
}
@@ -287,9 +283,8 @@ class ReplicaFetcherThread(name: String,
val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
- val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
// We will not include a replica in the fetch request if it should be throttled.
- if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicAndPartition))
+ if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
}
@@ -304,7 +299,7 @@ class ReplicaFetcherThread(name: String,
* To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync.
*/
- private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition): Boolean = {
+ private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = {
val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition)
quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
}