Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/21 00:41:33 UTC

[3/3] kafka git commit: MINOR: Replace TopicAndPartition with TopicPartition in `Log` and `ReplicaManager`

MINOR: Replace TopicAndPartition with TopicPartition in `Log` and `ReplicaManager`

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2268 from ijuma/topicpartition-vs-topicandpartition
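
For context, the two types carry the same data: kafka.common.TopicAndPartition is a Scala case class defined in
core, while org.apache.kafka.common.TopicPartition is the Java class from the clients module that the broker
internals are converging on. A minimal construction sketch (illustrative only, not part of the patch):

    import kafka.common.TopicAndPartition
    import org.apache.kafka.common.TopicPartition

    // old core-only type: a Scala case class, built via its companion apply
    val tap = TopicAndPartition("my-topic", 0)

    // new shared type from the clients module: a plain Java class
    val tp = new TopicPartition("my-topic", 0)

    // both expose the same two fields
    assert(tap.topic == tp.topic && tap.partition == tp.partition)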


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68f204e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68f204e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68f204e0

Branch: refs/heads/trunk
Commit: 68f204e01bb6fd8b9b6c56dff7a2b0daa448764d
Parents: 0f86dbe
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Dec 20 16:36:32 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Dec 20 16:36:32 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  28 ++-
 .../scala/kafka/common/TopicAndPartition.scala  |   7 +-
 .../coordinator/GroupMetadataManager.scala      |  48 ++---
 core/src/main/scala/kafka/log/Log.scala         |  52 +++---
 core/src/main/scala/kafka/log/LogCleaner.scala  |  53 +++---
 .../scala/kafka/log/LogCleanerManager.scala     |  97 +++++-----
 core/src/main/scala/kafka/log/LogManager.scala  |  70 ++++----
 .../kafka/server/AbstractFetcherManager.scala   |   8 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |  21 ++-
 .../scala/kafka/server/DelayedProduce.scala     |   6 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  19 +-
 .../scala/kafka/server/OffsetCheckpoint.scala   |  10 +-
 .../main/scala/kafka/server/QuotaFactory.scala  |   6 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  23 +--
 .../scala/kafka/server/ReplicaManager.scala     | 178 +++++++++----------
 .../kafka/server/ReplicationQuotaManager.scala  |   8 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |   2 +-
 core/src/main/scala/kafka/utils/Pool.scala      |   2 +-
 .../scala/kafka/utils/ReplicationUtils.scala    |   5 +-
 .../kafka/api/ConsumerBounceTest.scala          |   2 +-
 .../api/GroupCoordinatorIntegrationTest.scala   |   4 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  11 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |  63 +++----
 .../GroupCoordinatorResponseTest.scala          |   4 +-
 .../coordinator/GroupMetadataManagerTest.scala  |  10 +-
 .../kafka/integration/PrimitiveApiTest.scala    |  30 ++--
 .../kafka/log/LogCleanerIntegrationTest.scala   |  13 +-
 .../log/LogCleanerLagIntegrationTest.scala      |  12 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  24 +--
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  35 ++--
 .../scala/unit/kafka/log/LogManagerTest.scala   |  27 +--
 .../src/test/scala/unit/kafka/log/LogTest.scala |   6 +-
 .../server/AbstractFetcherThreadTest.scala      |  10 +-
 .../kafka/server/BaseReplicaFetchTest.scala     |   7 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |   3 +-
 .../server/HighwatermarkPersistenceTest.scala   |  18 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   3 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |  19 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |  53 +++---
 .../kafka/server/ReplicaManagerQuotasTest.scala |  26 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  18 +-
 .../server/ReplicationQuotaManagerTest.scala    |  13 +-
 .../kafka/server/ReplicationQuotasTest.scala    |   8 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |  12 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala |   6 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  29 +--
 46 files changed, 546 insertions(+), 563 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9eb92cd..d555b73 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.PartitionState
 import org.apache.kafka.common.utils.Time
@@ -44,6 +45,8 @@ class Partition(val topic: String,
                 val partitionId: Int,
                 time: Time,
                 replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
+  val topicPartition = new TopicPartition(topic, partitionId)
+
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
   private val zkUtils = replicaManager.zkUtils
@@ -106,12 +109,12 @@ class Partition(val topic: String,
         if (isReplicaLocal(replicaId)) {
           val config = LogConfig.fromProps(logManager.defaultConfig.originals,
                                            AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
-          val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
+          val log = logManager.createLog(topicPartition, config)
           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
           val offsetMap = checkpoint.read
-          if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
+          if (!offsetMap.contains(topicPartition))
             info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
-          val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
+          val offset = offsetMap.getOrElse(topicPartition, 0L).min(log.logEndOffset)
           val localReplica = new Replica(replicaId, this, time, offset, Some(log))
           addReplicaIfNotExists(localReplica)
         } else {
@@ -122,13 +125,7 @@ class Partition(val topic: String,
     }
   }
 
-  def getReplica(replicaId: Int = localBrokerId): Option[Replica] = {
-    val replica = assignedReplicaMap.get(replicaId)
-    if (replica == null)
-      None
-    else
-      Some(replica)
-  }
+  def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(assignedReplicaMap.get(replicaId))
 
   def leaderReplicaIfLocal(): Option[Replica] = {
     leaderReplicaIdOpt match {
@@ -159,7 +156,6 @@ class Partition(val topic: String,
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
-      val topicPartition = TopicAndPartition(topic, partitionId)
       try {
         logManager.asyncDelete(topicPartition)
         removePartitionMetrics()
@@ -258,9 +254,7 @@ class Partition(val topic: String,
         maybeExpandIsr(replicaId)
 
         debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
-          .format(replicaId,
-                  logReadResult.info.fetchOffsetMetadata.messageOffset,
-                  TopicAndPartition(topic, partitionId)))
+          .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
       case None =>
         throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
           " is not recognized to be one of the assigned replicas %s for partition %s.")
@@ -268,7 +262,7 @@ class Partition(val topic: String,
                   replicaId,
                   logReadResult.info.fetchOffsetMetadata.messageOffset,
                   assignedReplicas().map(_.brokerId).mkString(","),
-                  TopicAndPartition(topic, partitionId)))
+                  topicPartition))
     }
   }
 
@@ -435,7 +429,7 @@ class Partition(val topic: String,
 
     val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
     if (laggingReplicas.nonEmpty)
-      debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
+      debug("Lagging replicas for partition %s are %s".format(topicPartition, laggingReplicas.map(_.brokerId).mkString(",")))
 
     laggingReplicas
   }
@@ -480,7 +474,7 @@ class Partition(val topic: String,
       newLeaderAndIsr, controllerEpoch, zkVersion)
 
     if(updateSucceeded) {
-      replicaManager.recordIsrChange(TopicAndPartition(topic, partitionId))
+      replicaManager.recordIsrChange(topicPartition)
       inSyncReplicas = newIsr
       zkVersion = newVersion
       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 95db1dc..cf892c4 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,7 +1,8 @@
 package kafka.common
 
-import kafka.cluster.{Replica, Partition}
+import kafka.cluster.{Partition, Replica}
 import kafka.utils.Json
+import org.apache.kafka.common.TopicPartition
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -31,7 +32,9 @@ case class TopicAndPartition(topic: String, partition: Int) {
 
   def this(replica: Replica) = this(replica.topic, replica.partitionId)
 
+  def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
+
   def asTuple = (topic, partition)
 
   override def toString = "[%s,%d]".format(topic, partition)
-}
\ No newline at end of file
+}
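
The auxiliary constructor added above mirrors the existing this(replica: Replica) one, so code that still speaks
TopicAndPartition can accept the new type at its boundary. A small conversion sketch (the helper name asOldType is
hypothetical, not part of the patch):

    import kafka.common.TopicAndPartition
    import org.apache.kafka.common.TopicPartition

    // convert new-style keys into the legacy type where an older API still requires it
    def asOldType(partitions: Set[TopicPartition]): Set[TopicAndPartition] =
      partitions.map(tp => new TopicAndPartition(tp))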

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 0eb52bb..e649946 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
-import kafka.common.{MessageFormatter, TopicAndPartition, _}
+import kafka.common.{MessageFormatter, _}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
@@ -234,9 +234,9 @@ class GroupMetadataManager(val brokerId: Int,
     // construct the message set to append
     getMagicAndTimestamp(partitionFor(group.groupId)) match {
       case Some((magicValue, timestampType, timestamp)) =>
-        val records = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+        val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
           Record.create(magicValue, timestampType, timestamp,
-            GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
+            GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition.topic, topicPartition.partition),
             GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
         }.toSeq
 
@@ -260,15 +260,15 @@ class GroupMetadataManager(val brokerId: Int,
             group synchronized {
               if (statusError == Errors.NONE) {
                 if (!group.is(Dead)) {
-                  filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
-                    group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+                  filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
+                    group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
                   }
                 }
                 Errors.NONE.code
               } else {
                 if (!group.is(Dead)) {
-                  filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
-                    group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+                  filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
+                    group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
                   }
                 }
 
@@ -298,11 +298,11 @@ class GroupMetadataManager(val brokerId: Int,
             }
 
           // compute the final error codes for the commit response
-          val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+          val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
             if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-              (topicAndPartition, responseCode)
+              (topicPartition, responseCode)
             else
-              (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
+              (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
           }
 
           // finally trigger the callback logic passed from the API layer
@@ -316,8 +316,8 @@ class GroupMetadataManager(val brokerId: Int,
         Some(DelayedStore(entries, putCacheCallback))
 
       case None =>
-        val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-          (topicAndPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+        val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
+          (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
         }
         responseCallback(commitStatus)
         None
@@ -366,7 +366,7 @@ class GroupMetadataManager(val brokerId: Int,
    */
   def loadGroupsForPartition(offsetsPartition: Int,
                              onGroupLoaded: GroupMetadata => Unit) {
-    val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+    val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
     scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
 
     def loadGroupsAndOffsets() {
@@ -510,7 +510,7 @@ class GroupMetadataManager(val brokerId: Int,
    */
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
-    val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+    val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
     scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
 
     def removeGroupsAndOffsets() {
@@ -532,11 +532,11 @@ class GroupMetadataManager(val brokerId: Int,
         }
       }
 
-      if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
-        .format(numOffsetsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
+      if (numOffsetsRemoved > 0)
+        info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.")
 
-      if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
-        .format(numGroupsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
+      if (numGroupsRemoved > 0)
+        info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.")
     }
   }
 
@@ -557,11 +557,11 @@ class GroupMetadataManager(val brokerId: Int,
       }
 
       val offsetsPartition = partitionFor(groupId)
+      val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
       getMagicAndTimestamp(offsetsPartition) match {
         case Some((magicValue, timestampType, timestamp)) =>
-          val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+          val partitionOpt = replicaManager.getPartition(appendPartition)
           partitionOpt.foreach { partition =>
-            val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
             val tombstones = expiredOffsets.map { case (topicPartition, offsetAndMetadata) =>
               trace(s"Removing expired offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
               val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
@@ -589,7 +589,7 @@ class GroupMetadataManager(val brokerId: Int,
               } catch {
                 case t: Throwable =>
                   error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired offsets and/or metadata for group $groupId.", t)
-                // ignore and continue
+                  // ignore and continue
               }
             }
           }
@@ -603,7 +603,7 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
-    val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, partitionId)
+    val partitionOpt = replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, partitionId))
 
     val hw = partitionOpt.map { partition =>
       partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -648,8 +648,8 @@ class GroupMetadataManager(val brokerId: Int,
    * @return  Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise
    */
   private def getMagicAndTimestamp(partition: Int): Option[(Byte, TimestampType, Long)] = {
-    val groupMetadataTopicAndPartition = TopicAndPartition(Topic.GroupMetadataTopicName, partition)
-    replicaManager.getMagicAndTimestampType(groupMetadataTopicAndPartition).map { case (messageFormatVersion, timestampType) =>
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partition)
+    replicaManager.getMagicAndTimestampType(groupMetadataTopicPartition).map { case (messageFormatVersion, timestampType) =>
       val timestamp = if (messageFormatVersion == Record.MAGIC_VALUE_V0) Record.NO_TIMESTAMP else time.milliseconds()
       (messageFormatVersion, timestampType, timestamp)
     }
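
Note that ReplicaManager.getPartition now takes a single TopicPartition rather than separate topic and partition
arguments, as the coordinator call sites above show. An illustrative call-site update (replicaManager and
partitionId are assumed to be in scope):

    import kafka.common.Topic
    import org.apache.kafka.common.TopicPartition

    // before: replicaManager.getPartition(Topic.GroupMetadataTopicName, partitionId)
    val partitionOpt = replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, partitionId))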

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6cd7953..8dea5ca 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -36,6 +36,7 @@ import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
+import org.apache.kafka.common.TopicPartition
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Record.NO_TIMESTAMP, -1L, Record.NO_TIMESTAMP,
@@ -121,9 +122,9 @@ class Log(@volatile var dir: File,
       .format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
   }
 
-  val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
+  val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
 
-  private val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
+  private val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
 
   newGauge("NumLogSegments",
     new Gauge[Int] {
@@ -390,7 +391,7 @@ class Log(@volatile var dir: File,
               if (logEntry.sizeInBytes > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
                 // to be consistent with pre-compression bytesRejectedRate recording
-                BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+                BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                 BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                 throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
                   .format(logEntry.sizeInBytes, config.maxMessageSize))
@@ -480,7 +481,7 @@ class Log(@volatile var dir: File,
       // Check if the message sizes are valid.
       val messageSize = entry.sizeInBytes
       if(messageSize > config.maxMessageSize) {
-        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+        BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
         BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
         throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
           .format(messageSize, config.maxMessageSize))
@@ -611,7 +612,7 @@ class Log(@volatile var dir: File,
         targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
         targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
       throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
-          s"for partition $topicAndPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
+          s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
           s"required version $KAFKA_0_10_0_IV0")
 
     // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
@@ -1120,34 +1121,31 @@ object Log {
   /**
    * Parse the topic and partition out of the directory name of a log
    */
-  def parseTopicPartitionName(dir: File): TopicAndPartition = {
-    val dirName = dir.getName
-    if (dirName == null || dirName.isEmpty || !dirName.contains('-')) {
-      throwException(dir)
+  def parseTopicPartitionName(dir: File): TopicPartition = {
+
+    def exception(dir: File): KafkaException = {
+      new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
+        "'" + dir.getName + "' is not in the form of topic-partition\n" +
+        "If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
+        "directory")
     }
 
+    val dirName = dir.getName
+    if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
+      throw exception(dir)
+
     val name: String =
-      if (dirName.endsWith(DeleteDirSuffix)) {
-        dirName.substring(0, dirName.indexOf('.'))
-      } else {
-        dirName
-      }
+      if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.indexOf('.'))
+      else dirName
 
     val index = name.lastIndexOf('-')
-    val topic: String = name.substring(0, index)
-    val partition: String = name.substring(index + 1)
-    if (topic.length < 1 || partition.length < 1) {
-      throwException(dir)
-    }
-    TopicAndPartition(topic, partition.toInt)
-  }
+    val topic = name.substring(0, index)
+    val partition = name.substring(index + 1)
+    if (topic.length < 1 || partition.length < 1)
+      throw exception(dir)
 
-
-  def throwException(dir: File) {
-    throw new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
-      "'" + dir.getName + "' is not in the form of topic-partition\n" +
-      "If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
-      "directory")
+    new TopicPartition(topic, partition.toInt)
   }
+
 }
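
The parsing logic is unchanged by the rewrite above; only the return type moves to TopicPartition and the error
path becomes a local def. A usage sketch (the directory path is hypothetical):

    import java.io.File
    import kafka.log.Log
    import org.apache.kafka.common.TopicPartition

    // "my-topic-0" parses back into topic "my-topic", partition 0; directories not matching the
    // topic-partition form (or the topic-partition.<uniqueId>-delete form) raise a KafkaException
    val tp: TopicPartition = Log.parseTopicPartitionName(new File("/var/kafka-logs/my-topic-0"))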
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7abd1d8..620ae9b 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -29,6 +29,7 @@ import kafka.utils._
 import org.apache.kafka.common.record.{FileRecords, LogEntry, MemoryRecords}
 import org.apache.kafka.common.utils.Time
 import MemoryRecords.LogEntryFilter
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection._
 import JavaConverters._
@@ -70,7 +71,7 @@ import JavaConverters._
  */
 class LogCleaner(val config: CleanerConfig,
                  val logDirs: Array[File],
-                 val logs: Pool[TopicAndPartition, Log], 
+                 val logs: Pool[TopicPartition, Log],
                  time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
   
   /* for managing the state of partitions being cleaned. package-private to allow access in tests */
@@ -127,8 +128,8 @@ class LogCleaner(val config: CleanerConfig,
    *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
    *  the partition is aborted.
    */
-  def abortCleaning(topicAndPartition: TopicAndPartition) {
-    cleanerManager.abortCleaning(topicAndPartition)
+  def abortCleaning(topicPartition: TopicPartition) {
+    cleanerManager.abortCleaning(topicPartition)
   }
 
   /**
@@ -141,38 +142,37 @@ class LogCleaner(val config: CleanerConfig,
   /**
    * Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset
    */
-  def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
-    cleanerManager.maybeTruncateCheckpoint(dataDir, topicAndPartition, offset)
+  def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long) {
+    cleanerManager.maybeTruncateCheckpoint(dataDir, topicPartition, offset)
   }
 
   /**
    *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
    *  This call blocks until the cleaning of the partition is aborted and paused.
    */
-  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
-    cleanerManager.abortAndPauseCleaning(topicAndPartition)
+  def abortAndPauseCleaning(topicPartition: TopicPartition) {
+    cleanerManager.abortAndPauseCleaning(topicPartition)
   }
 
   /**
    *  Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
    */
-  def resumeCleaning(topicAndPartition: TopicAndPartition) {
-    cleanerManager.resumeCleaning(topicAndPartition)
+  def resumeCleaning(topicPartition: TopicPartition) {
+    cleanerManager.resumeCleaning(topicPartition)
   }
 
   /**
    * For testing, a way to know when work has completed. This method waits until the
    * cleaner has processed up to the given offset on the specified topic/partition
    *
-   * @param topic The Topic to be cleaned
-   * @param part The partition of the topic to be cleaned
+   * @param topicPartition The topic and partition to be cleaned
    * @param offset The first dirty offset that the cleaner doesn't have to clean
    * @param maxWaitMs The maximum time in ms to wait for cleaner
    *
    * @return A boolean indicating whether the work has completed before timeout
    */
-  def awaitCleaned(topic: String, part: Int, offset: Long, maxWaitMs: Long = 60000L): Boolean = {
-    def isCleaned = cleanerManager.allCleanerCheckpoints.get(TopicAndPartition(topic, part)).fold(false)(_ >= offset)
+  def awaitCleaned(topicPartition: TopicPartition, offset: Long, maxWaitMs: Long = 60000L): Boolean = {
+    def isCleaned = cleanerManager.allCleanerCheckpoints.get(topicPartition).fold(false)(_ >= offset)
     var remainingWaitMs = maxWaitMs
     while (!isCleaned && remainingWaitMs > 0) {
       val sleepTime = math.min(100, remainingWaitMs)
@@ -207,10 +207,10 @@ class LogCleaner(val config: CleanerConfig,
     @volatile var lastStats: CleanerStats = new CleanerStats()
     private val backOffWaitLatch = new CountDownLatch(1)
 
-    private def checkDone(topicAndPartition: TopicAndPartition) {
+    private def checkDone(topicPartition: TopicPartition) {
       if (!isRunning.get())
         throw new ThreadShutdownException
-      cleanerManager.checkCleaningAborted(topicAndPartition)
+      cleanerManager.checkCleaningAborted(topicPartition)
     }
 
     /**
@@ -248,7 +248,7 @@ class LogCleaner(val config: CleanerConfig,
           }
           true
       }
-      val deletable: Iterable[(TopicAndPartition, Log)] = cleanerManager.deletableLogs()
+      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
       deletable.foreach{
         case (topicPartition, log) =>
           try {
@@ -311,7 +311,7 @@ private[log] class Cleaner(val id: Int,
                            dupBufferLoadFactor: Double,
                            throttler: Throttler,
                            time: Time,
-                           checkDone: (TopicAndPartition) => Unit) extends Logging {
+                           checkDone: (TopicPartition) => Unit) extends Logging {
   
   override val loggerName = classOf[LogCleaner].getName
 
@@ -402,7 +402,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.lastModified > deleteHorizonMs
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
             .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
+        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
       }
 
       // trim excess index
@@ -435,7 +435,7 @@ private[log] class Cleaner(val id: Int,
    * Clean the given source log segment into the destination segment using the key=>offset mapping
    * provided
    *
-   * @param topicAndPartition The topic and partition of the log segment to clean
+   * @param topicPartition The topic and partition of the log segment to clean
    * @param source The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
@@ -443,13 +443,14 @@ private[log] class Cleaner(val id: Int,
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
    */
-  private[log] def cleanInto(topicAndPartition: TopicAndPartition,
+  private[log] def cleanInto(topicPartition: TopicPartition,
                              source: LogSegment,
                              dest: LogSegment,
                              map: OffsetMap,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
                              stats: CleanerStats) {
+
     def shouldRetainEntry(logEntry: LogEntry): Boolean =
       shouldRetainMessage(source, map, retainDeletes, logEntry, stats)
 
@@ -459,7 +460,7 @@ private[log] class Cleaner(val id: Int,
 
     var position = 0
     while (position < source.log.sizeInBytes) {
-      checkDone(topicAndPartition)
+      checkDone(topicPartition)
       // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
       readBuffer.clear()
       writeBuffer.clear()
@@ -598,9 +599,9 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var full = false
     for (segment <- dirty if !full) {
-      checkDone(log.topicAndPartition)
+      checkDone(log.topicPartition)
 
-      full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start, log.config.maxMessageSize, stats)
+      full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize, stats)
       if (full)
         debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
     }
@@ -616,7 +617,7 @@ private[log] class Cleaner(val id: Int,
    *
    * @return If the map was filled whilst loading from this segment
    */
-  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition,
+  private def buildOffsetMapForSegment(topicPartition: TopicPartition,
                                        segment: LogSegment,
                                        map: OffsetMap,
                                        start: Long,
@@ -625,7 +626,7 @@ private[log] class Cleaner(val id: Int,
     var position = segment.index.lookup(start).position
     val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
-      checkDone(topicAndPartition)
+      checkDone(topicPartition)
       readBuffer.clear()
       segment.log.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
@@ -710,7 +711,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 /**
  * Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
  */
-private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
+private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
   private[this] val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
   val firstUncleanableOffset = firstUncleanableSegment.baseOffset
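
One test-facing signature also changes shape here: awaitCleaned now takes a single TopicPartition instead of
separate topic and partition arguments. An illustrative call-site update (cleaner and firstDirtyOffset are assumed
to be in scope):

    import org.apache.kafka.common.TopicPartition

    // before: cleaner.awaitCleaned("my-topic", 0, firstDirtyOffset)
    val cleaned: Boolean = cleaner.awaitCleaned(new TopicPartition("my-topic", 0), firstDirtyOffset)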

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 681042e..1290ada 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -22,11 +22,12 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
+import kafka.common.LogCleaningAbortedException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.OffsetCheckpoint
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, Pool}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{immutable, mutable}
@@ -44,7 +45,7 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
  *  requested to be resumed.
  */
-private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
+private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicPartition, Log]) extends Logging with KafkaMetricsGroup {
 
   import LogCleanerManager._
 
@@ -57,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
 
   /* the set of logs currently being cleaned */
-  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
+  private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()
 
   /* a global lock used to control all access to the in-progress set and the offset checkpoints */
   private val lock = new ReentrantLock
@@ -72,7 +73,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   /**
    * @return the position processed for all logs.
    */
-  def allCleanerCheckpoints: Map[TopicAndPartition, Long] =
+  def allCleanerCheckpoints: Map[TopicPartition, Long] =
     checkpoints.values.flatMap(_.read()).toMap
 
    /**
@@ -87,12 +88,12 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
       val dirtyLogs = logs.filter {
         case (_, log) => log.config.compact  // match logs that are marked as compacted
       }.filterNot {
-        case (topicAndPartition, _) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
+        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
       }.map {
-        case (topicAndPartition, log) => // create a LogToClean instance for each
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicAndPartition,
+        case (topicPartition, log) => // create a LogToClean instance for each
+          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
             lastClean, now)
-          LogToClean(topicAndPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
+          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
       this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
@@ -111,10 +112,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   /**
     * Find any logs that have compact and delete enabled
     */
-  def deletableLogs(): Iterable[(TopicAndPartition, Log)] = {
+  def deletableLogs(): Iterable[(TopicPartition, Log)] = {
     inLock(lock) {
-      val toClean = logs.filter { case (topicAndPartition, log) =>
-        !inProgress.contains(topicAndPartition) && isCompactAndDelete(log)
+      val toClean = logs.filter { case (topicPartition, log) =>
+        !inProgress.contains(topicPartition) && isCompactAndDelete(log)
       }
       toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
       toClean
@@ -127,12 +128,12 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
    *  the partition is aborted.
    *  This is implemented by first abortAndPausing and then resuming the cleaning of the partition.
    */
-  def abortCleaning(topicAndPartition: TopicAndPartition) {
+  def abortCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
-      abortAndPauseCleaning(topicAndPartition)
-      resumeCleaning(topicAndPartition)
+      abortAndPauseCleaning(topicPartition)
+      resumeCleaning(topicPartition)
     }
-    info(s"The cleaning for partition $topicAndPartition is aborted")
+    info(s"The cleaning for partition $topicPartition is aborted")
   }
 
   /**
@@ -145,50 +146,50 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
    *  4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
    *  5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
    */
-  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
+  def abortAndPauseCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
-      inProgress.get(topicAndPartition) match {
+      inProgress.get(topicPartition) match {
         case None =>
-          inProgress.put(topicAndPartition, LogCleaningPaused)
+          inProgress.put(topicPartition, LogCleaningPaused)
         case Some(state) =>
           state match {
             case LogCleaningInProgress =>
-              inProgress.put(topicAndPartition, LogCleaningAborted)
+              inProgress.put(topicPartition, LogCleaningAborted)
             case s =>
-              throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be aborted and paused since it is in $s state.")
+              throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
           }
       }
-      while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
+      while (!isCleaningInState(topicPartition, LogCleaningPaused))
         pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
     }
-    info(s"The cleaning for partition $topicAndPartition is aborted and paused")
+    info(s"The cleaning for partition $topicPartition is aborted and paused")
   }
 
   /**
    *  Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
    */
-  def resumeCleaning(topicAndPartition: TopicAndPartition) {
+  def resumeCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
-      inProgress.get(topicAndPartition) match {
+      inProgress.get(topicPartition) match {
         case None =>
-          throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is not paused.")
+          throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
         case Some(state) =>
           state match {
             case LogCleaningPaused =>
-              inProgress.remove(topicAndPartition)
+              inProgress.remove(topicPartition)
             case s =>
-              throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is in $s state.")
+              throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
           }
       }
     }
-    info(s"Compaction for partition $topicAndPartition is resumed")
+    info(s"Compaction for partition $topicPartition is resumed")
   }
 
   /**
    *  Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
    */
-  private def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
-    inProgress.get(topicAndPartition) match {
+  private def isCleaningInState(topicPartition: TopicPartition, expectedState: LogCleaningState): Boolean = {
+    inProgress.get(topicPartition) match {
       case None => false
       case Some(state) =>
         if (state == expectedState)
@@ -201,14 +202,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   /**
    *  Check if the cleaning for a partition is aborted. If so, throw an exception.
    */
-  def checkCleaningAborted(topicAndPartition: TopicAndPartition) {
+  def checkCleaningAborted(topicPartition: TopicPartition) {
     inLock(lock) {
-      if (isCleaningInState(topicAndPartition, LogCleaningAborted))
+      if (isCleaningInState(topicPartition, LogCleaningAborted))
         throw new LogCleaningAbortedException()
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]) {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       val existing = checkpoint.read().filterKeys(logs.keys) ++ update
@@ -216,14 +217,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     }
   }
 
-  def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
+  def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long) {
     inLock(lock) {
-      if (logs.get(topicAndPartition).config.compact) {
+      if (logs.get(topicPartition).config.compact) {
         val checkpoint = checkpoints(dataDir)
         val existing = checkpoint.read()
 
-        if (existing.getOrElse(topicAndPartition, 0L) > offset)
-          checkpoint.write(existing + (topicAndPartition -> offset))
+        if (existing.getOrElse(topicPartition, 0L) > offset)
+          checkpoint.write(existing + (topicPartition -> offset))
       }
     }
   }
@@ -231,24 +232,24 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   /**
    * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
    */
-  def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
+  def doneCleaning(topicPartition: TopicPartition, dataDir: File, endOffset: Long) {
     inLock(lock) {
-      inProgress(topicAndPartition) match {
+      inProgress(topicPartition) match {
         case LogCleaningInProgress =>
-          updateCheckpoints(dataDir,Option(topicAndPartition, endOffset))
-          inProgress.remove(topicAndPartition)
+          updateCheckpoints(dataDir,Option(topicPartition, endOffset))
+          inProgress.remove(topicPartition)
         case LogCleaningAborted =>
-          inProgress.put(topicAndPartition, LogCleaningPaused)
+          inProgress.put(topicPartition, LogCleaningPaused)
           pausedCleaningCond.signalAll()
         case s =>
-          throw new IllegalStateException(s"In-progress partition $topicAndPartition cannot be in $s state.")
+          throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
       }
     }
   }
 
-  def doneDeleting(topicAndPartition: TopicAndPartition): Unit = {
+  def doneDeleting(topicPartition: TopicPartition): Unit = {
     inLock(lock) {
-      inProgress.remove(topicAndPartition)
+      inProgress.remove(topicPartition)
     }
   }
 }
@@ -268,10 +269,10 @@ private[log] object LogCleanerManager extends Logging {
     * @param now the current time in milliseconds of the cleaning operation
     * @return the lower (inclusive) and upper (exclusive) offsets
     */
-  def cleanableOffsets(log: Log, topicAndPartition: TopicAndPartition, lastClean: immutable.Map[TopicAndPartition, Long], now: Long): (Long, Long) = {
+  def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = {
 
     // the checkpointed offset, ie., the first offset of the next dirty segment
-    val lastCleanOffset: Option[Long] = lastClean.get(topicAndPartition)
+    val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)
 
     // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
     // reset to the log starting offset and log the error
@@ -312,7 +313,7 @@ private[log] object LogCleanerManager extends Logging {
         } else None
       ).flatten.min
 
-    debug(s"Finding range of cleanable offsets for log=${log.name} topicAndPartition=$topicAndPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")
+    debug(s"Finding range of cleanable offsets for log=${log.name} topicPartition=$topicPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")
 
     (firstDirtyOffset, firstUncleanableDirtyOffset)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 953fca4..8cd9b34 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -24,9 +24,11 @@ import kafka.utils._
 
 import scala.collection._
 import scala.collection.JavaConverters._
-import kafka.common.{KafkaStorageException, KafkaException, TopicAndPartition}
+import kafka.common.{KafkaException, KafkaStorageException}
 import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
 import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
+
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 
 /**
@@ -56,7 +58,7 @@ class LogManager(val logDirs: Array[File],
   val InitialTaskDelayMs = 30*1000
 
   private val logCreationOrDeletionLock = new Object
-  private val logs = new Pool[TopicAndPartition, Log]()
+  private val logs = new Pool[TopicPartition, Log]()
   private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
 
   createAndValidateLogDirs(logDirs)
@@ -132,7 +134,7 @@ class LogManager(val logDirs: Array[File],
         brokerState.newState(RecoveringFromUncleanShutdown)
       }
 
-      var recoveryPoints = Map[TopicAndPartition, Long]()
+      var recoveryPoints = Map[TopicPartition, Long]()
       try {
         recoveryPoints = this.recoveryPointCheckpoints(dir).read
       } catch {
@@ -285,21 +287,21 @@ class LogManager(val logDirs: Array[File],
   /**
    * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
    *
-   * @param partitionAndOffsets Partition logs that need to be truncated
+   * @param partitionOffsets Partition logs that need to be truncated
    */
-  def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]) {
-    for ((topicAndPartition, truncateOffset) <- partitionAndOffsets) {
-      val log = logs.get(topicAndPartition)
+  def truncateTo(partitionOffsets: Map[TopicPartition, Long]) {
+    for ((topicPartition, truncateOffset) <- partitionOffsets) {
+      val log = logs.get(topicPartition)
       // If the log does not exist, skip it
       if (log != null) {
         //May need to abort and pause the cleaning of the log, and resume after truncation is done.
         val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
         if (needToStopCleaner && cleaner != null)
-          cleaner.abortAndPauseCleaning(topicAndPartition)
+          cleaner.abortAndPauseCleaning(topicPartition)
         log.truncateTo(truncateOffset)
         if (needToStopCleaner && cleaner != null) {
-          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset)
-          cleaner.resumeCleaning(topicAndPartition)
+          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+          cleaner.resumeCleaning(topicPartition)
         }
       }
     }
@@ -310,17 +312,17 @@ class LogManager(val logDirs: Array[File],
    *  Delete all data in a partition and start the log at the new offset
    *  @param newOffset The new offset to start the log with
    */
-  def truncateFullyAndStartAt(topicAndPartition: TopicAndPartition, newOffset: Long) {
-    val log = logs.get(topicAndPartition)
+  def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long) {
+    val log = logs.get(topicPartition)
     // If the log does not exist, skip it
     if (log != null) {
         //Abort and pause the cleaning of the log, and resume after truncation is done.
       if (cleaner != null)
-        cleaner.abortAndPauseCleaning(topicAndPartition)
+        cleaner.abortAndPauseCleaning(topicPartition)
       log.truncateFullyAndStartAt(newOffset)
       if (cleaner != null) {
-        cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset)
-        cleaner.resumeCleaning(topicAndPartition)
+        cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+        cleaner.resumeCleaning(topicPartition)
       }
     }
     checkpointRecoveryPointOffsets()
@@ -347,8 +349,8 @@ class LogManager(val logDirs: Array[File],
   /**
    * Get the log if it exists, otherwise return None
    */
-  def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
-    val log = logs.get(topicAndPartition)
+  def getLog(topicPartition: TopicPartition): Option[Log] = {
+    val log = logs.get(topicPartition)
     if (log == null)
       None
     else
@@ -359,9 +361,9 @@ class LogManager(val logDirs: Array[File],
    * Create a log for the given topic and the given partition
    * If the log already exists, just return a copy of the existing log
    */
-  def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
+  def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
     logCreationOrDeletionLock synchronized {
-      var log = logs.get(topicAndPartition)
+      var log = logs.get(topicPartition)
       
       // check if the log has already been created in another thread
       if(log != null)
@@ -369,17 +371,17 @@ class LogManager(val logDirs: Array[File],
       
       // if not, create it
       val dataDir = nextLogDir()
-      val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
+      val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
       dir.mkdirs()
       log = new Log(dir, 
                     config,
                     recoveryPoint = 0L,
                     scheduler,
                     time)
-      logs.put(topicAndPartition, log)
+      logs.put(topicPartition, log)
       info("Created log for partition [%s,%d] in %s with properties {%s}."
-           .format(topicAndPartition.topic,
-                   topicAndPartition.partition,
+           .format(topicPartition.topic,
+                   topicPartition.partition,
                    dataDir.getAbsolutePath,
                    config.originals.asScala.mkString(", ")))
       log
@@ -397,7 +399,7 @@ class LogManager(val logDirs: Array[File],
         if (removedLog != null) {
           try {
             removedLog.delete()
-            info(s"Deleted log for partition ${removedLog.topicAndPartition} in ${removedLog.dir.getAbsolutePath}.")
+            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
           } catch {
             case e: Throwable =>
               error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
@@ -415,16 +417,16 @@ class LogManager(val logDirs: Array[File],
   /**
     * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and 
     * add it in the queue for deletion. 
-    * @param topicAndPartition TopicPartition that needs to be deleted
+    * @param topicPartition TopicPartition that needs to be deleted
     */
-  def asyncDelete(topicAndPartition: TopicAndPartition) = {
+  def asyncDelete(topicPartition: TopicPartition) = {
     val removedLog: Log = logCreationOrDeletionLock synchronized {
-                            logs.remove(topicAndPartition)
+                            logs.remove(topicPartition)
                           }
     if (removedLog != null) {
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null) {
-        cleaner.abortCleaning(topicAndPartition)
+        cleaner.abortCleaning(topicPartition)
         cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
       // renaming the directory to topic-partition.uniqueId-delete
@@ -446,7 +448,7 @@ class LogManager(val logDirs: Array[File],
 
         logsToBeDeleted.add(removedLog)
         removedLog.removeLogMetrics()
-        info(s"Log for partition ${removedLog.topicAndPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+        info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
       } else {
         throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
       }
@@ -495,9 +497,9 @@ class LogManager(val logDirs: Array[File],
   def allLogs(): Iterable[Log] = logs.values
 
   /**
-   * Get a map of TopicAndPartition => Log
+   * Get a map of TopicPartition => Log
    */
-  def logsByTopicPartition: Map[TopicAndPartition, Log] = logs.toMap
+  def logsByTopicPartition: Map[TopicPartition, Log] = logs.toMap
 
   /**
    * Map of log dir to logs by topic and partitions in that dir
@@ -514,16 +516,16 @@ class LogManager(val logDirs: Array[File],
   private def flushDirtyLogs() = {
     debug("Checking for dirty logs to flush...")
 
-    for ((topicAndPartition, log) <- logs) {
+    for ((topicPartition, log) <- logs) {
       try {
         val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
-        debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval  " + log.config.flushMs +
+        debug("Checking if flush is needed on " + topicPartition.topic + " flush interval  " + log.config.flushMs +
               " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= log.config.flushMs)
           log.flush
       } catch {
         case e: Throwable =>
-          error("Error flushing topic " + topicAndPartition.topic, e)
+          error("Error flushing topic " + topicPartition.topic, e)
       }
     }
   }

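For readers following the rename: the createLog change above keeps the existing on-disk layout, where a partition's log lives in a directory named "<topic>-<partition>" under one of the configured log dirs, and asyncDelete later renames that same directory to a uniquely suffixed "*-delete" name before queueing it for background deletion. A minimal sketch of the naming convention, using a hypothetical LogDirNaming helper and made-up example paths (not part of the patch):

import java.io.File
import org.apache.kafka.common.TopicPartition

object LogDirNaming {
  // Mirrors the naming used in createLog above: "<dataDir>/<topic>-<partition>".
  def logDir(dataDir: File, tp: TopicPartition): File =
    new File(dataDir, tp.topic + "-" + tp.partition)

  def main(args: Array[String]): Unit = {
    val tp = new TopicPartition("payments", 3)
    println(logDir(new File("/var/kafka-logs"), tp)) // prints /var/kafka-logs/payments-3
  }
}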
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 5e584ab..0a17f8e 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -73,8 +73,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
 
   def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
     mapLock synchronized {
-      val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
-        BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
+      val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>
+        BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
       for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
         var fetcherThread: AbstractFetcherThread = null
         fetcherThreadMap.get(brokerAndFetcherId) match {
@@ -91,8 +91,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
       }
     }
 
-    info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
-      "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
+    info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicPartition, brokerAndInitialOffset) =>
+      "[" + topicPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
   }
 
   def removeFetcherForPartitions(partitions: Set[TopicPartition]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 743c994..0a1884a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
-import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
@@ -43,7 +42,7 @@ case class FetchMetadata(fetchMinBytes: Int,
                          fetchOnlyCommitted: Boolean,
                          isFromFollower: Boolean,
                          replicaId: Int,
-                         fetchPartitionStatus: Seq[(TopicAndPartition, FetchPartitionStatus)]) {
+                         fetchPartitionStatus: Seq[(TopicPartition, FetchPartitionStatus)]) {
 
   override def toString = "[minBytes: " + fetchMinBytes + ", " +
                           "onlyLeader:" + fetchOnlyLeader + ", "
@@ -58,7 +57,7 @@ class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
                    quota: ReplicaQuota,
-                   responseCallback: Seq[(TopicAndPartition, FetchPartitionData)] => Unit)
+                   responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
   extends DelayedOperation(delayMs) {
 
   /**
@@ -75,11 +74,11 @@ class DelayedFetch(delayMs: Long,
     var accumulatedSize = 0
     var accumulatedThrottledSize = 0
     fetchMetadata.fetchPartitionStatus.foreach {
-      case (topicAndPartition, fetchStatus) =>
+      case (topicPartition, fetchStatus) =>
         val fetchOffset = fetchStatus.startOffsetMetadata
         try {
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
-            val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+            val replica = replicaManager.getLeaderReplicaIfLocal(topicPartition)
             val endOffset =
               if (fetchMetadata.fetchOnlyCommitted)
                 replica.highWatermark
@@ -92,19 +91,19 @@ class DelayedFetch(delayMs: Long,
             if (endOffset.messageOffset != fetchOffset.messageOffset) {
               if (endOffset.onOlderSegment(fetchOffset)) {
                 // Case C, this can happen when the new fetch operation is on a truncated leader
-                debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
+                debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicPartition))
                 return forceComplete()
               } else if (fetchOffset.onOlderSegment(endOffset)) {
                 // Case C, this can happen when the fetch operation is falling behind the current segment
                 // or the partition has just rolled a new segment
                 debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
                 // We will not force complete the fetch request if a replica should be throttled.
-                if (!replicaManager.shouldLeaderThrottle(quota, topicAndPartition, fetchMetadata.replicaId))
+                if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
                   return forceComplete()
               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
                 // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
                 val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
-                if (quota.isThrottled(topicAndPartition))
+                if (quota.isThrottled(topicPartition))
                   accumulatedThrottledSize += bytesAvailable
                 else
                   accumulatedSize += bytesAvailable
@@ -113,10 +112,10 @@ class DelayedFetch(delayMs: Long,
           }
         } catch {
           case _: UnknownTopicOrPartitionException => // Case B
-            debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
+            debug("Broker no longer know of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
             return forceComplete()
           case _: NotLeaderForPartitionException =>  // Case A
-            debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
+            debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
             return forceComplete()
         }
     }
@@ -146,7 +145,7 @@ class DelayedFetch(delayMs: Long,
       readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
       fetchMaxBytes = fetchMetadata.fetchMaxBytes,
       hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
-      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => new TopicPartition(tp.topic, tp.partition) -> status.fetchInfo },
+      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
       quota = quota
     )
 

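The tryComplete loop above walks every partition in the delayed fetch and, for each partition whose fetch offset is still behind the leader on the same segment, adds min(bytes behind the leader, the partition's max fetch bytes) to either an unthrottled or a throttled running total. A condensed sketch of that accounting, using a hypothetical PartitionProgress case class in place of the real FetchPartitionStatus/LogOffsetMetadata types:

object FetchAccounting {
  final case class PartitionProgress(bytesBehindLeader: Long, partitionMaxBytes: Int, throttled: Boolean)

  // Returns (accumulatedSize, accumulatedThrottledSize), in the spirit of tryComplete above.
  def accumulate(partitions: Seq[PartitionProgress]): (Long, Long) =
    partitions.foldLeft((0L, 0L)) { case ((unthrottled, throttledBytes), p) =>
      // Cap each partition's contribution at its per-partition max fetch bytes.
      val bytesAvailable = math.min(p.bytesBehindLeader, p.partitionMaxBytes.toLong)
      if (p.throttled) (unthrottled, throttledBytes + bytesAvailable)
      else (unthrottled + bytesAvailable, throttledBytes)
    }
}

The delayed operation is then force-completed once enough bytes have accumulated to satisfy fetchMinBytes; that final condition sits just below the lines shown in this hunk.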
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 5a59d3b..1af0bfb 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -81,11 +81,11 @@ class DelayedProduce(delayMs: Long,
    */
   override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
-    produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
-      trace(s"Checking produce satisfaction for ${topicAndPartition}, current status $status")
+    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
+      trace(s"Checking produce satisfaction for ${topicPartition}, current status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
-        val (hasEnough, error) = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
+        val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
           case Some(partition) =>
             partition.checkEnoughReplicasReachOffset(status.requiredOffset)
           case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c66f9e3..7fd8c2b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -461,9 +461,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     // the callback for sending a fetch response
-    def sendResponseCallback(responsePartitionData: Seq[(TopicAndPartition, FetchPartitionData)]) {
+    def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
       val convertedPartitionData = {
-        // Need to down-convert message when consumer only takes magic value 0.
         responsePartitionData.map { case (tp, data) =>
 
           // We only do down-conversion when:
@@ -480,7 +479,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0))
           } else data
 
-          new TopicPartition(tp.topic, tp.partition) -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records)
+          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records)
         }
       }
 
@@ -544,7 +543,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                                         quota: ReplicationQuotaManager): Int = {
     val partitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
     mergedPartitionData.foreach { case (tp, data) =>
-      if (quota.isThrottled(TopicAndPartition(tp.topic(), tp.partition())))
+      if (quota.isThrottled(tp))
         partitionData.put(tp, data)
     }
     FetchResponse.sizeOf(versionId, partitionData)
@@ -586,9 +585,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       try {
         // ensure leader exists
         val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
-          replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition)
+          replicaManager.getLeaderReplicaIfLocal(topicPartition)
         else
-          replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
+          replicaManager.getReplicaOrException(topicPartition)
         val offsets = {
           val allOffsets = fetchOffsets(replicaManager.logManager,
             topicPartition,
@@ -648,9 +647,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
           // ensure leader exists
           val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
-            replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition)
+            replicaManager.getLeaderReplicaIfLocal(topicPartition)
           else
-            replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
+            replicaManager.getReplicaOrException(topicPartition)
 
           val found = {
             if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
@@ -690,7 +689,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def fetchOffsets(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
-    logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition)) match {
+    logManager.getLog(topicPartition) match {
       case Some(log) =>
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
       case None =>
@@ -702,7 +701,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long) : Option[TimestampOffset] = {
-    logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition)) match {
+    logManager.getLog(topicPartition) match {
       case Some(log) =>
         log.fetchOffsetsByTimestamp(timestamp)
       case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index a39fe49..c838e09 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -26,6 +26,8 @@ import kafka.utils.Logging
 import kafka.common._
 import java.io._
 
+import org.apache.kafka.common.TopicPartition
+
 object OffsetCheckpoint {
   private val WhiteSpacesPattern = Pattern.compile("\\s+")
   private val CurrentVersion = 0
@@ -41,7 +43,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
   private val lock = new Object()
   file.createNewFile() // in case the file doesn't exist
 
-  def write(offsets: Map[TopicAndPartition, Long]) {
+  def write(offsets: Map[TopicPartition, Long]) {
     lock synchronized {
       // write to temp file and then swap with the existing file
       val fileOutputStream = new FileOutputStream(tempPath.toFile)
@@ -75,7 +77,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
     }
   }
 
-  def read(): Map[TopicAndPartition, Long] = {
+  def read(): Map[TopicPartition, Long] = {
 
     def malformedLineException(line: String) =
       new IOException(s"Malformed line in offset checkpoint file: $line'")
@@ -94,12 +96,12 @@ class OffsetCheckpoint(val file: File) extends Logging {
             if (line == null)
               return Map.empty
             val expectedSize = line.toInt
-            val offsets = mutable.Map[TopicAndPartition, Long]()
+            val offsets = mutable.Map[TopicPartition, Long]()
             line = reader.readLine()
             while (line != null) {
               WhiteSpacesPattern.split(line) match {
                 case Array(topic, partition, offset) =>
-                  offsets += TopicAndPartition(topic, partition.toInt) -> offset.toLong
+                  offsets += new TopicPartition(topic, partition.toInt) -> offset.toLong
                   line = reader.readLine()
                 case _ => throw malformedLineException(line)
               }

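As the read() loop above shows, the checkpoint file is plain text: roughly a version line (CurrentVersion is 0), a line with the number of entries, then one whitespace-separated "topic partition offset" triple per line. A file covering two partitions of "my-topic" might look like this (topic names and offsets are made up):

0
2
my-topic 0 42
my-topic 1 17

For illustration only, a small parser mirroring the Array(topic, partition, offset) match in read():

import org.apache.kafka.common.TopicPartition

object CheckpointLineParser {
  // Parses one data line of the checkpoint file into a (TopicPartition, offset) pair.
  def parse(line: String): (TopicPartition, Long) =
    line.trim.split("\\s+") match {
      case Array(topic, partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong
      case _ => throw new java.io.IOException(s"Malformed line in offset checkpoint file: $line")
    }
}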
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 65e7c9e..671ad63 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -16,8 +16,8 @@
   */
 package kafka.server
 
-import kafka.common.TopicAndPartition
 import kafka.server.QuotaType._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 
@@ -32,7 +32,7 @@ sealed trait QuotaType
 object QuotaFactory {
 
   object UnboundedQuota extends ReplicaQuota {
-    override def isThrottled(topicAndPartition: TopicAndPartition): Boolean = false
+    override def isThrottled(topicPartition: TopicPartition): Boolean = false
     override def isQuotaExceeded(): Boolean = false
   }
 
@@ -71,4 +71,4 @@ object QuotaFactory {
       numQuotaSamples = cfg.numReplicationQuotaSamples,
       quotaWindowSizeSeconds = cfg.replicationQuotaWindowSizeSeconds
     )
-}
\ No newline at end of file
+}

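UnboundedQuota above is the smallest possible ReplicaQuota: never throttled, never exceeded. Assuming the trait declares only the two members overridden there, a throwaway implementation that throttles a fixed set of partitions would look like the sketch below (FixedSetQuota is a hypothetical name, illustration only):

import kafka.server.ReplicaQuota
import org.apache.kafka.common.TopicPartition

class FixedSetQuota(throttledPartitions: Set[TopicPartition]) extends ReplicaQuota {
  // Throttle exactly the partitions handed in at construction time.
  override def isThrottled(topicPartition: TopicPartition): Boolean = throttledPartitions.contains(topicPartition)
  // Never report the replication quota as exceeded.
  override def isQuotaExceeded(): Boolean = false
}

Such an instance can be passed wherever the trait is expected, for example the quota parameter of DelayedFetch shown earlier in this patch.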
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 54a2e05..d5d7a13 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -24,7 +24,7 @@ import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0}
-import kafka.common.{KafkaStorageException, TopicAndPartition}
+import kafka.common.KafkaStorageException
 import ReplicaFetcherThread._
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
@@ -115,9 +115,7 @@ class ReplicaFetcherThread(name: String,
   // process fetched data
   def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
     try {
-      val topic = topicPartition.topic
-      val partitionId = topicPartition.partition
-      val replica = replicaMgr.getReplica(topic, partitionId).get
+      val replica = replicaMgr.getReplica(topicPartition).get
       val records = partitionData.toRecords
 
       maybeWarnIfOversizedRecords(records, topicPartition)
@@ -137,9 +135,8 @@ class ReplicaFetcherThread(name: String,
       // these values will be computed upon making the leader
       replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
       if (logger.isTraceEnabled)
-        trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
-          .format(replica.brokerId, topic, partitionId, followerHighWatermark))
-      if (quota.isThrottled(TopicAndPartition(topic, partitionId)))
+        trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
+      if (quota.isThrottled(topicPartition))
         quota.record(records.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
@@ -161,8 +158,7 @@ class ReplicaFetcherThread(name: String,
    * Handle a partition whose offset is out of range and return a new fetch offset.
    */
   def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
-    val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
-    val replica = replicaMgr.getReplica(topicPartition.topic, topicPartition.partition).get
+    val replica = replicaMgr.getReplica(topicPartition).get
 
     /**
      * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
@@ -192,7 +188,7 @@ class ReplicaFetcherThread(name: String,
 
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
         .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
-      replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
+      replicaMgr.logManager.truncateTo(Map(topicPartition -> leaderEndOffset))
       leaderEndOffset
     } else {
       /**
@@ -224,7 +220,7 @@ class ReplicaFetcherThread(name: String,
       val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
       // Only truncate log when current leader's log start offset is greater than follower's log end offset.
       if (leaderStartOffset > replica.logEndOffset.messageOffset)
-        replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
+        replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset)
       offsetToFetch
     }
   }
@@ -287,9 +283,8 @@ class ReplicaFetcherThread(name: String,
     val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
 
     partitionMap.foreach { case (topicPartition, partitionFetchState) =>
-      val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
       // We will not include a replica in the fetch request if it should be throttled.
-      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicAndPartition))
+      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
         requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
     }
 
@@ -304,7 +299,7 @@ class ReplicaFetcherThread(name: String,
    *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
    *  the quota is exceeded and the replica is not in sync.
    */
-  private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition): Boolean = {
+  private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = {
     val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition)
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
   }
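handleOffsetOutOfRange above boils down to two cases: if the follower's log end offset is ahead of the leader's, truncate back to the leader's latest offset (the unclean-leader-election scenario described in the comment); otherwise the follower has fallen behind the leader's retention, so fetch from max(leader start offset, follower log end offset) and wipe the local log if the leader's start offset is already past it. A condensed, side-effect-free sketch of that decision (the real method also queries the leader for its offsets and performs the truncation through LogManager):

object OutOfRangeOffsets {
  // followerLeo: this replica's log end offset; leaderStart/leaderEnd: the leader's log start/end offsets.
  def resetFetchOffset(followerLeo: Long, leaderStart: Long, leaderEnd: Long): Long =
    if (leaderEnd < followerLeo) leaderEnd              // follower is ahead of the leader: fall back to the leader's latest offset
    else math.max(leaderStart, followerLeo)             // follower fell behind the leader's retention: jump forward if needed
}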