Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/24 20:28:59 UTC

[1/2] kafka git commit: KAFKA-3259 KAFKA-3253; KIP-31/KIP-32 Follow-up

Repository: kafka
Updated Branches:
  refs/heads/trunk 1bfaddae9 -> 01aeea7c7


http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ae810eb..d13c872 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -27,12 +27,12 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, Message
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
-
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.record.TimestampType
 
-import scala.collection.{Map, immutable, JavaConverters}
+import scala.collection.{JavaConverters, Map, immutable}
 import JavaConverters._
 
 object Defaults {
@@ -94,7 +94,8 @@ object Defaults {
   val LogFlushSchedulerIntervalMs = Long.MaxValue
   val LogFlushOffsetCheckpointIntervalMs = 60000
   val LogPreAllocateEnable = false
-  val MessageFormatVersion = ApiVersion.latestVersion.toString()
+  // lazy val as `InterBrokerProtocolVersion` is defined later
+  lazy val MessageFormatVersion = InterBrokerProtocolVersion
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
@@ -423,11 +424,12 @@ object KafkaConfig {
   val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
   val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
   val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with acks=all (or -1)"
-  val MessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid ApiVersion." +
-  "Some Examples are: 0.8.2, 0.9.0.0, 0.10.0. Check ApiVersion for detail. When setting the message format version, " +
-  "user certifies that all the existing messages on disk is at or below that version. Otherwise consumers before 0.10.0.0 will break."
-  val MessageTimestampTypeDoc = "Define whether the timestamp in the message is message create time or log append time. The value should be either" +
-  " \"CreateTime\" or \"LogAppendTime\""
+  val MessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid ApiVersion. " +
+  "Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format version, the " +
+  "user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting this value incorrectly " +
+  "will cause consumers with older versions to break as they will receive messages with a format that they don't understand."
+  val MessageTimestampTypeDoc = "Define whether the timestamp in the message is message create time or log append time. The value should be either " +
+  "`CreateTime` or `LogAppendTime`"
   val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
   "a message and the timestamp specified in the message. If message.timestamp.type=CreateTime, a message will be rejected " +
   "if the difference in timestamp exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."
@@ -798,8 +800,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val logRetentionTimeMillis = getLogRetentionTimeMillis
   val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
   val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
-  val messageFormatVersion = getString(KafkaConfig.MessageFormatVersionProp)
-  val messageTimestampType = getString(KafkaConfig.MessageTimestampTypeProp)
+  // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
+  // is passed, `0.10.0-IV0` may be picked)
+  val messageFormatVersionString = getString(KafkaConfig.MessageFormatVersionProp)
+  val messageFormatVersion = ApiVersion(messageFormatVersionString)
+  val messageTimestampType = TimestampType.forName(getString(KafkaConfig.MessageTimestampTypeProp))
   val messageTimestampDifferenceMaxMs = getLong(KafkaConfig.MessageTimestampDifferenceMaxMsProp)
 
   /** ********* Replication configuration ***********/
@@ -821,7 +826,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
   val uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
   val interBrokerSecurityProtocol = SecurityProtocol.forName(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
-  val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp))
+  // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
+  // is passed, `0.10.0-IV0` may be picked)
+  val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
+  val interBrokerProtocolVersion = ApiVersion(interBrokerProtocolVersionString)
 
   /** ********* Controlled shutdown configuration ***********/
   val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp)
@@ -978,7 +986,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
       s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " +
       s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}"
     )
-    require(interBrokerProtocolVersion.onOrAfter(ApiVersion(messageFormatVersion)),
-      s"message.format.version $messageFormatVersion cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersion")
+    require(interBrokerProtocolVersion >= messageFormatVersion,
+      s"message.format.version $messageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
   }
 }
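
The KafkaConfig change above keeps both the raw configured string and the parsed ApiVersion: the parsed value is used for the `>=` comparison, while the raw string is echoed in error messages because, as the added comment notes, `ApiVersion.apply` may normalise "0.10.0" to something more specific such as "0.10.0-IV0". A minimal self-contained sketch of the same idea, using a toy parser rather than Kafka's ApiVersion:

object VersionCheckSketch {
  import scala.math.Ordering.Implicits._

  // Toy parser, illustrative only: tolerate a trailing fourth component.
  def parse(s: String): (Int, Int, Int) = s.split('.') match {
    case Array(a, b, c, _*) => (a.toInt, b.toInt, c.toInt)
    case Array(a, b)        => (a.toInt, b.toInt, 0)
    case _                  => sys.error(s"unrecognised version: $s")
  }

  def main(args: Array[String]): Unit = {
    val interBrokerProtocolVersionString = "0.10.0"
    val messageFormatVersionString = "0.9.0.0"
    require(parse(interBrokerProtocolVersionString) >= parse(messageFormatVersionString),
      // Report what the user actually wrote, not the parsed/normalised form.
      s"message.format.version $messageFormatVersionString cannot be used when " +
        s"inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
    println("configuration is consistent")
  }
}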

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e3e185f..2203df9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -75,8 +75,8 @@ object KafkaServer {
     logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
     logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
     logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
-    logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.messageFormatVersion)
-    logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.messageTimestampType)
+    logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.messageFormatVersion.version)
+    logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.messageTimestampType.name)
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.messageTimestampDifferenceMaxMs)
     logProps
   }
@@ -200,7 +200,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaController.startup()
 
         /* start kafka coordinator */
-        consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager, kafkaMetricsTime)
+        consumerCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
         consumerCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/
@@ -515,7 +515,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
       val shutdownSucceeded =
         // Before 0.9.0.0, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in
         // `RequestHeader`, which is used by `NetworkClient`
-        if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0))
+        if (config.interBrokerProtocolVersion >= KAFKA_0_9_0)
           networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
         else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
 
@@ -591,7 +591,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
-    val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).mapValues(LogConfig.fromProps(defaultProps, _))
+    val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
+      topic -> LogConfig.fromProps(defaultProps, configs)
+    }
     // read the log configurations from zookeeper
     val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
                                       dedupeBufferSize = config.logCleanerDedupeBufferSize,
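
One detail in the KafkaServer hunk above: in Scala 2.x, `mapValues` returns a lazy view that re-applies the function on every access, whereas the explicit `map` now used materialises the transformed values once, which is presumably why the AdminUtils result is mapped eagerly. A small standalone illustration (not Kafka code):

object MapValuesLazinessSketch {
  def main(args: Array[String]): Unit = {
    var calls = 0
    val raw = Map("topic-a" -> 1, "topic-b" -> 2)

    // mapValues returns a view in Scala 2.x: the function has not run yet...
    val viewed = raw.mapValues { v => calls += 1; v * 10 }
    println(calls)        // 0
    viewed("topic-a")
    viewed("topic-a")
    println(calls)        // 2 -- recomputed on every lookup

    // ...while an explicit map materialises each value exactly once.
    calls = 0
    val eager = raw.map { case (k, v) => k -> { calls += 1; v * 10 } }
    println(calls)        // 2 -- once per entry, at construction
    eager("topic-a")
    eager("topic-a")
    println(calls)        // still 2
  }
}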

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 2fdb46c..de7269f 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -56,8 +56,8 @@ class ReplicaFetcherThread(name: String,
   type PD = PartitionData
 
   private val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_0_10_0_IV0)) 2
-    else if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0)) 1
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
     else 0
   private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
   private val replicaId = brokerConfig.brokerId

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 16b8c3a..e388d98 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -431,20 +431,14 @@ class ReplicaManager(val config: KafkaConfig,
             fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
             Runtime.getRuntime.halt(1)
             (topicPartition, null)
-          case utpe: UnknownTopicOrPartitionException =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
-          case nle: NotLeaderForPartitionException =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
-          case mtle: RecordTooLargeException =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
-          case mstle: RecordBatchTooLargeException =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
-          case imse: CorruptRecordException =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
-          case ime : InvalidMessageException =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(ime)))
-          case itse : InvalidTimestampException =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(itse)))
+          case e@ (_: UnknownTopicOrPartitionException |
+                   _: NotLeaderForPartitionException |
+                   _: RecordTooLargeException |
+                   _: RecordBatchTooLargeException |
+                   _: CorruptRecordException |
+                   _: InvalidMessageException |
+                   _: InvalidTimestampException) =>
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
           case t: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -573,9 +567,10 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] = {
-    getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap(_.log.map(_.config.messageFormatVersion))
-  }
+  def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] =
+    getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica =>
+      replica.log.map(_.config.messageFormatVersion.messageFormatVersion)
+    }
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
     replicaStateChangeLock synchronized {
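
The ReplicaManager hunk collapses seven identical catch cases into one alternative pattern: `case e @ (_: A | _: B | ...)` binds the matched throwable while letting several exception types share a single body. A small self-contained example of the idiom, with placeholder exception types rather than Kafka's:

object AlternativePatternSketch {
  def classify(t: Throwable): String = t match {
    // Bind with `e @` and list alternatives with `|` so the body is written once.
    case e @ (_: IllegalArgumentException | _: UnsupportedOperationException) =>
      s"expected client error: ${e.getMessage}"
    case e: Throwable =>
      s"unexpected error: ${e.getClass.getSimpleName}"
  }

  def main(args: Array[String]): Unit = {
    println(classify(new IllegalArgumentException("bad offset")))   // expected client error: bad offset
    println(classify(new IllegalStateException("boom")))            // unexpected error: IllegalStateException
  }
}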

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index e20b061..dda9697 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -220,7 +220,7 @@ object SimpleConsumerShell extends Logging {
                   System.out.println("next offset = " + offset)
                 val message = messageAndOffset.message
                 val key = if(message.hasKey) Utils.readBytes(message.key) else null
-                val value = if (message.isNull()) null else Utils.readBytes(message.payload)
+                val value = if (message.isNull) null else Utils.readBytes(message.payload)
                 formatter.writeTo(key, value, message.timestamp, message.timestampType, System.out)
                 numMessagesConsumed += 1
               } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index a66fe35..19a8882 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -296,19 +296,19 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
-      assertEquals(tp.topic(), record.topic())
-      assertEquals(tp.partition(), record.partition())
+      assertEquals(tp.topic, record.topic)
+      assertEquals(tp.partition, record.partition)
       if (timestampType == TimestampType.CREATE_TIME) {
-        assertEquals(timestampType, record.timestampType())
+        assertEquals(timestampType, record.timestampType)
         val timestamp = startingTimestamp + i
-        assertEquals(timestamp.toLong, record.timestamp())
+        assertEquals(timestamp.toLong, record.timestamp)
       } else
-        assertTrue(s"Got unexpected timestamp ${record.timestamp()}. Timestamp should be between [$startingTimestamp, $now}]",
-          record.timestamp() >= startingTimestamp && record.timestamp() <= now)
-      assertEquals(offset.toLong, record.offset())
+        assertTrue(s"Got unexpected timestamp ${record.timestamp}. Timestamp should be between [$startingTimestamp, $now}]",
+          record.timestamp >= startingTimestamp && record.timestamp <= now)
+      assertEquals(offset.toLong, record.offset)
       val keyAndValueIndex = startingKeyAndValueIndex + i
-      assertEquals(s"key $keyAndValueIndex", new String(record.key()))
-      assertEquals(s"value $keyAndValueIndex", new String(record.value()))
+      assertEquals(s"key $keyAndValueIndex", new String(record.key))
+      assertEquals(s"value $keyAndValueIndex", new String(record.value))
     }
   }
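
The test hunks here and below drop the empty parentheses when reading pure accessors such as record.topic(): Scala lets call sites omit empty parens, and the convention is to keep them only on methods that have side effects. A tiny illustration of the convention, using a hypothetical stand-in class rather than Kafka's ConsumerRecord:

object ParenConventionSketch {
  final case class RecordStub(topic: String, partition: Int) {
    // No parens: a pure accessor, so call sites omit them as in the tests above.
    def topicPartition: String = s"$topic-$partition"
    // Parens kept: signals a side effect, so call sites keep them too.
    def logDebug(): Unit = println(s"inspected $topicPartition")
  }

  def main(args: Array[String]): Unit = {
    val record = RecordStub("payments", 3)
    println(record.topicPartition)   // accessor called without parens
    record.logDebug()                // side-effecting call keeps its parens
  }
}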
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index f8ad633..9bdbf6d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -335,7 +335,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
         retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
     (0 until numRecords).foreach { i =>
-      producer.send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes))
+      producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes))
     }
     producer.close()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index b8ad15b..ac7ce51 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -20,9 +20,11 @@ package kafka.consumer
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+
+import kafka.common.LongRef
+
 import scala.collection._
 import org.junit.Assert._
-
 import kafka.message._
 import kafka.server._
 import kafka.utils.TestUtils._
@@ -64,7 +66,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
   def testConsumerIteratorDeduplicationDeepIterator() {
     val messageStrings = (0 until 10).map(_.toString).toList
     val messages = messageStrings.map(s => new Message(s.getBytes))
-    val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new AtomicLong(0), messages:_*)
+    val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new LongRef(0), messages:_*)
 
     topicInfos(0).enqueue(messageSet)
     assertEquals(1, queue.size)
@@ -88,7 +90,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
   def testConsumerIteratorDecodingFailure() {
     val messageStrings = (0 until 10).map(_.toString).toList
     val messages = messageStrings.map(s => new Message(s.getBytes))
-    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*)
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(0), messages:_*)
 
     topicInfos(0).enqueue(messageSet)
     assertEquals(1, queue.size)
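
The offset counter handed to ByteBufferMessageSet in these tests is now a kafka.common.LongRef rather than a java.util.concurrent.atomic.AtomicLong. A plausible minimal shape for such a plain mutable counter is sketched below purely as an assumption; it is not Kafka's actual implementation:

// Assumed shape only -- the real kafka.common.LongRef may differ in names and members.
class LongRefSketch(var value: Long) {
  def getAndIncrement(): Long = { val v = value; value += 1; v }
  def addAndGet(delta: Long): Long = { value += delta; value }
}

object LongRefSketchDemo {
  def main(args: Array[String]): Unit = {
    val offsetCounter = new LongRefSketch(0L)
    val assigned = Seq.fill(3)(offsetCounter.getAndIncrement())
    println(assigned)             // List(0, 1, 2)
    println(offsetCounter.value)  // 3
  }
}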

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 587abd5..90e2b95 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -89,7 +89,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
-    groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
+    groupCoordinator = GroupCoordinator(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
     groupCoordinator.startup()
 
     // add the partition into the owned partition list

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 69218ba..3773233 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
-import java.util.concurrent.atomic.AtomicLong
 
 import kafka.common._
 import kafka.message._
@@ -261,7 +260,7 @@ class CleanerTest extends JUnitSuite {
       log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
     
     // forward offset and append message to next segment at offset Int.MaxValue
-    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(Int.MaxValue-1),
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(Int.MaxValue - 1),
       new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1))
     log.append(messageSet, assignOffsets = false)
     log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
@@ -284,7 +283,7 @@ class CleanerTest extends JUnitSuite {
       log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
 
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
-    assertEquals(log.numberOfSegments-1, groups.size)
+    assertEquals(log.numberOfSegments - 1, groups.size)
     for (group <- groups)
       assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
     checkSegmentOrder(groups)
@@ -313,7 +312,7 @@ class CleanerTest extends JUnitSuite {
       assertEquals("Should have the expected number of messages in the map.", end-start, map.size)
       for(i <- start until end)
         assertEquals("Should find all the keys", i.toLong, map.get(key(i)))
-      assertEquals("Should not find a value too small", -1L, map.get(key(start-1)))
+      assertEquals("Should not find a value too small", -1L, map.get(key(start - 1)))
       assertEquals("Should not find a value too large", -1L, map.get(key(end)))
     }
     val segments = log.logSegments.toSeq
@@ -455,11 +454,11 @@ class CleanerTest extends JUnitSuite {
                                          magicValue = Message.MagicValue_V1))
 
   def unkeyedMessage(value: Int) =
-    new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
+    new ByteBufferMessageSet(new Message(bytes = value.toString.getBytes))
 
   def deleteMessage(key: Int) =
-    new ByteBufferMessageSet(new Message(key=key.toString.getBytes,
-                                         bytes=null,
+    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
+                                         bytes = null,
                                          timestamp = Message.NoTimestamp,
                                          magicValue = Message.MagicValue_V1))
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 0179166..a3e5b2d 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -20,6 +20,8 @@ package kafka.log
 import java.io._
 import java.nio._
 import java.util.concurrent.atomic._
+
+import kafka.common.LongRef
 import org.junit.Assert._
 import kafka.utils.TestUtils._
 import kafka.message._
@@ -103,7 +105,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   def testSearch() {
     // append a new message with a high offset
     val lastMessage = new Message("test".getBytes)
-    messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(50), lastMessage))
+    messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(50), lastMessage))
     var position = 0
     assertEquals("Should be able to find the first message by its offset", 
                  OffsetPosition(0L, position), 

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 1be9e65..71e40da 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -53,19 +53,17 @@ class LogConfigTest {
 
   @Test
   def testFromPropsInvalid() {
-    LogConfig.configNames().foreach((name) => {
-      name match {
-        case LogConfig.UncleanLeaderElectionEnableProp  => return
-        case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
-        case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" )
-        case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
-        case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
-        case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
-        case LogConfig.MessageFormatVersionProp => assertPropertyInvalid(name, "")
-        case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
-      }
+    LogConfig.configNames.foreach(name => name match {
+      case LogConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(name, "not a boolean")
+      case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
+      case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" )
+      case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
+      case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
+      case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
+      case LogConfig.MessageFormatVersionProp => assertPropertyInvalid(name, "")
+      case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
     })
-   }
+  }
 
   private def assertPropertyInvalid(name: String, values: AnyRef*) {
     values.foreach((value) => {
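
A behavioural note on the LogConfigTest hunk: the old `case LogConfig.UncleanLeaderElectionEnableProp => return` performed a non-local return out of the whole test method, so any config names iterated after it were never validated; the rewritten match asserts on that property instead. A small standalone illustration of Scala's non-local return, with made-up names:

object NonLocalReturnSketch {
  // `return` inside a closure exits the enclosing method, not just the current
  // iteration, so later names are never examined -- the behaviour the fix removes.
  def examinedBeforeReturn(names: Seq[String]): Int = {
    var examined = 0
    names.foreach { name =>
      if (name == "unclean.leader.election.enable") return examined
      examined += 1
    }
    examined
  }

  def main(args: Array[String]): Unit =
    println(examinedBeforeReturn(
      Seq("retention.ms", "unclean.leader.election.enable", "cleanup.policy")))  // 1 -- cleanup.policy never checked
}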

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 91a4449..46bfbed 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,7 +20,6 @@ package kafka.log
 import java.io._
 import java.util.Properties
 
-import kafka.api.ApiVersion
 import kafka.common._
 import kafka.server.OffsetCheckpoint
 import kafka.utils._

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 7b80c27..edbfd99 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -18,10 +18,13 @@
 
 import org.junit.Assert._
 import java.util.concurrent.atomic._
-import org.junit.{Test, After}
+
+import kafka.common.LongRef
+import org.junit.{After, Test}
 import kafka.utils.TestUtils
 import kafka.message._
 import kafka.utils.SystemTime
+
 import scala.collection._
 
 class LogSegmentTest {
@@ -43,7 +46,7 @@ class LogSegmentTest {
   /* create a ByteBufferMessageSet for the given messages starting from the given offset */
   def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
     new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, 
-                             offsetCounter = new AtomicLong(offset), 
+                             offsetCounter = new LongRef(offset),
                              messages = messages.map(s => new Message(s.getBytes)):_*)
   }
   
@@ -275,4 +278,4 @@ class LogSegmentTest {
     assertEquals(oldSize, size)
     assertEquals(size, fileSize)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 426b5e8..5446eec 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,8 +20,10 @@ package kafka.log
 import java.io._
 import java.util.Properties
 import java.util.concurrent.atomic._
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+
+import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
 import kafka.api.ApiVersion
+import kafka.common.LongRef
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
@@ -61,10 +63,10 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTimeBasedLogRoll() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
 
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60: java.lang.Long)
+    logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long)
 
     // create a log
     val log = new Log(logDir,
@@ -98,7 +100,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTimeBasedLogRollJitter() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val maxJitter = 20 * 60L
 
     val logProps = new Properties()
@@ -134,7 +136,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
-    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
+    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     // create a log
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
@@ -164,7 +166,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
-    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
+    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
 
@@ -192,7 +194,7 @@ class LogTest extends JUnitSuite {
 
     // now test the case that we give the offsets and use non-sequential offsets
     for(i <- 0 until messages.length)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(messageIds(i)), messages = messages(i)), assignOffsets = false)
     for(i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
       val read = log.read(i, 100, None).messageSet.head
@@ -350,7 +352,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
-    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
+    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     try {
@@ -534,7 +536,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTruncateTo() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
@@ -591,7 +593,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testIndexResizingAtTruncation() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
@@ -623,7 +625,7 @@ class LogTest extends JUnitSuite {
     val bogusIndex1 = Log.indexFilename(logDir, 0)
     val bogusIndex2 = Log.indexFilename(logDir, 5)
 
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -649,7 +651,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testReopenThenTruncate() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -682,7 +684,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAsyncDelete() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val asyncDeleteMs = 1000
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
@@ -724,7 +726,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testOpenDeletesObsoleteFiles() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -771,7 +773,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
     val config = LogConfig(logProps)
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val recoveryPoint = 50L
     for(iteration <- 0 until 50) {
       // create a log and write some messages to it
@@ -807,7 +809,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     val config = LogConfig(logProps)
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val parentLogDir = logDir.getParentFile
     assertTrue("Data directory %s must exist", parentLogDir.isDirectory)
     val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 77f5d65..758dad2 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -18,8 +18,8 @@
 package kafka.message
 
 import java.nio._
-import java.util.concurrent.atomic.AtomicLong
 
+import kafka.common.LongRef
 import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record.TimestampType
@@ -34,7 +34,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testValidBytes() {
     {
-      val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+      val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))
       val buffer = ByteBuffer.allocate(messages.sizeInBytes + 2)
       buffer.put(messages.buffer)
       buffer.putShort(4)
@@ -51,25 +51,23 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 
   @Test
   def testValidBytesWithCompression() {
-    {
-      val messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
-      val buffer = ByteBuffer.allocate(messages.sizeInBytes + 2)
-      buffer.put(messages.buffer)
-      buffer.putShort(4)
-      val messagesPlus = new ByteBufferMessageSet(buffer)
-      assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
-    }
+    val messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))
+    val buffer = ByteBuffer.allocate(messages.sizeInBytes + 2)
+    buffer.put(messages.buffer)
+    buffer.putShort(4)
+    val messagesPlus = new ByteBufferMessageSet(buffer)
+    assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
   }
 
   @Test
   def testEquals() {
-    var messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
-    var moreMessages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+    var messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))
+    var moreMessages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))
 
     assertTrue(messages.equals(moreMessages))
 
-    messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
-    moreMessages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+    messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))
+    moreMessages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))
 
     assertTrue(messages.equals(moreMessages))
   }
@@ -161,23 +159,26 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     val compressedMessagesWithoutRecompression =
       getMessages(magicValue = Message.MagicValue_V1, timestamp = -1L, codec = DefaultCompressionCodec)
 
-    val validatedMessages = messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
-                                                                      sourceCodec = NoCompressionCodec,
-                                                                      targetCodec = NoCompressionCodec,
-                                                                      messageFormatVersion = 1,
-                                                                      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                      messageTimestampDiffMaxMs = 1000L)
-
-    val validatedCompressedMessages =
-      compressedMessagesWithRecompresion.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+    val (validatedMessages, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                                           now = System.currentTimeMillis(),
+                                                                           sourceCodec = NoCompressionCodec,
+                                                                           targetCodec = NoCompressionCodec,
+                                                                           messageFormatVersion = 1,
+                                                                           messageTimestampType = TimestampType.LOG_APPEND_TIME,
+                                                                           messageTimestampDiffMaxMs = 1000L)
+
+    val (validatedCompressedMessages, _) =
+      compressedMessagesWithRecompresion.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                                          now = System.currentTimeMillis(),
                                                                           sourceCodec = DefaultCompressionCodec,
                                                                           targetCodec = DefaultCompressionCodec,
                                                                           messageFormatVersion = 1,
                                                                           messageTimestampType = TimestampType.LOG_APPEND_TIME,
                                                                           messageTimestampDiffMaxMs = 1000L)
 
-    val validatedCompressedMessagesWithoutRecompression =
-      compressedMessagesWithoutRecompression.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+    val (validatedCompressedMessagesWithoutRecompression, _) =
+      compressedMessagesWithoutRecompression.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                                              now = System.currentTimeMillis(),
                                                                               sourceCodec = DefaultCompressionCodec,
                                                                               targetCodec = DefaultCompressionCodec,
                                                                               messageFormatVersion = 1,
@@ -186,16 +187,15 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 
     val now = System.currentTimeMillis()
     assertEquals("message set size should not change", messages.size, validatedMessages.size)
-    validatedMessages.foreach({case messageAndOffset => validateLogAppendTime(messageAndOffset.message)})
+    validatedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
 
     assertEquals("message set size should not change", compressedMessagesWithRecompresion.size, validatedCompressedMessages.size)
-    validatedCompressedMessages.foreach({case messageAndOffset => validateLogAppendTime(messageAndOffset.message)})
+    validatedCompressedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
     assertTrue("MessageSet should still valid", validatedCompressedMessages.shallowIterator.next().message.isValid)
 
     assertEquals("message set size should not change", compressedMessagesWithoutRecompression.size,
       validatedCompressedMessagesWithoutRecompression.size)
-    validatedCompressedMessagesWithoutRecompression.foreach({case messageAndOffset =>
-      validateLogAppendTime(messageAndOffset.message)})
+    validatedCompressedMessagesWithoutRecompression.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
     assertTrue("MessageSet should still valid", validatedCompressedMessagesWithoutRecompression.shallowIterator.next().message.isValid)
 
     def validateLogAppendTime(message: Message) {
@@ -212,15 +212,17 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = NoCompressionCodec)
     val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = DefaultCompressionCodec)
 
-    val validatedMessages = messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
-                                                                      sourceCodec = NoCompressionCodec,
-                                                                      targetCodec = NoCompressionCodec,
-                                                                      messageFormatVersion = 1,
-                                                                      messageTimestampType = TimestampType.CREATE_TIME,
-                                                                      messageTimestampDiffMaxMs = 1000L)
-
-    val validatedCompressedMessages =
-      compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+    val (validatedMessages, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                                           now = System.currentTimeMillis(),
+                                                                           sourceCodec = NoCompressionCodec,
+                                                                           targetCodec = NoCompressionCodec,
+                                                                           messageFormatVersion = 1,
+                                                                           messageTimestampType = TimestampType.CREATE_TIME,
+                                                                           messageTimestampDiffMaxMs = 1000L)
+
+    val (validatedCompressedMessages, _) =
+      compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                          now = System.currentTimeMillis(),
                                                           sourceCodec = DefaultCompressionCodec,
                                                           targetCodec = DefaultCompressionCodec,
                                                           messageFormatVersion = 1,
@@ -246,7 +248,8 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now - 1001L, codec = DefaultCompressionCodec)
 
     try {
-      messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+      messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                now = System.currentTimeMillis(),
                                                 sourceCodec = NoCompressionCodec,
                                                 targetCodec = NoCompressionCodec,
                                                 messageFormatVersion = 1,
@@ -258,7 +261,8 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     }
 
     try {
-      compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+      compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                          now = System.currentTimeMillis(),
                                                           sourceCodec = DefaultCompressionCodec,
                                                           targetCodec = DefaultCompressionCodec,
                                                           messageFormatVersion = 1,
@@ -277,21 +281,23 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     // check uncompressed offsets
     checkOffsets(messages, 0)
     val offset = 1234567
-    checkOffsets(messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+    checkOffsets(messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                           now = System.currentTimeMillis(),
                                                            sourceCodec = NoCompressionCodec,
                                                            targetCodec = NoCompressionCodec,
                                                            messageFormatVersion = 0,
                                                            messageTimestampType = TimestampType.CREATE_TIME,
-                                                           messageTimestampDiffMaxMs = 1000L), offset)
+                                                           messageTimestampDiffMaxMs = 1000L)._1, offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
-    checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+    checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                     now = System.currentTimeMillis(),
                                                                      sourceCodec = DefaultCompressionCodec,
                                                                      targetCodec = DefaultCompressionCodec,
                                                                      messageFormatVersion = 0,
                                                                      messageTimestampType = TimestampType.CREATE_TIME,
-                                                                     messageTimestampDiffMaxMs = 1000L), offset)
+                                                                     messageTimestampDiffMaxMs = 1000L)._1, offset)
 
   }
 
@@ -304,20 +310,22 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     // check uncompressed offsets
     checkOffsets(messages, 0)
     val offset = 1234567
-    val messageWithOffset = messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
-                                                                      sourceCodec = NoCompressionCodec,
-                                                                      targetCodec = NoCompressionCodec,
-                                                                      messageTimestampType = TimestampType.CREATE_TIME,
-                                                                      messageTimestampDiffMaxMs = 5000L)
+    val (messageWithOffset, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                           now = System.currentTimeMillis(),
+                                                                           sourceCodec = NoCompressionCodec,
+                                                                           targetCodec = NoCompressionCodec,
+                                                                           messageTimestampType = TimestampType.CREATE_TIME,
+                                                                           messageTimestampDiffMaxMs = 5000L)
     checkOffsets(messageWithOffset, offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
-    val compressedMessagesWithOffset = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
-                                                                                           sourceCodec = DefaultCompressionCodec,
-                                                                                           targetCodec = DefaultCompressionCodec,
-                                                                                           messageTimestampType = TimestampType.CREATE_TIME,
-                                                                                           messageTimestampDiffMaxMs = 5000L)
+    val (compressedMessagesWithOffset, _) = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                                                now = System.currentTimeMillis(),
+                                                                                                sourceCodec = DefaultCompressionCodec,
+                                                                                                targetCodec = DefaultCompressionCodec,
+                                                                                                messageTimestampType = TimestampType.CREATE_TIME,
+                                                                                                messageTimestampDiffMaxMs = 5000L)
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -329,21 +337,23 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     // check uncompressed offsets
     checkOffsets(messagesV0, 0)
     val offset = 1234567
-    checkOffsets(messagesV0.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+    checkOffsets(messagesV0.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                             now = System.currentTimeMillis(),
                                                              sourceCodec = NoCompressionCodec,
                                                              targetCodec = NoCompressionCodec,
                                                              messageFormatVersion = 1,
                                                              messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                             messageTimestampDiffMaxMs = 1000L), offset)
+                                                             messageTimestampDiffMaxMs = 1000L)._1, offset)
 
     // check compressed messages
     checkOffsets(compressedMessagesV0, 0)
-    checkOffsets(compressedMessagesV0.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+    checkOffsets(compressedMessagesV0.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                       now = System.currentTimeMillis(),
                                                                        sourceCodec = DefaultCompressionCodec,
                                                                        targetCodec = DefaultCompressionCodec,
                                                                        messageFormatVersion = 1,
                                                                        messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                       messageTimestampDiffMaxMs = 1000L), offset)
+                                                                       messageTimestampDiffMaxMs = 1000L)._1, offset)
 
     // Check down conversion
     val now = System.currentTimeMillis()
@@ -352,21 +362,23 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 
     // check uncompressed offsets
     checkOffsets(messagesV1, 0)
-    checkOffsets(messagesV1.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+    checkOffsets(messagesV1.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                             now = System.currentTimeMillis(),
                                                              sourceCodec = NoCompressionCodec,
                                                              targetCodec = NoCompressionCodec,
                                                              messageFormatVersion = 0,
                                                              messageTimestampType = TimestampType.CREATE_TIME,
-                                                             messageTimestampDiffMaxMs = 5000L), offset)
+                                                             messageTimestampDiffMaxMs = 5000L)._1, offset)
 
     // check compressed messages
     checkOffsets(compressedMessagesV1, 0)
-    checkOffsets(compressedMessagesV1.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+    checkOffsets(compressedMessagesV1.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                       now = System.currentTimeMillis(),
                                                                        sourceCodec = DefaultCompressionCodec,
                                                                        targetCodec = DefaultCompressionCodec,
                                                                        messageFormatVersion = 0,
                                                                        messageTimestampType = TimestampType.CREATE_TIME,
-                                                                       messageTimestampDiffMaxMs = 5000L), offset)
+                                                                       messageTimestampDiffMaxMs = 5000L)._1, offset)
   }
   
   /* check that offsets are assigned based on byte offset from the given base offset */

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7fe9ffc..aac50bd 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -295,7 +295,7 @@ class KafkaConfigTest {
     assertEquals(KAFKA_0_8_2, conf3.interBrokerProtocolVersion)
 
     //check that latest is newer than 0.8.2
-    assert(ApiVersion.latestVersion.onOrAfter(conf3.interBrokerProtocolVersion))
+    assert(ApiVersion.latestVersion >= conf3.interBrokerProtocolVersion)
   }
 
   private def isValidKafkaConfig(props: Properties): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index f6d67eb..3370822 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -18,15 +18,15 @@
 <h3><a id="upgrade" href="#upgrade">1.5 Upgrading From Previous Versions</a></h3>
 
 <h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 0.10.0.0</a></h4>
-0.10.0.0 has <a href="#upgrade_10_performance_impact">potential performance impact during upgrade</a> and
-<a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading). Because new protocols
+0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and
+there may be a <a href="#upgrade_10_performance_impact">performance impact during the upgrade</a>. Because new protocols
 are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
 
 <p><b>For a rolling upgrade:</b></p>
 
 <ol>
-    <li> Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=CURRENT_KAFKA_VERSION(e.g. 0.8.2, 0.9.0.0).
-         We recommend the users to set message.format.version=CURRENT_KAFKA_VERSION as well to avoid performance regression
+    <li> Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).
+         We recommend that users set message.format.version=CURRENT_KAFKA_VERSION as well to avoid a performance regression
          during upgrade. See <a href="#upgrade_10_performance_impact">potential performance impact during upgrade</a> for the details.
     </li>
     <li> Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li>
@@ -38,35 +38,36 @@ are introduced, it is important to upgrade your Kafka clusters before upgrading
 
 <p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
 
-<h5><a id="upgrade_10_performance_impact" href="#upgrade_10_performance_impact">potential performance impact in 0.10.0.0 during upgrade</a></h5>
+<h5><a id="upgrade_10_performance_impact" href="#upgrade_10_performance_impact">Potential performance impact during upgrade to 0.10.0.0</a></h5>
 <p>
-    Message format in 0.10.0 now includes a new timestamp field and uses relative offsets for compressed messages.
-    The on disk message format can be configured through message.format.version in server.properties file.
+    The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages.
+    The on disk message format can be configured through message.format.version in the server.properties file.
     The default on-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0.0, it only understands
-    message format before 0.10.0. In this case, the broker is able to convert messages of the format in 0.10.0 to earlier format
-    before sending a response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case.
+    message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format
+    before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case.
 
-    To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set the message format to 0.9.0 when
-    upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old
-    consumers. Once most consumers are upgraded, one can change the message format to 0.10.0 on the broker.
+    To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set the message format to
+    e.g. 0.9.0 when upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the
+    data to the old consumers. Once most consumers are upgraded, one can change the message format to 0.10.0 on the broker.
 </p>
 <p>
     For clients that are upgraded to 0.10.0.0, there is no performance impact.
 </p>
 <p>
-    <b>Note:</b> By setting the message format version, one certifies all the existing messages are on or below that
+    <b>Note:</b> By setting the message format version, one certifies that all existing messages are on or below that
     message format version. Otherwise consumers before 0.10.0.0 might break. In particular, after the message format
-    is set to 0.10.0, one should not change it back to earlier format since it may break the consumer on versions before 0.10.0.0.
+    is set to 0.10.0, one should not change it back to an earlier format as it may break consumers on versions before 0.10.0.0.
 </p>
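
To make the recommendation above concrete, a rolling upgrade from (for example) 0.9.0.0 would add something along these lines to server.properties before the brokers are upgraded, using the property names as written in this document:

    inter.broker.protocol.version=0.9.0.0
    message.format.version=0.9.0.0

Once the brokers are upgraded and most consumers are on 0.10.0.0, both properties can be raised to the new version with another rolling restart.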
 
 <h5><a id="upgrade_10_breaking" href="#upgrade_10_breaking">potential breaking changes in 0.10.0.0</a></h5>
 <ul>
-    <li> Starting from Kafka 0.10.0.0, message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0. </li>
-    <li> Message format 0.10.0 is added and used by default to include a timestamp field in the messages and use relative offsets for compressed messages. </li>
-    <li> ProduceRequest/Response v2 is added and used by default to support message format 0.10.0 </li>
-    <li> FetchRequest/Response v2 is added and used by default to support message format 0.10.0 </li>
-    <li> MessageFormatter interface changed from <code>void writeTo(byte[] key, byte[] value, PrintStream output)</code> to
-         <code>void writeTo(byte[] key, byte[] value, long timestamp, TimestampType timestampType, PrintStream output)</code> </li>
+    <li> Starting from Kafka 0.10.0.0, the message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0. </li>
+    <li> Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages. </li>
+    <li> ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
+    <li> FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
+    <li> MessageFormatter interface was changed from <code>void writeTo(byte[] key, byte[] value, PrintStream output)</code> to
+        <code>void writeTo(byte[] key, byte[] value, long timestamp, TimestampType timestampType, PrintStream output)</code> </li>
+    <li> MirrorMakerMessageHandler no longer exposes <em>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</em> method as it was never called. </li>
 </ul>
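
As an illustration of the MessageFormatter change listed above, code compiled against 0.10.0.0 needs a method body with the new shape. The sketch below is hypothetical (the object name and output format are made up, and wiring the method into the MessageFormatter trait itself is not shown in this diff):

    import java.io.PrintStream
    import org.apache.kafka.common.record.TimestampType

    object TimestampEchoFormatter {
      // same parameter list as the new writeTo signature
      def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long,
                  timestampType: TimestampType, output: PrintStream): Unit = {
        val v = if (value == null) "null" else new String(value, "UTF-8")
        output.println(s"$timestampType:$timestamp $v")
      }
    }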
 
 <h4><a id="upgrade_9" href="#upgrade_9">Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0</a></h4>
@@ -107,9 +108,8 @@ are introduced, it is important to upgrade your Kafka clusters before upgrading
 <h5><a id="upgrade_901_notable" href="#upgrade_901_notable">Notable changes in 0.9.0.1</a></h5>
 
 <ul>
-    <li> The new broker id generation feature can be disable by setting broker.id.generation.enable to false. </li>
+    <li> The new broker id generation feature can be disabled by setting broker.id.generation.enable to false. </li>
     <li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. </li>
-    <li> MirrorMakerMessageHandler no longer exposes <em>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</em> method as it was never called. </li>
     <li> Default value of configuration parameter fetch.min.bytes for the new consumer is now 1 by default. </li>
 </ul>
 


[2/2] kafka git commit: KAFKA-3259 KAFKA-3253; KIP-31/KIP-32 Follow-up

Posted by ju...@apache.org.
KAFKA-3259 KAFKA-3253; KIP-31/KIP-32 Follow-up

This PR includes a number of clean-ups:
* Code style
* Documentation wording improvements
* Efficiency improvements

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #943 from ijuma/kafka-3259-kip-31-32-clean-ups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01aeea7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01aeea7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01aeea7c

Branch: refs/heads/trunk
Commit: 01aeea7c7bca34f1edce40116b7721335938b13b
Parents: 1bfadda
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Feb 24 11:28:53 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Feb 24 11:28:53 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/common/record/Record.java  |   2 +-
 .../kafka/common/record/TimestampType.java      |  32 +--
 .../org/apache/kafka/common/utils/Utils.java    |   2 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  14 +-
 core/src/main/scala/kafka/common/LongRef.scala  |  61 +++++
 .../controller/ControllerChannelManager.scala   |   2 +-
 .../kafka/coordinator/GroupCoordinator.scala    |   8 +-
 .../javaapi/message/ByteBufferMessageSet.scala  |   5 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |  25 +-
 core/src/main/scala/kafka/log/Log.scala         |  65 ++---
 core/src/main/scala/kafka/log/LogCleaner.scala  |  62 ++---
 core/src/main/scala/kafka/log/LogConfig.scala   |  20 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  10 +-
 .../kafka/message/ByteBufferMessageSet.scala    | 239 +++++++++----------
 core/src/main/scala/kafka/message/Message.scala |  53 ++--
 .../main/scala/kafka/message/MessageSet.scala   |  20 +-
 .../scala/kafka/message/MessageWriter.scala     |   2 +-
 .../main/scala/kafka/server/ConfigHandler.scala |  35 ++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  71 +++---
 .../main/scala/kafka/server/KafkaConfig.scala   |  34 ++-
 .../main/scala/kafka/server/KafkaServer.scala   |  12 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     |  29 +--
 .../scala/kafka/tools/SimpleConsumerShell.scala |   2 +-
 .../kafka/api/BaseConsumerTest.scala            |  18 +-
 .../kafka/api/PlaintextConsumerTest.scala       |   2 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |   8 +-
 .../GroupCoordinatorResponseTest.scala          |   2 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |  13 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |   4 +-
 .../scala/unit/kafka/log/LogConfigTest.scala    |  22 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   1 -
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   9 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  34 +--
 .../message/ByteBufferMessageSetTest.scala      | 136 ++++++-----
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +-
 docs/upgrade.html                               |  44 ++--
 37 files changed, 575 insertions(+), 529 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 8390dc7..147ad86 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -347,7 +347,7 @@ public final class Record {
         if (magic() == 0)
             return TimestampType.NO_TIMESTAMP_TYPE;
         else
-            return wrapperRecordTimestampType == null ? TimestampType.getTimestampType(attributes()) : wrapperRecordTimestampType;
+            return wrapperRecordTimestampType == null ? TimestampType.forAttributes(attributes()) : wrapperRecordTimestampType;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
index ab12a35..62fd814 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
@@ -25,34 +25,28 @@ import java.util.NoSuchElementException;
 public enum TimestampType {
     NO_TIMESTAMP_TYPE(-1, "NoTimestampType"), CREATE_TIME(0, "CreateTime"), LOG_APPEND_TIME(1, "LogAppendTime");
 
-    public final int value;
+    public final int id;
     public final String name;
-    TimestampType(int value, String name) {
-        this.value = value;
+    TimestampType(int id, String name) {
+        this.id = id;
         this.name = name;
     }
 
-    public static TimestampType getTimestampType(byte attributes) {
-        int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
-        return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME;
+    public byte updateAttributes(byte attributes) {
+        return this == CREATE_TIME ?
+            (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
     }
 
-    public static byte setTimestampType(byte attributes, TimestampType timestampType) {
-        return timestampType == CREATE_TIME ?
-                (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
+    public static TimestampType forAttributes(byte attributes) {
+        int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
+        return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME;
     }
 
     public static TimestampType forName(String name) {
-        switch (name) {
-            case "NoTimestampType":
-                return NO_TIMESTAMP_TYPE;
-            case "CreateTime":
-                return CREATE_TIME;
-            case "LogAppendTime":
-                return LOG_APPEND_TIME;
-            default:
-                throw new NoSuchElementException("Invalid timestamp type " + name);
-        }
+        for (TimestampType t : values())
+            if (t.name.equals(name))
+                return t;
+        throw new NoSuchElementException("Invalid timestamp type " + name);
     }
 
     @Override
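
A quick round-trip of the renamed helpers above (a sketch; only methods defined in this hunk are used):

    import org.apache.kafka.common.record.TimestampType

    object TimestampTypeRoundTrip extends App {
      // LOG_APPEND_TIME sets the timestamp-type bit in the attributes byte, CREATE_TIME clears it
      val attrs: Byte = TimestampType.LOG_APPEND_TIME.updateAttributes(0.toByte)
      assert(TimestampType.forAttributes(attrs) == TimestampType.LOG_APPEND_TIME)
      assert(TimestampType.forAttributes(TimestampType.CREATE_TIME.updateAttributes(attrs)) == TimestampType.CREATE_TIME)
      // forName still resolves the human-readable names
      assert(TimestampType.forName("CreateTime") == TimestampType.CREATE_TIME)
    }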

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8df54a4..daef458 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -166,7 +166,7 @@ public class Utils {
      * @param buffer The buffer to write to
      * @param value The value to write
      */
-    public static void writetUnsignedInt(ByteBuffer buffer, long value) {
+    public static void writeUnsignedInt(ByteBuffer buffer, long value) {
         buffer.putInt((int) (value & 0xffffffffL));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 6b5fb7c..e2cadd1 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -51,9 +51,14 @@ object ApiVersion {
     "0.10.0" -> KAFKA_0_10_0_IV0
   )
 
-  def apply(version: String): ApiVersion  = versionNameMap(version.split("\\.").slice(0,3).mkString("."))
+  private val versionPattern = "\\.".r
+
+  def apply(version: String): ApiVersion =
+    versionNameMap.getOrElse(versionPattern.split(version).slice(0, 3).mkString("."),
+      throw new IllegalArgumentException(s"Version `$version` is not a valid version"))
 
   def latestVersion = versionNameMap.values.max
+
 }
 
 sealed trait ApiVersion extends Ordered[ApiVersion] {
@@ -61,13 +66,8 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
   val messageFormatVersion: Byte
   val id: Int
 
-  override def compare(that: ApiVersion): Int = {
+  override def compare(that: ApiVersion): Int =
     ApiVersion.orderingByVersion.compare(this, that)
-  }
-
-  def onOrAfter(that: ApiVersion): Boolean = {
-    this.compare(that) >= 0
-  }
 
   override def toString(): String = version
 }
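
With onOrAfter gone, comparisons go through the Ordered[ApiVersion] instance directly. A small sketch (assuming, as the KafkaConfigTest change above does, that 0.8.2.x version strings resolve to KAFKA_0_8_2):

    import kafka.api.{ApiVersion, KAFKA_0_8_2}

    object ApiVersionOrderingExample extends App {
      // apply() keeps only the first three version segments before the lookup
      val v = ApiVersion("0.8.2.2")
      assert(v == KAFKA_0_8_2)
      // ordering replaces the removed onOrAfter helper
      assert(ApiVersion.latestVersion >= v)
      // unknown versions now fail fast:
      // ApiVersion("not-a-version") throws IllegalArgumentException
    }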

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/common/LongRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/LongRef.scala b/core/src/main/scala/kafka/common/LongRef.scala
new file mode 100644
index 0000000..f2b1e32
--- /dev/null
+++ b/core/src/main/scala/kafka/common/LongRef.scala
@@ -0,0 +1,61 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.common
+
+/**
+  * A mutable cell that holds a value of type `Long`. One should generally prefer using value-based programming (i.e.
+  * passing and returning `Long` values), but this class can be useful in some scenarios.
+  *
+  * Unlike `AtomicLong`, this class is not thread-safe and there are no atomicity guarantees.
+  */
+class LongRef(var value: Long) {
+
+  def addAndGet(delta: Long): Long = {
+    value += delta
+    value
+  }
+
+  def getAndAdd(delta: Long): Long = {
+    val result = value
+    value += delta
+    result
+  }
+
+  def getAndIncrement(): Long = {
+    val v = value
+    value += 1
+    v
+  }
+
+  def incrementAndGet(): Long = {
+    value += 1
+    value
+  }
+
+  def getAndDecrement(): Long = {
+    val v = value
+    value -= 1
+    v
+  }
+
+  def decrementAndGet(): Long = {
+    value -= 1
+    value
+  }
+
+}
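
LongRef is used in this commit in place of AtomicLong for offset counters that do not need atomicity; a minimal usage sketch:

    import kafka.common.LongRef

    object LongRefExample extends App {
      val offsetCounter = new LongRef(1234567L)
      assert(offsetCounter.getAndIncrement() == 1234567L)  // returns the old value
      assert(offsetCounter.incrementAndGet() == 1234569L)  // returns the new value
      assert(offsetCounter.addAndGet(10L) == 1234579L)
      assert(offsetCounter.value == 1234579L)              // plain field access, no atomicity guarantees
    }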

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 513f383..3b1a458 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -380,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           topicPartition -> partitionState
         }
 
-        val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0)) (1: Short) else (0: Short)
+        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) (1: Short) else (0: Short)
 
         val updateMetadataRequest =
           if (version == 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index c86e87b..cb08358 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -724,10 +724,10 @@ object GroupCoordinator {
   // TODO: we store both group metadata and offset data here despite the topic name being offsets only
   val GroupMetadataTopicName = "__consumer_offsets"
 
-  def create(config: KafkaConfig,
-             zkUtils: ZkUtils,
-             replicaManager: ReplicaManager,
-             time: Time): GroupCoordinator = {
+  def apply(config: KafkaConfig,
+            zkUtils: ZkUtils,
+            replicaManager: ReplicaManager,
+            time: Time): GroupCoordinator = {
     val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index df30279..590db83 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -16,8 +16,9 @@
 */
 package kafka.javaapi.message
 
-import java.util.concurrent.atomic.AtomicLong
 import java.nio.ByteBuffer
+
+import kafka.common.LongRef
 import kafka.message._
 
 import scala.collection.JavaConverters._
@@ -26,7 +27,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
   
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer)
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new LongRef(0), messages.asScala: _*).buffer)
   }
 
   def this(messages: java.util.List[Message]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index fe31ad4..45b3df9 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -174,16 +174,17 @@ class FileMessageSet private[kafka](@volatile var file: File,
   }
 
   /**
-    * This method is called before we write messages to socket use zero-copy transfer. We need to
-    * make sure all the messages in the message set has expected magic value
+    * This method is called before we write messages to the socket using zero-copy transfer. We need to
+    * make sure all the messages in the message set have the expected magic value.
+    *
     * @param expectedMagicValue the magic value expected
-    * @return true if all messages has expected magic value, false otherwise
+    * @return true if all messages have expected magic value, false otherwise
     */
-  override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
+  override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
     var location = start
     val offsetAndSizeBuffer = ByteBuffer.allocate(MessageSet.LogOverhead)
     val crcAndMagicByteBuffer = ByteBuffer.allocate(Message.CrcLength + Message.MagicLength)
-    while(location < end) {
+    while (location < end) {
       offsetAndSizeBuffer.rewind()
       channel.read(offsetAndSizeBuffer, location)
       if (offsetAndSizeBuffer.hasRemaining)
@@ -191,7 +192,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
       offsetAndSizeBuffer.rewind()
       offsetAndSizeBuffer.getLong // skip offset field
       val messageSize = offsetAndSizeBuffer.getInt
-      if(messageSize < Message.MinMessageOverhead)
+      if (messageSize < Message.MinMessageOverhead)
         throw new IllegalStateException("Invalid message size: " + messageSize)
       crcAndMagicByteBuffer.rewind()
       channel.read(crcAndMagicByteBuffer, location + MessageSet.LogOverhead)
@@ -203,15 +204,15 @@ class FileMessageSet private[kafka](@volatile var file: File,
   }
 
   /**
-   * Convert this message set to use specified message format.
+   * Convert this message set to use the specified message format.
    */
   def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = {
     val offsets = new ArrayBuffer[Long]
     val newMessages = new ArrayBuffer[Message]
-    this.iterator().foreach(messageAndOffset => {
+    this.foreach { messageAndOffset =>
       val message = messageAndOffset.message
       if (message.compressionCodec == NoCompressionCodec) {
-        newMessages += messageAndOffset.message.toFormatVersion(toMagicValue)
+        newMessages += message.toFormatVersion(toMagicValue)
         offsets += messageAndOffset.offset
       } else {
         // File message set only has shallow iterator. We need to do deep iteration here if needed.
@@ -221,19 +222,19 @@ class FileMessageSet private[kafka](@volatile var file: File,
           offsets += innerMessageAndOffset.offset
         }
       }
-    })
+    }
 
     // We use the offset seq to assign offsets so the offset of the messages does not change.
     new ByteBufferMessageSet(
       compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
-      offsetSeq = offsets.toSeq,
+      offsetSeq = offsets,
       newMessages: _*)
   }
 
   /**
    * Get a shallow iterator over the messages in the set.
    */
-  override def iterator() = iterator(Int.MaxValue)
+  override def iterator = iterator(Int.MaxValue)
 
   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f8c0b77..fd176b1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -21,17 +21,16 @@ import kafka.utils._
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
-
-import java.io.{IOException, File}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
+import java.io.{File, IOException}
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+
+import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
 import org.apache.kafka.common.record.TimestampType
 
 import scala.collection.JavaConversions
-
 import com.yammer.metrics.core.Gauge
 
 object LogAppendInfo {
@@ -320,7 +319,7 @@ class Log(val dir: File,
     val appendInfo = analyzeAndValidateMessageSet(messages)
 
     // if we have any valid messages, append them to the log
-    if(appendInfo.shallowCount == 0)
+    if (appendInfo.shallowCount == 0)
       return appendInfo
 
     // trim any invalid bytes or partial messages before appending it to the on-disk log
@@ -333,42 +332,46 @@ class Log(val dir: File,
 
         if (assignOffsets) {
           // assign offsets to the message set
-          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
-          val now = SystemTime.milliseconds
-          try {
-            validMessages = validMessages.validateMessagesAndAssignOffsets(offset,
-                                                                           now,
-                                                                           appendInfo.sourceCodec,
-                                                                           appendInfo.targetCodec,
-                                                                           config.compact,
-                                                                           config.messageFormatVersion,
-                                                                           config.messageTimestampType,
-                                                                           config.messageTimestampDifferenceMaxMs)
+          val offset = new LongRef(nextOffsetMetadata.messageOffset)
+          val now = time.milliseconds
+          val (validatedMessages, messageSizesMaybeChanged) = try {
+            validMessages.validateMessagesAndAssignOffsets(offset,
+                                                           now,
+                                                           appendInfo.sourceCodec,
+                                                           appendInfo.targetCodec,
+                                                           config.compact,
+                                                           config.messageFormatVersion.messageFormatVersion,
+                                                           config.messageTimestampType,
+                                                           config.messageTimestampDifferenceMaxMs)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
-          appendInfo.lastOffset = offset.get - 1
-          // If log append time is used, we put the timestamp assigned to the messages in the append info.
+          validMessages = validatedMessages
+          appendInfo.lastOffset = offset.value - 1
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
             appendInfo.timestamp = now
+
+          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
+          // format conversion)
+          if (messageSizesMaybeChanged) {
+            for (messageAndOffset <- validMessages.shallowIterator) {
+              if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
+                // we record the original message set size instead of the trimmed size
+                // to be consistent with pre-compression bytesRejectedRate recording
+                BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
+                BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+                throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+                  .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+              }
+            }
+          }
+
         } else {
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
             throw new IllegalArgumentException("Out of order offsets found in " + messages)
         }
 
-        // re-validate message sizes since after re-compression some may exceed the limit
-        for (messageAndOffset <- validMessages.shallowIterator) {
-          if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
-            // we record the original message set size instead of trimmed size
-            // to be consistent with pre-compression bytesRejectedRate recording
-            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
-            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
-            throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
-              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
-          }
-        }
-
         // check messages set size may be exceed config.segmentSize
         if (validMessages.sizeInBytes > config.segmentSize) {
           throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index a3aff15..a2e1913 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -370,7 +370,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.lastModified > deleteHorizonMs
         info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
             .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion)
+        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion)
       }
 
       // trim excess index
@@ -430,28 +430,30 @@ private[log] class Cleaner(val id: Int,
           }
           messagesRead += 1
         } else {
-          // We use absolute offset to decide whether retain the message or not. This is handled by
+          // We use the absolute offset to decide whether to retain the message or not. This is handled by the
           // deep iterator.
           val messages = ByteBufferMessageSet.deepIterator(entry)
-          var numberOfInnerMessages = 0
-          var formatConversionNeeded = false
-          val retainedMessages = messages.filter(messageAndOffset => {
+          var writeOriginalMessageSet = true
+          val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
+          messages.foreach { messageAndOffset =>
             messagesRead += 1
-            numberOfInnerMessages += 1
-            if (messageAndOffset.message.magic != messageFormatVersion)
-              formatConversionNeeded = true
-            shouldRetainMessage(source, map, retainDeletes, messageAndOffset)
-          }).toSeq
-
-          // There is no messages compacted out and no message format conversion, write the original message set back
-          if (retainedMessages.size == numberOfInnerMessages && !formatConversionNeeded)
-            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
-          else if (retainedMessages.nonEmpty) {
-            val convertedRetainedMessages = retainedMessages.map(messageAndOffset => {
-              new MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion), messageAndOffset.offset)
-            })
-            compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, convertedRetainedMessages)
+            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) {
+              retainedMessages += {
+                if (messageAndOffset.message.magic != messageFormatVersion) {
+                  writeOriginalMessageSet = false
+                  new MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion), messageAndOffset.offset)
+                }
+                else messageAndOffset
+              }
+            }
+            else writeOriginalMessageSet = false
           }
+
+          // There are no messages compacted out and no message format conversion, write the original message set back
+          if (writeOriginalMessageSet)
+            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+          else if (retainedMessages.nonEmpty)
+            compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, retainedMessages)
         }
       }
 
@@ -474,27 +476,27 @@ private[log] class Cleaner(val id: Int,
   private def compressMessages(buffer: ByteBuffer,
                                compressionCodec: CompressionCodec,
                                messageFormatVersion: Byte,
-                               messages: Seq[MessageAndOffset]) {
-    val messagesIterable = messages.toIterable.map(_.message)
-    if (messages.isEmpty) {
+                               messageAndOffsets: Seq[MessageAndOffset]) {
+    val messages = messageAndOffsets.map(_.message)
+    if (messageAndOffsets.isEmpty) {
       MessageSet.Empty.sizeInBytes
     } else if (compressionCodec == NoCompressionCodec) {
-      for(messageOffset <- messages)
+      for (messageOffset <- messageAndOffsets)
         ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset)
-      MessageSet.messageSetSize(messagesIterable)
+      MessageSet.messageSetSize(messages)
     } else {
-      val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages.map(_.message))
-      val firstAbsoluteOffset = messages.head.offset
+      val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
+      val firstMessageOffset = messageAndOffsets.head
+      val firstAbsoluteOffset = firstMessageOffset.offset
       var offset = -1L
-      val timestampType = messages.head.message.timestampType
-      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 2, 1024), 1 << 16))
+      val timestampType = firstMessageOffset.message.timestampType
+      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
       messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream =>
         val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
         try {
-          for (messageOffset <- messages) {
+          for (messageOffset <- messageAndOffsets) {
             val message = messageOffset.message
             offset = messageOffset.offset
-            // Use inner offset when magic value is greater than 0
             if (messageFormatVersion > Message.MagicValue_V0) {
               // The offset of the messages are absolute offset, compute the inner offset.
               val innerOffset = messageOffset.offset - firstAbsoluteOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index a8fffbd..a76dce7 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -19,6 +19,8 @@ package kafka.log
 
 import java.util.Properties
 
+import scala.collection.JavaConverters._
+
 import kafka.api.ApiVersion
 import kafka.message.{BrokerCompressionCodec, Message}
 import kafka.server.KafkaConfig
@@ -73,9 +75,9 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
   val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
   val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
   val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
-  val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp)).messageFormatVersion
+  val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp))
   val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
-  val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp)
+  val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -177,12 +179,8 @@ object LogConfig {
 
   def apply(): LogConfig = LogConfig(new Properties())
 
-  def configNames() = {
-    import scala.collection.JavaConversions._
-    configDef.names().toList.sorted
-  }
-
-
+  def configNames: Seq[String] = configDef.names.asScala.toSeq.sorted
+  
   /**
    * Create a log config instance using the given properties and defaults
    */
@@ -197,10 +195,8 @@ object LogConfig {
    * Check that property names are valid
    */
   def validateNames(props: Properties) {
-    import scala.collection.JavaConversions._
-    val names = configDef.names()
-    for(name <- props.keys)
-      require(names.contains(name), "Unknown configuration \"%s\".".format(name))
+    val names = configNames
+    for (name <- props.keys.asScala) require(names.contains(name), s"Unknown configuration `$name`.")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 69386c1..b64fac6 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -19,11 +19,13 @@ package kafka.log
 
 import java.io._
 import java.util.concurrent.TimeUnit
+
 import kafka.utils._
+
 import scala.collection._
-import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
-import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future}
+import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
+import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -463,7 +465,7 @@ class LogManager(val logDirs: Array[File],
   /**
    * Get a map of TopicAndPartition => Log
    */
-  def logsByTopicPartition = logs.toMap
+  def logsByTopicPartition: Map[TopicAndPartition, Log] = logs.toMap
 
   /**
    * Map of log dir to logs by topic and partitions in that dir

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 2867c78..856f971 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -18,33 +18,33 @@
 package kafka.message
 
 import kafka.utils.{IteratorTemplate, Logging}
-import kafka.common.KafkaException
-
+import kafka.common.{KafkaException, LongRef}
 import java.nio.ByteBuffer
 import java.nio.channels._
 import java.io._
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import java.util.ArrayDeque
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
 
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 
 object ByteBufferMessageSet {
 
-  private def create(offsetAssignor: OffsetAssigner,
+  private def create(offsetAssigner: OffsetAssigner,
                      compressionCodec: CompressionCodec,
                      wrapperMessageTimestamp: Option[Long],
                      timestampType: TimestampType,
                      messages: Message*): ByteBuffer = {
-    if(messages.size == 0) {
+    if (messages.isEmpty)
       MessageSet.Empty.buffer
-    } else if(compressionCodec == NoCompressionCodec) {
+    else if (compressionCodec == NoCompressionCodec) {
       val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-      for(message <- messages)
-        writeMessage(buffer, message, offsetAssignor.nextAbsoluteOffset)
+      for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
       buffer.rewind()
       buffer
     } else {
@@ -58,12 +58,12 @@ object ByteBufferMessageSet {
         val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
         try {
           for (message <- messages) {
-            offset = offsetAssignor.nextAbsoluteOffset
+            offset = offsetAssigner.nextAbsoluteOffset()
             if (message.magic != magicAndTimestamp.magic)
               throw new IllegalArgumentException("Messages in the message set must have same magic value")
             // Use inner offset if magic value is greater than 0
             if (magicAndTimestamp.magic > Message.MagicValue_V0)
-              output.writeLong(offsetAssignor.toInnerOffset(offset))
+              output.writeLong(offsetAssigner.toInnerOffset(offset))
             else
               output.writeLong(offset)
             output.writeInt(message.size)
@@ -87,24 +87,22 @@ object ByteBufferMessageSet {
 
     new IteratorTemplate[MessageAndOffset] {
 
-      val wrapperMessageOffset = wrapperMessageAndOffset.offset
-      val wrapperMessage = wrapperMessageAndOffset.message
+      val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset
       val wrapperMessageTimestampOpt: Option[Long] =
         if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None
       val wrapperMessageTimestampTypeOpt: Option[TimestampType] =
         if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None
       if (wrapperMessage.payload == null)
-        throw new KafkaException("Message payload is null: " + wrapperMessage)
-      val inputStream: InputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
-      val compressed: DataInputStream = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
+        throw new KafkaException(s"Message payload is null: $wrapperMessage")
+      val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
+      val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
       var lastInnerOffset = -1L
 
       val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
-        var innerMessageAndOffsets = new mutable.Queue[MessageAndOffset]()
+        val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]()
         try {
-          while (true) {
-            innerMessageAndOffsets += readMessageFromStream()
-          }
+          while (true)
+            innerMessageAndOffsets.add(readMessageFromStream())
         } catch {
           case eofe: EOFException =>
             compressed.close()
@@ -112,23 +110,18 @@ object ByteBufferMessageSet {
             throw new KafkaException(ioe)
         }
         Some(innerMessageAndOffsets)
-      } else {
-        None
-      }
+      } else None
 
-      private def readMessageFromStream() = {
-        // read the offset
+      private def readMessageFromStream(): MessageAndOffset = {
         val innerOffset = compressed.readLong()
-        // read record size
-        val size = compressed.readInt()
+        val recordSize = compressed.readInt()
 
-        if (size < MinMessageOverhead)
-          throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")
+        if (recordSize < MinMessageOverhead)
+          throw new InvalidMessageException(s"Message found with corrupt size `$recordSize` in deep iterator")
 
-        // read the record into an intermediate record buffer
-        // and hence has to do extra copy
-        val bufferArray = new Array[Byte](size)
-        compressed.readFully(bufferArray, 0, size)
+        // read the record into an intermediate record buffer (i.e. extra copy needed)
+        val bufferArray = new Array[Byte](recordSize)
+        compressed.readFully(bufferArray, 0, recordSize)
         val buffer = ByteBuffer.wrap(bufferArray)
 
         // Override the timestamp if necessary
@@ -146,20 +139,17 @@ object ByteBufferMessageSet {
         messageAndOffsets match {
           // Using inner offset and timestamps
           case Some(innerMessageAndOffsets) =>
-            if (innerMessageAndOffsets.isEmpty)
-              allDone()
-            else {
-              val messageAndOffset = innerMessageAndOffsets.dequeue()
-              val message = messageAndOffset.message
-              val relativeOffset = messageAndOffset.offset - lastInnerOffset
-              val absoluteOffset = wrapperMessageOffset + relativeOffset
-              new MessageAndOffset(message, absoluteOffset)
+            innerMessageAndOffsets.pollFirst() match {
+              case null => allDone()
+              case MessageAndOffset(message, offset) =>
+                val relativeOffset = offset - lastInnerOffset
+                val absoluteOffset = wrapperMessageOffset + relativeOffset
+                new MessageAndOffset(message, absoluteOffset)
             }
           // Not using inner offset and timestamps
           case None =>
-            try {
-              readMessageFromStream()
-            } catch {
+            try readMessageFromStream()
+            catch {
               case eofe: EOFException =>
                 compressed.close()
                 allDone()
@@ -185,17 +175,23 @@ object ByteBufferMessageSet {
   }
 }
 
+private object OffsetAssigner {
+
+  def apply(offsetCounter: LongRef, size: Int): OffsetAssigner =
+    new OffsetAssigner(offsetCounter.value to offsetCounter.addAndGet(size))
+
+}
+
 private class OffsetAssigner(offsets: Seq[Long]) {
-  val index = new AtomicInteger(0)
+  private var index = 0
 
-  def this(offsetCounter: AtomicLong, size: Int) {
-    this((offsetCounter.get() to offsetCounter.get + size).toSeq)
-    offsetCounter.addAndGet(size)
+  def nextAbsoluteOffset(): Long = {
+    val result = offsets(index)
+    index += 1
+    result
   }
 
-  def nextAbsoluteOffset = offsets(index.getAndIncrement)
-
-  def toInnerOffset(offset: Long) = offset - offsets(0)
+  def toInnerOffset(offset: Long): Long = offset - offsets.head
 
 }
 
@@ -209,17 +205,17 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
  *
  *
- * When message format v1 is used, there will be following message format changes.
- * - For non-compressed messages, with message v1 we are adding timestamp and timestamp type attribute. The offsets of
+ * Message format v1 has the following changes:
+ * - For non-compressed messages, timestamp and timestamp type attributes have been added. The offsets of
  *   the messages remain absolute offsets.
- * - For Compressed messages, with message v1 we are adding timestamp, timestamp type attribute bit and using
- *   inner offsets (IO) for inner messages of compressed messages (see offset calculation details below). Timestamp type
- *   attribute is only set in wrapper messages. Inner messages always have CreateTime as timestamp type in attributes.
+ * - For compressed messages, timestamp and timestamp type attributes have been added and inner offsets (IO) are used
+ *   for inner messages of compressed messages (see offset calculation details below). The timestamp type
+ *   attribute is only set in wrapper messages. Inner messages always have CreateTime as the timestamp type in attributes.
  *
- * The way timestamp set is following:
- * For non-compressed messages: timestamp and timestamp type attribute in the messages are set and used.
+ * We set the timestamp in the following way:
+ * For non-compressed messages: the timestamp and timestamp type message attributes are set and used.
  * For compressed messages:
- * 1. Wrapper messages' timestamp type attribute is set to proper value
+ * 1. Wrapper messages' timestamp type attribute is set to the proper value
  * 2. Wrapper messages' timestamp is set to:
  *    - the max timestamp of inner messages if CreateTime is used
 *    - the current server time if the wrapper message's timestamp type is LogAppendTime.
@@ -227,11 +223,10 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  * 3. Inner messages' timestamp will be:
  *    - used when wrapper message's timestamp type is CreateTime
  *    - ignored when wrapper message's timestamp type is LogAppendTime
- * 4. Inner messages' timestamp type will always be ignored. However, producer must set the inner message timestamp
- *    type to CreateTime, otherwise the messages will be rejected by broker.
+ * 4. Inner messages' timestamp type will always be ignored with one exception: producers must set the inner message
+ *    timestamp type to CreateTime, otherwise the messages will be rejected by the broker.
  *
- *
- * The way absolute offset calculated is the following:
+ * Absolute offsets are calculated in the following way:
  * Ideally the conversion from relative offset(RO) to absolute offset(AO) should be:
  *
  * AO = AO_Of_Last_Inner_Message + RO
@@ -240,7 +235,7 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  * And the relative offset of an inner message compared with the last inner message is not known until
  * the last inner message is written.
  * Unfortunately we are not able to change the previously written messages after the last message is written to
- * the message set when stream compressing is used.
+ * the message set when stream compression is used.
  *
  * To solve this issue, we use the following solution:
  *
@@ -254,20 +249,23 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  *    RO = IO_of_a_message - IO_of_the_last_message
  *    AO = AO_Of_Last_Inner_Message + RO
  *
- * 4. This solution works for compacted message set as well
+ * 4. This solution works for compacted message sets as well.
  *
  */
 class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {
   private var shallowValidByteCount = -1
 
-  def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(ByteBufferMessageSet.create(new OffsetAssigner(new AtomicLong(0), messages.size), compressionCodec,
-      None, TimestampType.CREATE_TIME, messages:_*))
+  private[kafka] def this(compressionCodec: CompressionCodec,
+                          offsetCounter: LongRef,
+                          wrapperMessageTimestamp: Option[Long],
+                          timestampType: TimestampType,
+                          messages: Message*) {
+    this(ByteBufferMessageSet.create(OffsetAssigner(offsetCounter, messages.size), compressionCodec,
+      wrapperMessageTimestamp, timestampType, messages:_*))
   }
 
-  def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) {
-    this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, messages.size), compressionCodec,
-      None, TimestampType.CREATE_TIME, messages:_*))
+  def this(compressionCodec: CompressionCodec, offsetCounter: LongRef, messages: Message*) {
+    this(compressionCodec, offsetCounter, None, TimestampType.CREATE_TIME, messages:_*)
   }
 
   def this(compressionCodec: CompressionCodec, offsetSeq: Seq[Long], messages: Message*) {
@@ -275,31 +273,21 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       None, TimestampType.CREATE_TIME, messages:_*))
   }
 
-  def this(messages: Message*) {
-    this(NoCompressionCodec, new AtomicLong(0), messages: _*)
+  def this(compressionCodec: CompressionCodec, messages: Message*) {
+    this(compressionCodec, new LongRef(0L), messages: _*)
   }
 
-  // This constructor is only used internally
-  private[kafka] def this(compressionCodec: CompressionCodec,
-                          offsetCounter: AtomicLong,
-                          wrapperMessageTimestamp: Option[Long],
-                          timestampType: TimestampType,
-                          messages: Message*) {
-    this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, messages.size), compressionCodec,
-      wrapperMessageTimestamp, timestampType, messages:_*))
+  def this(messages: Message*) {
+    this(NoCompressionCodec, messages: _*)
   }
 
   def getBuffer = buffer
 
   private def shallowValidBytes: Int = {
-    if(shallowValidByteCount < 0) {
-      var bytes = 0
-      val iter = this.internalIterator(true)
-      while(iter.hasNext) {
-        val messageAndOffset = iter.next
-        bytes += MessageSet.entrySize(messageAndOffset.message)
-      }
-      this.shallowValidByteCount = bytes
+    if (shallowValidByteCount < 0) {
+      this.shallowValidByteCount = this.internalIterator(isShallow = true).map { messageAndOffset =>
+        MessageSet.entrySize(messageAndOffset.message)
+      }.sum
     }
     shallowValidByteCount
   }
@@ -315,7 +303,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
     written
   }
 
-  override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
+  override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
     for (messageAndOffset <- shallowIterator) {
       if (messageAndOffset.message.magic != expectedMagicValue)
         return false
@@ -398,27 +386,28 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
    *
    * If no format conversion or value overwriting is required for messages, this method will perform in-place
    * operations and avoid re-compression.
+   *
+   * Returns the message set and a boolean indicating whether the message sizes may have changed.
    */
-  private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
-                                                      now: Long = System.currentTimeMillis(),
+  private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
+                                                      now: Long,
                                                       sourceCodec: CompressionCodec,
                                                       targetCodec: CompressionCodec,
                                                       compactedTopic: Boolean = false,
                                                       messageFormatVersion: Byte = Message.CurrentMagicValue,
                                                       messageTimestampType: TimestampType,
-                                                      messageTimestampDiffMaxMs: Long): ByteBufferMessageSet = {
+                                                      messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
-      if (!magicValueInAllWrapperMessages(messageFormatVersion)) {
+      if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
         // Message format conversion
-        convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
-          messageFormatVersion)
+        (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
+          messageFormatVersion), true)
       } else {
         // Do in-place validation, offset assignment and maybe set timestamp
-        validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
-          messageTimestampDiffMaxMs)
+        (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
+          messageTimestampDiffMaxMs), false)
       }
-
     } else {
       // Deal with compressed messages
       // We cannot do in place assignment in one of the following situations:
@@ -432,8 +421,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
 
       var maxTimestamp = Message.NoTimestamp
       val expectedInnerOffset = new AtomicLong(0)
-      val validatedMessages = new ListBuffer[Message]
-      this.internalIterator(isShallow = false).foreach(messageAndOffset => {
+      val validatedMessages = new mutable.ArrayBuffer[Message]
+      this.internalIterator(isShallow = false).foreach { messageAndOffset =>
         val message = messageAndOffset.message
         validateMessageKey(message, compactedTopic)
         if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
@@ -441,7 +430,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           // Validate the timestamp
           validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)
           // Check if we need to overwrite offset
-          if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement)
+          if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
             inPlaceAssignment = false
           maxTimestamp = math.max(maxTimestamp, message.timestamp)
         }
@@ -451,7 +440,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           inPlaceAssignment = false
 
         validatedMessages += message.toFormatVersion(messageFormatVersion)
-      })
+      }
       if (!inPlaceAssignment) {
         // Cannot do in place assignment.
         val wrapperMessageTimestamp = {
@@ -463,11 +452,11 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
             Some(now)
         }
 
-        new ByteBufferMessageSet(compressionCodec = targetCodec,
-                                 offsetCounter = offsetCounter,
-                                 wrapperMessageTimestamp = wrapperMessageTimestamp,
-                                 timestampType = messageTimestampType,
-                                 messages = validatedMessages.toBuffer: _*)
+        (new ByteBufferMessageSet(compressionCodec = targetCodec,
+                                  offsetCounter = offsetCounter,
+                                  wrapperMessageTimestamp = wrapperMessageTimestamp,
+                                  timestampType = messageTimestampType,
+                                  messages = validatedMessages: _*), true)
       } else {
         // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
         buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
@@ -480,53 +469,49 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
         val timestamp = buffer.getLong(timestampOffset)
         val attributes = buffer.get(attributeOffset)
         if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
-            // We don't need to recompute crc if the timestamp is not updated.
-            crcUpdateNeeded = false
+          // We don't need to recompute crc if the timestamp is not updated.
+          crcUpdateNeeded = false
         else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
           // Set timestamp type and timestamp
           buffer.putLong(timestampOffset, now)
-          buffer.put(attributeOffset, TimestampType.setTimestampType(attributes, TimestampType.LOG_APPEND_TIME))
+          buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes))
         }
 
         if (crcUpdateNeeded) {
           // need to recompute the crc value
           buffer.position(MessageSet.LogOverhead)
           val wrapperMessage = new Message(buffer.slice())
-          Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum())
+          Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
         }
         buffer.rewind()
-        this
+        (this, false)
       }
     }
   }
 
-  // We create this method to save memory copy operation. It reads from the original message set and directly
+  // We create this method to avoid a memory copy. It reads from the original message set and directly
   // writes the converted messages into new message set buffer. Hence we don't need to allocate memory for each
   // individual message during message format conversion.
-  private def convertNonCompressedMessages(offsetCounter: AtomicLong,
+  private def convertNonCompressedMessages(offsetCounter: LongRef,
                                            compactedTopic: Boolean,
                                            now: Long,
                                            timestampType: TimestampType,
                                            messageTimestampDiffMaxMs: Long,
                                            toMagicValue: Byte): ByteBufferMessageSet = {
-    val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).foldLeft(0)(
-      (sizeDiff, messageAndOffset) => sizeDiff + Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue))
+    val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).map { messageAndOffset =>
+      Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue)
+    }.sum
     val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
     var newMessagePosition = 0
-    this.internalIterator(isShallow = true).foreach {messageAndOffset =>
-      val message = messageAndOffset.message
+    this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) =>
       validateMessageKey(message, compactedTopic)
       validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs)
       newBuffer.position(newMessagePosition)
-      // write offset.
-      newBuffer.putLong(offsetCounter.getAndIncrement)
-      // Write new message size
+      newBuffer.putLong(offsetCounter.getAndIncrement())
       val newMessageSize = message.size + Message.headerSizeDiff(message.magic, toMagicValue)
       newBuffer.putInt(newMessageSize)
-      // Create new message buffer
       val newMessageBuffer = newBuffer.slice()
       newMessageBuffer.limit(newMessageSize)
-      // Convert message
       message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType)
 
       newMessagePosition += MessageSet.LogOverhead + newMessageSize
@@ -535,7 +520,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
     new ByteBufferMessageSet(newBuffer)
   }
 
-  private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: AtomicLong,
+  private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef,
                                                                   now: Long,
                                                                   compactedTopic: Boolean,
                                                                   timestampType: TimestampType,
@@ -555,8 +540,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
         validateTimestamp(message, now, timestampType, timestampDiffMaxMs)
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
           message.buffer.putLong(Message.TimestampOffset, now)
-          message.buffer.put(Message.AttributesOffset, TimestampType.setTimestampType(message.attributes, TimestampType.LOG_APPEND_TIME))
-          Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum())
+          message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes))
+          Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum)
         }
       }
       messagePosition += MessageSet.LogOverhead + messageSize

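For illustration only, here is a minimal self-contained sketch (not part of this patch) of the inner-offset scheme described in the class comment above: each inner message carries a zero-based inner offset (IO), the wrapper message carries the absolute offset assigned to the last inner message, and the absolute offset of every inner message is recovered as AO = AO_Of_Last_Inner_Message + (IO - IO_of_the_last_message). The `Inner` case class and the literal offsets are hypothetical.

// Hypothetical sketch of the relative/absolute offset calculation for
// compressed (wrapper) messages, following the class comment above.
object InnerOffsetExample extends App {
  // Inner offsets (IO) assigned on the producer side: 0, 1, 2, ...
  case class Inner(innerOffset: Long, payload: String)

  val innerMessages = Seq(Inner(0, "a"), Inner(1, "b"), Inner(2, "c"))

  // The wrapper message carries the absolute offset of the *last* inner message,
  // assigned by the broker at append time (a made-up value here).
  val absoluteOffsetOfLastInner = 41L
  val innerOffsetOfLast = innerMessages.last.innerOffset

  // RO = IO_of_a_message - IO_of_the_last_message; AO = AO_Of_Last_Inner_Message + RO
  val absoluteOffsets = innerMessages.map { m =>
    absoluteOffsetOfLastInner + (m.innerOffset - innerOffsetOfLast)
  }

  println(absoluteOffsets.mkString(", ")) // prints: 39, 40, 41
}
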
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 51aa11a..2ab2e0c 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -50,8 +50,8 @@ object Message {
   val ValueSizeLength = 4
 
   private val MessageHeaderSizeMap = Map (
-    0.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength),
-    1.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + TimestampLength + KeySizeLength + ValueSizeLength))
+    (0: Byte) -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength),
+    (1: Byte) -> (CrcLength + MagicLength + AttributesLength + TimestampLength + KeySizeLength + ValueSizeLength))
 
   /**
    * The amount of overhead bytes in a message
@@ -123,10 +123,10 @@ object Message {
  *
  * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents.
  * @param buffer the byte buffer of this message.
- * @param wrapperMessageTimestamp the wrapper message timestamp, only not None when the message is an inner message
- *                                of a compressed message.
- * @param wrapperMessageTimestampType the wrapper message timestamp type, only not None when the message is an inner
- *                                    message of a compressed message.
+ * @param wrapperMessageTimestamp the wrapper message timestamp, which is only defined when the message is an inner
+ *                                message of a compressed message.
+ * @param wrapperMessageTimestampType the wrapper message timestamp type, which is only defined when the message is an
+ *                                    inner message of a compressed message.
  */
 class Message(val buffer: ByteBuffer,
               private val wrapperMessageTimestamp: Option[Long] = None,
@@ -168,11 +168,10 @@ class Message(val buffer: ByteBuffer,
     // skip crc, we will fill that in at the end
     buffer.position(MagicOffset)
     buffer.put(magicValue)
-    var attributes: Byte = 0
-    if (codec.codec > 0) {
-      attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
-      attributes = TimestampType.setTimestampType(attributes, timestampType)
-    }
+    val attributes: Byte =
+      if (codec.codec > 0)
+        timestampType.updateAttributes((CompressionCodeMask & codec.codec).toByte)
+      else 0
     buffer.put(attributes)
     // Only put timestamp when "magic" value is greater than 0
     if (magic > MagicValue_V0)
@@ -213,7 +212,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * Compute the checksum of the message from the message contents
    */
-  def computeChecksum(): Long = 
+  def computeChecksum: Long =
     CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)
   
   /**
@@ -231,7 +230,7 @@ class Message(val buffer: ByteBuffer,
    */
   def ensureValid() {
     if(!isValid)
-      throw new InvalidMessageException("Message is corrupt (stored crc = " + checksum + ", computed crc = " + computeChecksum() + ")")
+      throw new InvalidMessageException(s"Message is corrupt (stored crc = ${checksum}, computed crc = ${computeChecksum})")
   }
   
   /**
@@ -242,7 +241,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * The position where the key size is stored.
    */
-  def keySizeOffset = {
+  private def keySizeOffset = {
     if (magic == MagicValue_V0) KeySizeOffset_V0
     else KeySizeOffset_V1
   }
@@ -260,7 +259,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * The position where the payload size is stored
    */
-  def payloadSizeOffset = {
+  private def payloadSizeOffset = {
     if (magic == MagicValue_V0) KeyOffset_V0 + max(0, keySize)
     else KeyOffset_V1 + max(0, keySize)
   }
@@ -273,7 +272,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * Is the payload of this message null
    */
-  def isNull(): Boolean = payloadSize < 0
+  def isNull: Boolean = payloadSize < 0
   
   /**
    * The magic version of this message
@@ -309,7 +308,7 @@ class Message(val buffer: ByteBuffer,
     if (magic == MagicValue_V0)
       TimestampType.NO_TIMESTAMP_TYPE
     else
-      wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes))
+      wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))
   }
   
   /**
@@ -337,23 +336,23 @@ class Message(val buffer: ByteBuffer,
     else {
       val byteBuffer = ByteBuffer.allocate(size + Message.headerSizeDiff(magic, toMagicValue))
       // Copy bytes from old messages to new message
-      convertToBuffer(toMagicValue, byteBuffer)
+      convertToBuffer(toMagicValue, byteBuffer, Message.NoTimestamp)
       new Message(byteBuffer)
     }
   }
 
   def convertToBuffer(toMagicValue: Byte,
                       byteBuffer: ByteBuffer,
-                      now: Long = NoTimestamp,
-                      timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes))) {
+                      now: Long,
+                      timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))) {
     if (byteBuffer.remaining() < size + headerSizeDiff(magic, toMagicValue))
       throw new IndexOutOfBoundsException("The byte buffer does not have enough capacity to hold new message format " +
-        "version " + toMagicValue)
+        s"version $toMagicValue")
     if (toMagicValue == Message.MagicValue_V1) {
       // Up-conversion, reserve CRC and update magic byte
       byteBuffer.position(Message.MagicOffset)
       byteBuffer.put(Message.MagicValue_V1)
-      byteBuffer.put(TimestampType.setTimestampType(attributes, timestampType))
+      byteBuffer.put(timestampType.updateAttributes(attributes))
       // Up-conversion, insert the timestamp field
       if (timestampType == TimestampType.LOG_APPEND_TIME)
         byteBuffer.putLong(now)
@@ -364,13 +363,13 @@ class Message(val buffer: ByteBuffer,
       // Down-conversion, reserve CRC and update magic byte
       byteBuffer.position(Message.MagicOffset)
       byteBuffer.put(Message.MagicValue_V0)
-      byteBuffer.put(TimestampType.setTimestampType(attributes, TimestampType.CREATE_TIME))
+      byteBuffer.put(TimestampType.CREATE_TIME.updateAttributes(attributes))
       // Down-conversion, skip the timestamp field
       byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V1, size - Message.KeySizeOffset_V1)
     }
     // update crc value
     val newMessage = new Message(byteBuffer)
-    Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum())
+    Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum)
     byteBuffer.rewind()
   }
 
@@ -382,7 +381,7 @@ class Message(val buffer: ByteBuffer,
     if(size < 0) {
       null
     } else {
-      var b = buffer.duplicate
+      var b = buffer.duplicate()
       b.position(start + 4)
       b = b.slice()
       b.limit(size)
@@ -396,9 +395,9 @@ class Message(val buffer: ByteBuffer,
    */
   private def validateTimestampAndMagicValue(timestamp: Long, magic: Byte) {
     if (magic != MagicValue_V0 && magic != MagicValue_V1)
-      throw new IllegalArgumentException("Invalid magic value " + magic)
+      throw new IllegalArgumentException(s"Invalid magic value $magic")
     if (timestamp < 0 && timestamp != NoTimestamp)
-      throw new IllegalArgumentException("Invalid message timestamp " + timestamp)
+      throw new IllegalArgumentException(s"Invalid message timestamp $timestamp")
     if (magic == MagicValue_V0 && timestamp != NoTimestamp)
       throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
   }

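As background for the `TimestampType.updateAttributes` / `TimestampType.forAttributes` calls used above, a rough standalone sketch of how a timestamp-type flag can be packed into a message's attributes byte next to the compression codec bits. The mask values and bit layout below are illustrative assumptions, not a statement of the exact on-disk format.

// Illustrative only: pack a timestamp-type flag into an attributes byte next to
// the compression codec bits, similar in spirit to TimestampType.updateAttributes
// and TimestampType.forAttributes used in this patch. Masks are assumptions.
object AttributesSketch {
  val CompressionCodecMask = 0x07 // assumed: low bits hold the compression codec
  val TimestampTypeMask = 0x08    // assumed: a single bit holds the timestamp type

  sealed trait TimestampType { def updateAttributes(attributes: Byte): Byte }
  case object CreateTime extends TimestampType {
    def updateAttributes(attributes: Byte): Byte = (attributes & ~TimestampTypeMask).toByte
  }
  case object LogAppendTime extends TimestampType {
    def updateAttributes(attributes: Byte): Byte = (attributes | TimestampTypeMask).toByte
  }

  def forAttributes(attributes: Byte): TimestampType =
    if ((attributes & TimestampTypeMask) != 0) LogAppendTime else CreateTime

  def main(args: Array[String]): Unit = {
    val codec = 2 // made-up codec id
    val attributes = LogAppendTime.updateAttributes((codec & CompressionCodecMask).toByte)
    println(forAttributes(attributes)) // LogAppendTime
  }
}
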
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index 014788a..14c455c 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -37,30 +37,16 @@ object MessageSet {
     messages.foldLeft(0)(_ + entrySize(_))
 
   /**
-   * The size of a list of messages
-   */
-  def messageSetSize(messages: java.util.List[Message]): Int = {
-    var size = 0
-    val iter = messages.iterator
-    while(iter.hasNext) {
-      val message = iter.next
-      size += entrySize(message)
-    }
-    size
-  }
-  
-  /**
    * The size of a size-delimited entry in a message set
    */
   def entrySize(message: Message): Int = LogOverhead + message.size
 
   /**
-   * Validate the "magic" values of messages are the same in a compressed message set and return the magic value of
-   * and the max timestamp of the inner messages.
+   * Validate that all "magic" values in `messages` are the same and return their magic value and max timestamp
    */
   def magicAndLargestTimestamp(messages: Seq[Message]): MagicAndTimestamp = {
     val firstMagicValue = messages.head.magic
-    var largestTimestamp: Long = Message.NoTimestamp
+    var largestTimestamp = Message.NoTimestamp
     for (message <- messages) {
       if (message.magic != firstMagicValue)
         throw new IllegalStateException("Messages in the same message set must have same magic value")
@@ -92,7 +78,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
   /**
    * Check if all the wrapper messages in the message set have the expected magic value
    */
-  def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean
+  def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean
 
   /**
    * Provides an iterator over the message/offset pairs in this set

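A small standalone sketch of the check documented by `magicAndLargestTimestamp` above: every message in a compressed set must share one magic value, and only format v1 messages contribute to the largest timestamp. The `Msg` case class and the sentinel value are hypothetical stand-ins.

// Hypothetical stand-in for MessageSet.magicAndLargestTimestamp: all messages in a
// compressed set must share one magic value; only v1 messages contribute a timestamp.
object MagicAndTimestampSketch {
  case class Msg(magic: Byte, timestamp: Long)

  def magicAndLargestTimestamp(messages: Seq[Msg]): (Byte, Long) = {
    val firstMagic = messages.head.magic
    var largestTimestamp = -1L // stands in for Message.NoTimestamp
    for (message <- messages) {
      if (message.magic != firstMagic)
        throw new IllegalStateException("Messages in the same message set must have the same magic value")
      if (message.magic > 0)
        largestTimestamp = math.max(largestTimestamp, message.timestamp)
    }
    (firstMagic, largestTimestamp)
  }

  def main(args: Array[String]): Unit =
    println(magicAndLargestTimestamp(Seq(Msg(1, 100L), Msg(1, 250L), Msg(1, 170L)))) // (1,250)
}
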
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/MessageWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageWriter.scala b/core/src/main/scala/kafka/message/MessageWriter.scala
index 660772c..e6954ff 100755
--- a/core/src/main/scala/kafka/message/MessageWriter.scala
+++ b/core/src/main/scala/kafka/message/MessageWriter.scala
@@ -40,7 +40,7 @@ class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize)
       if (codec.codec > 0)
         attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
       if (magicValue > MagicValue_V0)
-        attributes = TimestampType.setTimestampType(attributes, timestampType)
+        attributes = timestampType.updateAttributes(attributes)
       write(attributes)
       // Write timestamp
       if (magicValue > MagicValue_V0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 9343fde..4bdd308 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -20,14 +20,13 @@ package kafka.server
 import java.util.Properties
 
 import kafka.api.ApiVersion
-import kafka.common.TopicAndPartition
-import kafka.log.{Log, LogConfig, LogManager}
+import kafka.log.{LogConfig, LogManager}
 import kafka.utils.Logging
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.protocol.ApiKeys
 
-import scala.collection.mutable
 import scala.collection.Map
+import scala.collection.JavaConverters._
 
 /**
  * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
@@ -42,29 +41,27 @@ trait ConfigHandler {
  */
 class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig) extends ConfigHandler with Logging {
 
-  def processConfigChanges(topic : String, topicConfig : Properties) {
-    val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
-    val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case (topicAndPartition, log) => topicAndPartition.topic }
-            .mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => v.map(_._2) }
+  def processConfigChanges(topic: String, topicConfig: Properties) {
     // Validate the compatibility of message format version.
-    Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)) match {
-      case Some(versionString) =>
-        if (!kafkaConfig.interBrokerProtocolVersion.onOrAfter(ApiVersion(versionString))) {
-          topicConfig.remove(LogConfig.MessageFormatVersionProp)
-          warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for $topic because $versionString " +
-            s"is not compatible with Kafka inter broker protocol version ${kafkaConfig.interBrokerProtocolVersion}")
-        }
-      case _ =>
+    val configNameToExclude = Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).flatMap { versionString =>
+      if (kafkaConfig.interBrokerProtocolVersion < ApiVersion(versionString)) {
+        warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for `$topic` because `$versionString` " +
+          s"is not compatible with Kafka inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`")
+        Some(LogConfig.MessageFormatVersionProp)
+      }
+      else None
     }
 
-    if (logsByTopic.contains(topic)) {
+    val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer
+    if (logs.nonEmpty) {
       /* combine the default properties with the overrides in zk to create the new LogConfig */
       val props = new Properties()
       props.putAll(logManager.defaultConfig.originals)
-      props.putAll(topicConfig)
+      topicConfig.asScala.foreach { case (key, value) =>
+        if (!configNameToExclude.exists(_ == key)) props.put(key, value)
+      }
       val logConfig = LogConfig(props)
-      for (log <- logsByTopic(topic))
-        log.config = logConfig
+      logs.foreach(_.config = logConfig)
     }
   }
 }

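For illustration, a simplified sketch of the behaviour implemented in `TopicConfigHandler.processConfigChanges` above: a per-topic message format override that is newer than the inter-broker protocol version is dropped (after a warning in the real code) before the remaining overrides are layered on top of the broker defaults. The property name and the version comparison below are simplified assumptions.

import java.util.Properties
import scala.collection.JavaConverters._

// Simplified sketch: drop an incompatible per-topic override, then merge the
// remaining topic overrides onto the broker defaults (names are illustrative).
object TopicConfigMergeSketch {
  val MessageFormatVersionProp = "message.format.version" // assumed property name

  def mergedConfig(defaults: Properties, topicOverrides: Properties, interBrokerVersion: String): Properties = {
    val excluded = Option(topicOverrides.getProperty(MessageFormatVersionProp))
      .filter(_ > interBrokerVersion) // naive string comparison; the real code compares ApiVersion instances
      .map(_ => MessageFormatVersionProp)

    val props = new Properties()
    props.putAll(defaults)
    topicOverrides.asScala.foreach { case (key, value) =>
      if (!excluded.exists(_ == key)) props.put(key, value)
    }
    props
  }
}
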
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bd02630..2a289b4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -368,12 +368,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           val respHeader = new ResponseHeader(request.header.correlationId)
           val respBody = request.header.apiVersion match {
             case 0 => new ProduceResponse(mergedResponseStatus.asJava)
-            case 1 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, 1)
-            case 2 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, 2)
+            case version @ (1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
             // This case shouldn't happen unless a new version of ProducerRequest is added without
             // updating this part of the code to handle it properly.
-            case _ => throw new IllegalArgumentException("Version %d of ProducerRequest is not handled. Code must be updated."
-              .format(request.header.apiVersion))
+            case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
           }
 
           requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
@@ -424,52 +422,51 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
     }
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty))
+    val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
+      FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty)
+    }
 
     // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
 
-      val convertedResponseStatus =
+      val convertedPartitionData =
         // Need to down-convert message when consumer only takes magic value 0.
         if (fetchRequest.versionId <= 1) {
-          responsePartitionData.map({ case (tp, data) =>
-            tp -> {
-              // We only do down-conversion when:
-              // 1. The message format version configured for the topic is using magic value > 0, and
-              // 2. The message set contains message whose magic > 0
-              // This is to reduce the message format conversion as much as possible. The conversion will only occur
-              // when new message format is used for the topic and we see an old request.
-              // Please notice that if the message format is changed from a higher version back to lower version this
-              // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
-              // without format down conversion.
-              if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
-                  !data.messages.magicValueInAllWrapperMessages(Message.MagicValue_V0)) {
-                trace("Down converting message to V0 for fetch request from " + fetchRequest.clientId)
-                new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
-              } else
-                data
-            }
-          })
-        } else
-          responsePartitionData
+          responsePartitionData.map { case (tp, data) =>
+
+            // We only do down-conversion when:
+            // 1. The message format version configured for the topic uses a magic value > 0, and
+            // 2. The message set contains messages whose magic value is > 0
+            // This is to reduce message format conversion as much as possible. The conversion will only occur
+            // when the new message format is used for the topic and we see an old request.
+            // Please note that if the message format is changed from a higher version back to a lower version, this
+            // check might break because some messages in the new message format could be delivered to consumers
+            // on versions before 0.10.0.0 without format down-conversion.
+            val convertedData = if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
+              !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) {
+              trace(s"Down converting message to V0 for fetch request from ${fetchRequest.clientId}")
+              new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
+            } else data
+
+            tp -> convertedData
+          }
+        } else responsePartitionData
 
-      val mergedResponseStatus = convertedResponseStatus ++ unauthorizedResponseStatus
+      val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData
 
-      mergedResponseStatus.foreach { case (topicAndPartition, data) =>
-        if (data.error != Errors.NONE.code) {
-          debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s"
-            .format(fetchRequest.correlationId, fetchRequest.clientId,
-            topicAndPartition, Errors.forCode(data.error).exceptionName))
-        }
+      mergedPartitionData.foreach { case (topicAndPartition, data) =>
+        if (data.error != Errors.NONE.code)
+          debug(s"Fetch request with correlation id ${fetchRequest.correlationId} from client ${fetchRequest.clientId} " +
+            s"on partition $topicAndPartition failed due to ${Errors.forCode(data.error).exceptionName}")
         // record the bytes out metrics only when the response is being sent
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
         BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
       }
 
       def fetchResponseCallback(delayTimeMs: Int) {
-          trace(s"Sending fetch response to ${fetchRequest.clientId} with ${convertedResponseStatus.values.map(_.messages.sizeInBytes).sum}" +
-            s" bytes")
-        val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)
+        trace(s"Sending fetch response to client ${fetchRequest.clientId} of " +
+          s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} bytes")
+        val response = FetchResponse(fetchRequest.correlationId, mergedPartitionData, fetchRequest.versionId, delayTimeMs)
         requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
       }
 
@@ -482,7 +479,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchResponseCallback(0)
       } else {
         quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId,
-                                                               FetchResponse.responseSize(responsePartitionData.groupBy(_._1.topic),
+                                                               FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic),
                                                                                           fetchRequest.versionId),
                                                                fetchResponseCallback)
       }
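
Finally, a hedged sketch of the down-conversion decision made in the fetch path above: messages are converted to magic value 0 only when the requester is an old client (fetch request version <= 1), the topic is configured with a newer message format, and the fetched data actually contains newer-format messages. The types below are stand-ins for `FetchResponsePartitionData` and friends.

// Illustrative stand-ins; the real handler works with FetchResponsePartitionData,
// FileMessageSet and Message.MagicValue_V0.
object DownConversionSketch {
  case class PartitionData(magicValues: Seq[Byte]) // one (made-up) magic value per message

  def maybeDownConvert(fetchVersion: Int,
                       topicMessageFormatMagic: Option[Byte],
                       data: PartitionData): PartitionData = {
    val oldClient = fetchVersion <= 1
    val topicUsesNewFormat = topicMessageFormatMagic.exists(_ > 0)
    val containsNewFormatMessages = data.magicValues.exists(_ > 0)

    if (oldClient && topicUsesNewFormat && containsNewFormatMessages)
      PartitionData(data.magicValues.map(_ => 0: Byte)) // "convert" every message down to magic 0
    else
      data
  }

  def main(args: Array[String]): Unit = {
    println(maybeDownConvert(1, Some(1: Byte), PartitionData(Seq[Byte](1, 1)))) // down-converted
    println(maybeDownConvert(2, Some(1: Byte), PartitionData(Seq[Byte](1, 1)))) // returned as-is
  }
}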