Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/19 16:56:49 UTC

[2/5] kafka git commit: KAFKA-3025; Added timestamp to Message and use relative offset.

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2c6311c..ae810eb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -94,9 +94,12 @@ object Defaults {
   val LogFlushSchedulerIntervalMs = Long.MaxValue
   val LogFlushOffsetCheckpointIntervalMs = 60000
   val LogPreAllocateEnable = false
+  val MessageFormatVersion = ApiVersion.latestVersion.toString()
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
+  val MessageTimestampType = "CreateTime"
+  val MessageTimestampDifferenceMaxMs = Long.MaxValue
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -251,9 +254,12 @@ object KafkaConfig {
   val LogFlushIntervalMsProp = "log.flush.interval.ms"
   val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
   val LogPreAllocateProp = "log.preallocate"
+  val MessageFormatVersionProp = "message.format.version"
   val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
   val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
+  val MessageTimestampTypeProp = "message.timestamp.type"
+  val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms"
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
   val DefaultReplicationFactorProp = "default.replication.factor"
@@ -417,6 +423,14 @@ object KafkaConfig {
   val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
   val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
   val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with acks=all (or -1)"
+  val MessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid ApiVersion. " +
+  "Some examples are: 0.8.2, 0.9.0.0, 0.10.0. Check ApiVersion for details. When setting the message format version, " +
+  "the user certifies that all the existing messages on disk are at or below that version. Otherwise consumers before 0.10.0.0 will break."
+  val MessageTimestampTypeDoc = "Define whether the timestamp in the message is message create time or log append time. The value should be either" +
+  " \"CreateTime\" or \"LogAppendTime\""
+  val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
+  "a message and the timestamp specified in the message. If message.timestamp.type=CreateTime, a message will be rejected " +
+  "if the difference in timestamp exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
   val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
@@ -556,7 +570,7 @@ object KafkaConfig {
       .define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc)
       .define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc)
       .define(LogDirsProp, STRING, null, HIGH, LogDirsDoc)
-      .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Message.MinHeaderSize), HIGH, LogSegmentBytesDoc)
+      .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Message.MinMessageOverhead), HIGH, LogSegmentBytesDoc)
 
       .define(LogRollTimeMillisProp, LONG, null, HIGH, LogRollTimeMillisDoc)
       .define(LogRollTimeHoursProp, INT, Defaults.LogRollHours, atLeast(1), HIGH, LogRollTimeHoursDoc)
@@ -591,6 +605,9 @@ object KafkaConfig {
       .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
       .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
+      .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc)
+      .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc)
+      .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc)
 
       /** ********* Replication configuration ***********/
       .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
@@ -781,6 +798,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val logRetentionTimeMillis = getLogRetentionTimeMillis
   val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
   val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
+  val messageFormatVersion = getString(KafkaConfig.MessageFormatVersionProp)
+  val messageTimestampType = getString(KafkaConfig.MessageTimestampTypeProp)
+  val messageTimestampDifferenceMaxMs = getLong(KafkaConfig.MessageTimestampDifferenceMaxMsProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -958,6 +978,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
       s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " +
       s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}"
     )
+    require(interBrokerProtocolVersion.onOrAfter(ApiVersion(messageFormatVersion)),
+      s"message.format.version $messageFormatVersion cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersion")
   }
-
 }
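
The three broker-level properties added above can be supplied like any other KafkaConfig
entry. A minimal sketch, assuming the property names defined in this patch (the
zookeeper.connect value and version string are illustrative only):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")                    // required broker setting
    props.put(KafkaConfig.MessageFormatVersionProp, "0.10.0")           // message.format.version
    props.put(KafkaConfig.MessageTimestampTypeProp, "LogAppendTime")    // message.timestamp.type
    props.put(KafkaConfig.MessageTimestampDifferenceMaxMsProp, "3600000")
    // validateValues() now also rejects a message.format.version newer than
    // inter.broker.protocol.version, per the require() added above.
    val config = KafkaConfig.fromProps(props)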

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 41719e2..e3e185f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -21,7 +21,7 @@ import java.net.{SocketTimeoutException}
 import java.util
 
 import kafka.admin._
-import kafka.api.KAFKA_090
+import kafka.api.KAFKA_0_9_0
 import kafka.log.LogConfig
 import kafka.log.CleanerConfig
 import kafka.log.LogManager
@@ -75,6 +75,9 @@ object KafkaServer {
     logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
     logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
     logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
+    logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.messageFormatVersion)
+    logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.messageTimestampType)
+    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.messageTimestampDifferenceMaxMs)
     logProps
   }
 }
@@ -197,7 +200,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaController.startup()
 
         /* start kafka coordinator */
-        consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
+        consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager, kafkaMetricsTime)
         consumerCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/
@@ -216,7 +219,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         Mx4jLoader.maybeLoad()
 
         /* start dynamic config manager */
-        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
+        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
                                                            ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
 
         // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
@@ -512,7 +515,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
       val shutdownSucceeded =
         // Before 0.9.0.0, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in
         // `RequestHeader`, which is used by `NetworkClient`
-        if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_090))
+        if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0))
           networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
         else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c5f3360..2fdb46c 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -23,7 +23,7 @@ import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.KAFKA_090
+import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_9_0}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
 import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse}
@@ -55,7 +55,10 @@ class ReplicaFetcherThread(name: String,
   type REQ = FetchRequest
   type PD = PartitionData
 
-  private val fetchRequestVersion: Short = if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_090)) 1 else 0
+  private val fetchRequestVersion: Short =
+    if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_0_10_0_IV0)) 2
+    else if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0)) 1
+    else 0
   private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
   private val replicaId = brokerConfig.brokerId
   private val maxWait = brokerConfig.replicaFetchWaitMaxMs

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 61b6887..16b8c3a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -26,11 +26,12 @@ import kafka.cluster.{Partition, Replica}
 import kafka.common._
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogManager}
-import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
-InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException}
+InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException,
+InvalidTimestampException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -332,7 +333,7 @@ class ReplicaManager(val config: KafkaConfig,
         topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  new PartitionResponse(result.errorCode, result.info.firstOffset)) // response status
+                  new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
       }
 
       if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
@@ -358,9 +359,9 @@ class ReplicaManager(val config: KafkaConfig,
       // Just return an error and don't handle the request at all
       val responseStatus = messagesPerPartition.map {
         case (topicAndPartition, messageSet) =>
-          (topicAndPartition ->
-                  new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
-                    LogAppendInfo.UnknownLogAppendInfo.firstOffset))
+          (topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+                                                      LogAppendInfo.UnknownLogAppendInfo.firstOffset,
+                                                      Message.NoTimestamp))
       }
       responseCallback(responseStatus)
     }
@@ -440,6 +441,10 @@ class ReplicaManager(val config: KafkaConfig,
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
           case imse: CorruptRecordException =>
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
+          case ime: InvalidMessageException =>
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(ime)))
+          case itse: InvalidTimestampException =>
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(itse)))
           case t: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -568,6 +573,10 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] = {
+    getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap(_.log.map(_.config.messageFormatVersion))
+  }
+
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
     replicaStateChangeLock synchronized {
       if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
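
For reference, a brief usage sketch of the getMessageFormatVersion helper added above;
the replicaManager reference and topic name are illustrative:

    import kafka.common.TopicAndPartition

    // Message format version (a Byte) from the partition's log config, or None if the
    // replica (or its log) is not hosted on this broker.
    val formatVersion: Option[Byte] =
      replicaManager.getMessageFormatVersion(TopicAndPartition("my-topic", 0))
    formatVersion.foreach(v => println(s"partition uses message format version $v"))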

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 73743aa..fe2ce9f 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -28,6 +28,7 @@ import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
 
@@ -124,7 +125,7 @@ object ConsoleConsumer extends Logging {
       }
       messageCount += 1
       try {
-        formatter.writeTo(msg.key, msg.value, System.out)
+        formatter.writeTo(msg.key, msg.value, msg.timestamp, msg.timestampType, System.out)
       } catch {
         case e: Throwable =>
           if (skipMessageOnError) {
@@ -335,7 +336,7 @@ object ConsoleConsumer extends Logging {
 }
 
 trait MessageFormatter{
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
+  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream)
 
   def init(props: Properties) {}
 
@@ -356,12 +357,16 @@ class DefaultMessageFormatter extends MessageFormatter {
       lineSeparator = props.getProperty("line.separator").getBytes
   }
 
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+    if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
+      output.write(s"$timestampType:$timestamp".getBytes)
+      output.write(keySeparator)
+    }
     if (printKey) {
-      output.write(if (key == null) "null".getBytes() else key)
+      output.write(if (key == null) "null".getBytes else key)
       output.write(keySeparator)
     }
-    output.write(if (value == null) "null".getBytes() else value)
+    output.write(if (value == null) "null".getBytes else value)
     output.write(lineSeparator)
   }
 }
@@ -370,17 +375,19 @@ class LoggingMessageFormatter extends MessageFormatter   {
   private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
   val logger = Logger.getLogger(getClass().getName)
 
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream): Unit = {
-    defaultWriter.writeTo(key, value, output)
+  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream): Unit = {
+    defaultWriter.writeTo(key, value, timestamp, timestampType, output)
     if(logger.isInfoEnabled)
-      logger.info(s"key:${if (key == null) "null" else new String(key)}, value:${if (value == null) "null" else new String(value)}")
+      logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
+                  s"key:${if (key == null) "null" else new String(key)}, " +
+                  s"value:${if (value == null) "null" else new String(value)}")
   }
 }
 
 class NoOpMessageFormatter extends MessageFormatter {
   override def init(props: Properties) {}
 
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
+  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
@@ -394,8 +401,12 @@ class ChecksumMessageFormatter extends MessageFormatter {
       topicStr = ""
   }
 
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
-    val chksum = new Message(value, key).checksum
+  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
+    val chksum =
+      if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
+        new Message(value, key, timestamp, timestampType, NoCompressionCodec, 0, -1, Message.MagicValue_V1).checksum
+      else
+        new Message(value, key, Message.NoTimestamp, Message.MagicValue_V0).checksum
     output.println(topicStr + "checksum:" + chksum)
   }
 }
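
MessageFormatter implementations now receive the record's timestamp and timestamp type.
A minimal sketch of a custom formatter against the new writeTo signature (the class
name is illustrative):

    import java.io.PrintStream
    import org.apache.kafka.common.record.TimestampType

    class TimestampOnlyFormatter extends MessageFormatter {
      def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long,
                  timestampType: TimestampType, output: PrintStream) {
        // Emit only the timestamp information, one record per line.
        if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
          output.println(s"$timestampType:$timestamp")
        else
          output.println("NoTimestamp")
      }
    }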

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fd15014..3c41c7c 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -274,7 +274,7 @@ object DumpLogSegments {
         case NoCompressionCodec =>
           getSingleMessageIterator(messageAndOffset)
         case _ =>
-          ByteBufferMessageSet.deepIterator(message)
+          ByteBufferMessageSet.deepIterator(messageAndOffset)
       }
     } else
       getSingleMessageIterator(messageAndOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index a964f69..95b0aad 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -485,7 +485,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     override def receive() : BaseConsumerRecord = {
       val messageAndMetadata = iter.next()
-      BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
+      BaseConsumerRecord(messageAndMetadata.topic,
+                         messageAndMetadata.partition,
+                         messageAndMetadata.offset,
+                         messageAndMetadata.timestamp,
+                         messageAndMetadata.timestampType,
+                         messageAndMetadata.key,
+                         messageAndMetadata.message)
     }
 
     override def stop() {
@@ -541,7 +547,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
       offsets.put(tp, record.offset + 1)
 
-      BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
+      BaseConsumerRecord(record.topic,
+                         record.partition,
+                         record.offset,
+                         record.timestamp,
+                         record.timestampType,
+                         record.key,
+                         record.value)
     }
 
     override def stop() {
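
BaseConsumerRecord now carries the source record's timestamp and timestamp type, so a
mirroring path can preserve create-time timestamps end to end. A minimal conversion
sketch, independent of MirrorMaker's handler plumbing (which is not shown in this hunk):

    import org.apache.kafka.clients.producer.ProducerRecord

    def toProducerRecord(record: BaseConsumerRecord): ProducerRecord[Array[Byte], Array[Byte]] =
      // Keep the source timestamp; leave partition assignment to the target cluster's partitioner.
      new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, record.timestamp, record.key, record.value)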

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 2b8537b..d88ec41 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -139,8 +139,8 @@ object ReplayLogProducer extends Logging {
             stream
         for (messageAndMetadata <- iter) {
           try {
-            val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic,
-                                            messageAndMetadata.key(), messageAndMetadata.message()))
+            val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic, null,
+                                            messageAndMetadata.timestamp, messageAndMetadata.key(), messageAndMetadata.message()))
             if(config.isSync) {
               response.get()
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 1c2023c..e20b061 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -220,7 +220,8 @@ object SimpleConsumerShell extends Logging {
                   System.out.println("next offset = " + offset)
                 val message = messageAndOffset.message
                 val key = if(message.hasKey) Utils.readBytes(message.key) else null
-                formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out)
+                val value = if (message.isNull()) null else Utils.readBytes(message.payload)
+                formatter.writeTo(key, value, message.timestamp, message.timestampType, System.out)
                 numMessagesConsumed += 1
               } catch {
                 case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index bc3a6ce..f15c005 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -14,8 +14,10 @@ package kafka.api
 
 import java.util
 
+import kafka.coordinator.GroupCoordinator
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 
@@ -24,11 +26,10 @@ import kafka.server.KafkaConfig
 
 import java.util.ArrayList
 import org.junit.Assert._
-import org.junit.{Test, Before}
+import org.junit.{Before, Test}
 
-import scala.collection.mutable.Buffer
 import scala.collection.JavaConverters._
-import kafka.coordinator.GroupCoordinator
+import scala.collection.mutable.Buffer
 
 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
@@ -75,7 +76,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(1, this.consumers(0).assignment.size)
 
     this.consumers(0).seek(tp, 0)
-    consumeAndVerifyRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, startingOffset = 0)
 
     // check async commit callbacks
     val commitCallback = new CountConsumerCommitCallback()
@@ -245,7 +246,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
     sendRecords(5)
     consumer0.subscribe(List(topic).asJava)
-    consumeAndVerifyRecords(consumer0, 5, 0)
+    consumeAndVerifyRecords(consumer = consumer0, numRecords = 5, startingOffset = 0)
     consumer0.pause(tp)
 
     // subscribe to a new topic to trigger a rebalance
@@ -253,7 +254,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
     // after rebalance, our position should be reset and our pause state lost,
     // so we should be able to consume from the beginning
-    consumeAndVerifyRecords(consumer0, 0, 5)
+    consumeAndVerifyRecords(consumer = consumer0, numRecords = 0, startingOffset = 5)
   }
 
   protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
@@ -276,26 +277,33 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   protected def sendRecords(numRecords: Int, tp: TopicPartition) {
-    sendRecords(this.producers(0), numRecords, tp)
-  }
-
-  protected def sendRecords(producer: Producer[Array[Byte], Array[Byte]],
-                            numRecords: Int,
-                            tp: TopicPartition) {
     (0 until numRecords).foreach { i =>
-      producer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
+      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes))
     }
-    producer.flush()
+    this.producers(0).flush()
   }
 
-  protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
-                                      startingKeyAndValueIndex: Int = 0, tp: TopicPartition = tp) {
+  protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]],
+                                        numRecords: Int,
+                                        startingOffset: Int,
+                                        startingKeyAndValueIndex: Int = 0,
+                                        startingTimestamp: Long = 0L,
+                                        timestampType: TimestampType = TimestampType.CREATE_TIME,
+                                        tp: TopicPartition = tp) {
     val records = consumeRecords(consumer, numRecords)
+    val now = System.currentTimeMillis()
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
       assertEquals(tp.topic(), record.topic())
       assertEquals(tp.partition(), record.partition())
+      if (timestampType == TimestampType.CREATE_TIME) {
+        assertEquals(timestampType, record.timestampType())
+        val timestamp = startingTimestamp + i
+        assertEquals(timestamp.toLong, record.timestamp())
+      } else
+        assertTrue(s"Got unexpected timestamp ${record.timestamp()}. Timestamp should be between [$startingTimestamp, $now]",
+          record.timestamp() >= startingTimestamp && record.timestamp() <= now)
       assertEquals(offset.toLong, record.offset())
       val keyAndValueIndex = startingKeyAndValueIndex + i
       assertEquals(s"key $keyAndValueIndex", new String(record.key()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 42928a3..807b8bb 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -17,19 +17,21 @@
 
 package kafka.api
 
-import java.io.File
 import java.util.Properties
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutionException, TimeUnit}
 
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig
 import kafka.message.Message
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException}
+import org.apache.kafka.common.record.TimestampType
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+
 import scala.collection.mutable.Buffer
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@@ -54,8 +56,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     super.setUp()
 
     // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
+    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024 * 1024, "")
+    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024 * 1024, "")
   }
 
   @After
@@ -88,6 +90,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     object callback extends Callback {
       var offset = 0L
+
       def onCompletion(metadata: RecordMetadata, exception: Exception) {
         if (exception == null) {
           assertEquals(offset, metadata.offset())
@@ -105,24 +108,24 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       // send a normal record
-      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
 
       // send a record with null value should be ok
-      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null)
+      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes, null)
       assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
 
       // send a record with null key should be ok
-      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes)
+      val record2 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
       assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
 
       // send a record with null part id should be ok
-      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+      val record3 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
 
       // send a record with null topic should fail
       try {
-        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
+        val record4 = new ProducerRecord[Array[Byte], Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
         producer.send(record4, callback)
         fail("Should not allow sending a record without topic")
       } catch {
@@ -143,6 +146,81 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   }
 
   @Test
+  def testSendCompressedMessageWithCreateTime() {
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+    sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
+  }
+
+  @Test
+  def testSendNonCompressedMessageWithCreateTime() {
+    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+    sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
+  }
+
+  @Test
+  def testSendCompressedMessageWithLogAppendTime() {
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+  }
+
+  @Test
+  def testSendNonCompressedMessageWithLogAppendTime() {
+    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+  }
+
+  private def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
+    val partition = new Integer(0)
+
+    val baseTimestamp = 123456L
+    val startTime = System.currentTimeMillis()
+
+    object callback extends Callback {
+      var offset = 0L
+      var timestampDiff = 1L
+
+      def onCompletion(metadata: RecordMetadata, exception: Exception) {
+        if (exception == null) {
+          assertEquals(offset, metadata.offset())
+          assertEquals(topic, metadata.topic())
+          if (timestampType == TimestampType.CREATE_TIME)
+            assertEquals(baseTimestamp + timestampDiff, metadata.timestamp())
+          else
+            assertTrue(metadata.timestamp() >= startTime && metadata.timestamp() <= System.currentTimeMillis())
+          assertEquals(partition, metadata.partition())
+          offset += 1
+          timestampDiff += 1
+        } else {
+          fail("Send callback returns the following exception", exception)
+        }
+      }
+    }
+
+    try {
+      // create topic
+      val topicProps = new Properties()
+      if (timestampType == TimestampType.LOG_APPEND_TIME)
+        topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
+      else
+        topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
+
+      for (i <- 1 to numRecords) {
+        val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes)
+        producer.send(record, callback)
+      }
+      producer.close(5000L, TimeUnit.MILLISECONDS)
+      assertEquals(s"Should have offset $numRecords but only successfully sent ${callback.offset}", numRecords, callback.offset)
+    } finally {
+      producer.close()
+    }
+  }
+
+  @Test
   def testWrongSerializer() {
     // send a record with a wrong type should receive a serialization exception
     try {
@@ -155,7 +233,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     }
   }
 
-  private def createProducerWithWrongSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+  private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
@@ -176,7 +254,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       // non-blocking send a list of records
-      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       for (i <- 1 to numRecords)
         producer.send(record0)
       val response0 = producer.send(record0)
@@ -212,9 +290,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       val leader1 = leaders(partition)
       assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
 
+      val now = System.currentTimeMillis()
       val responses =
         for (i <- 1 to numRecords)
-          yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
+        yield producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, now, null, ("value" + i).getBytes))
       val futures = responses.toList
       futures.foreach(_.get)
       for (future <- futures)
@@ -228,7 +307,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       }
 
       // make sure the fetched messages also respect the partitioning and ordering
-      val fetchResponse1 = if(leader1.get == configs(0).brokerId) {
+      val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
         consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
       } else {
         consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
@@ -238,7 +317,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
       // TODO: also check topic and partition after they are added in the return messageSet
       for (i <- 0 to numRecords - 1) {
-        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
+        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes, now, Message.MagicValue_V1), messageSet1(i).message)
         assertEquals(i.toLong, messageSet1(i).offset)
       }
     } finally {
@@ -257,7 +336,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // Send a message to auto-create the topic
-      val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected
@@ -277,7 +356,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     try {
       TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
-      for(i <- 0 until 50) {
+      for (i <- 0 until 50) {
         val responses = (0 until numRecords) map (i => producer.send(record))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         producer.flush()
@@ -302,7 +381,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
 
     // Test closing from caller thread.
-    for(i <- 0 until 50) {
+    for (i <- 0 until 50) {
       val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
       val responses = (0 until numRecords) map (i => producer.send(record0))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
@@ -349,23 +428,58 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
       }
     }
-    for(i <- 0 until 50) {
+    for (i <- 0 until 50) {
       val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
-      // send message to partition 0
-      val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
-      assertTrue("No request is complete.", responses.forall(!_.isDone()))
-      // flush the messages.
-      producer.flush()
-      assertTrue("All request are complete.", responses.forall(_.isDone()))
-      // Check the messages received by broker.
-      val fetchResponse = if (leader.get == configs(0).brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-      } else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+      try {
+        // send message to partition 0
+        val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        // flush the messages.
+        producer.flush()
+        assertTrue("All request are complete.", responses.forall(_.isDone()))
+        // Check the messages received by broker.
+        val fetchResponse = if (leader.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        }
+        val expectedNumRecords = (i + 1) * numRecords
+        assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
+          expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
+      } finally {
+        producer.close()
       }
-      val expectedNumRecords = (i + 1) * numRecords
-      assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
-        expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
     }
   }
+
+  @Test
+  def testSendWithInvalidCreateTime() {
+    val topicProps = new Properties()
+    topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
+    TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
+
+    val producer = createProducer(brokerList = brokerList)
+    try {
+      producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+      fail("Should throw CorruptedRecordException")
+    } catch {
+      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+    } finally {
+      producer.close()
+    }
+
+    // Test compressed messages.
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+    val compressedProducer = createProducer(brokerList = brokerList, props = Some(producerProps))
+    try {
+      compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+      fail("Should throw CorruptedRecordException")
+    } catch {
+      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+    } finally {
+      compressedProducer.close()
+    }
+  }
+
 }
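
The tests above exercise the new behaviour through topic-level overrides. A minimal
sketch of the topic properties involved, using the same LogConfig names (topic name and
values are illustrative); note that the difference cap only applies under CreateTime and
is ignored for LogAppendTime:

    import java.util.Properties
    import kafka.log.LogConfig

    val topicProps = new Properties()
    // Reject client-supplied timestamps more than 10 minutes away from broker time.
    topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
    topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "600000")
    // e.g. TestUtils.createTopic(zkUtils, "timestamped-topic", 1, 2, servers, topicProps)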

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b2f96e5..3d7cad3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -18,20 +18,23 @@ import java.util.Properties
 
 import java.util.regex.Pattern
 
+import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer, ByteArraySerializer}
+import org.apache.kafka.test.{MockProducerInterceptor, MockConsumerInterceptor}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer, ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
-import org.apache.kafka.test.{MockProducerInterceptor, MockConsumerInterceptor}
+import org.apache.kafka.common.record.{CompressionType, TimestampType}
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.Assert._
 import org.junit.Test
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
-import scala.collection.JavaConverters
-import JavaConverters._
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
@@ -96,14 +99,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testAutoOffsetReset() {
     sendRecords(1)
     this.consumers(0).assign(List(tp).asJava)
-    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset = 0)
   }
 
   @Test
   def testGroupConsumption() {
     sendRecords(10)
     this.consumers(0).subscribe(List(topic).asJava)
-    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset = 0)
   }
 
   @Test
@@ -263,7 +266,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertFalse(partitions.isEmpty)
   }
 
-  @Test(expected=classOf[InvalidTopicException])
+  @Test(expected = classOf[InvalidTopicException])
   def testPartitionsForInvalidTopic() {
     this.consumers(0).partitionsFor(";3# ads,{234")
   }
@@ -288,7 +291,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer.seek(tp, mid)
     assertEquals(mid, consumer.position(tp))
-    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
+
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
+      startingTimestamp = mid.toLong)
 
     // Test seek compressed message
     sendCompressedMessages(totalRecords.toInt, tp2)
@@ -305,7 +310,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.seek(tp2, mid)
     assertEquals(mid, consumer.position(tp2))
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
-      tp = tp2)
+      startingTimestamp = mid.toLong, tp = tp2)
   }
 
   private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) {
@@ -314,17 +319,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString)
     val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
         retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
-    sendRecords(producer, numRecords, tp)
+    (0 until numRecords).foreach { i =>
+      producer.send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes))
+    }
     producer.close()
   }
 
+  @Test
   def testPositionAndCommit() {
     sendRecords(5)
 
-    // committed() on a partition with no committed offset throws an exception
-    intercept[NoOffsetForPartitionException] {
-      this.consumers(0).committed(new TopicPartition(topic, 15))
-    }
+    assertNull(this.consumers(0).committed(new TopicPartition(topic, 15)))
 
     // position() on a partition that we aren't subscribed to throws an exception
     intercept[IllegalArgumentException] {
@@ -337,7 +342,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     this.consumers(0).commitSync()
     assertEquals(0L, this.consumers(0).committed(tp).offset)
 
-    consumeAndVerifyRecords(this.consumers(0), 5, 0)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
     assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
     this.consumers(0).commitSync()
     assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
@@ -346,19 +351,19 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // another consumer in the same group should get the same position
     this.consumers(1).assign(List(tp).asJava)
-    consumeAndVerifyRecords(this.consumers(1), 1, 5)
+    consumeAndVerifyRecords(consumer = this.consumers(1), numRecords = 1, startingOffset = 5)
   }
 
   @Test
   def testPartitionPauseAndResume() {
     sendRecords(5)
     this.consumers(0).assign(List(tp).asJava)
-    consumeAndVerifyRecords(this.consumers(0), 5, 0)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
     this.consumers(0).pause(tp)
     sendRecords(5)
     assertTrue(this.consumers(0).poll(0).isEmpty)
     this.consumers(0).resume(tp)
-    consumeAndVerifyRecords(this.consumers(0), 5, 5)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 5)
   }
 
   @Test
@@ -397,7 +402,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
 
     // produce a record that is larger than the configured fetch size
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
     this.producers(0).send(record)
 
     // consuming a too-large record should fail
@@ -534,7 +539,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
     producerProps.put("mock.interceptor.append", appendStr)
-    val testProducer = new KafkaProducer[String,String](producerProps, new StringSerializer, new StringSerializer)
+    val testProducer = new KafkaProducer[String, String](producerProps, new StringSerializer, new StringSerializer)
 
     // produce records
     val numRecords = 10
@@ -567,16 +572,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // commit sync and verify onCommit is called
     val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
-    testConsumer.commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava)
+    testConsumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava)
     assertEquals(2, testConsumer.committed(tp).offset)
-    assertEquals(commitCountBefore+1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+    assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
 
     // commit async and verify onCommit is called
     val commitCallback = new CountConsumerCommitCallback()
-    testConsumer.commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback)
+    testConsumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback)
     awaitCommitCallback(testConsumer, commitCallback)
     assertEquals(5, testConsumer.committed(tp).offset)
-    assertEquals(commitCountBefore+2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+    assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
 
     testConsumer.close()
     testProducer.close()
@@ -593,7 +598,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // produce records
     val numRecords = 100
-    val testProducer = new KafkaProducer[String,String](this.producerConfig, new StringSerializer, new StringSerializer)
+    val testProducer = new KafkaProducer[String, String](this.producerConfig, new StringSerializer, new StringSerializer)
     (0 until numRecords).map { i =>
       testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i"))
     }.foreach(_.get)
@@ -617,8 +622,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // change subscription to trigger rebalance
     val commitCountBeforeRebalance = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
     changeConsumerSubscriptionAndValidateAssignment(testConsumer,
-                                                    List(topic, topic2), Set(tp, tp2, new TopicPartition(topic2, 0),
-                                                    new TopicPartition(topic2, 1)),
+                                                    List(topic, topic2),
+                                                    Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)),
                                                     rebalanceListener)
 
     // after rebalancing, we should have reset to the committed positions
@@ -644,14 +649,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
     producerProps.put("mock.interceptor.append", appendStr)
-    val testProducer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
+    val testProducer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
 
     // producing records should succeed
     testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes))
 
     // create consumer with interceptor that has different key and value types from the consumer
     this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
-    val testConsumer = new KafkaConsumer[Array[Byte],Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    val testConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
     testConsumer.assign(List(tp).asJava)
     testConsumer.seek(tp, 0)
 
@@ -664,6 +669,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     testProducer.close()
   }
 
+  @Test
+  def testConsumeMessagesWithCreateTime() {
+    val numRecords = 50
+    // Test non-compressed messages
+    sendRecords(numRecords, tp)
+    this.consumers(0).assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, startingOffset = 0, startingKeyAndValueIndex = 0,
+      startingTimestamp = 0)
+
+    // Test compressed messages
+    sendCompressedMessages(numRecords, tp2)
+    this.consumers(0).assign(List(tp2).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+      startingTimestamp = 0)
+  }
+
+  @Test
+  def testConsumeMessagesWithLogAppendTime() {
+    val topicName = "testConsumeMessagesWithLogAppendTime"
+    val topicProps = new Properties()
+    topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
+    TestUtils.createTopic(zkUtils, topicName, 2, 2, servers, topicProps)
+
+    val startTime = System.currentTimeMillis()
+    val numRecords = 50
+
+    // Test non-compressed messages
+    val tp1 = new TopicPartition(topicName, 0)
+    sendRecords(numRecords, tp1)
+    this.consumers(0).assign(List(tp1).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp1, startingOffset = 0, startingKeyAndValueIndex = 0,
+      startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
+
+    // Test compressed messages
+    val tp2 = new TopicPartition(topicName, 1)
+    sendCompressedMessages(numRecords, tp2)
+    this.consumers(0).assign(List(tp2).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+      startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
+  }
+
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // use consumers defined in this class plus one additional consumer
     // Use topic defined in this class + one additional topic
@@ -693,7 +738,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong
     validateGroupAssignment(consumerPollers, subscriptions,
-      s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3*maxSessionTimeout)
+      s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3 * maxSessionTimeout)
 
     // done with pollers and consumers
     for (poller <- consumerPollers)
@@ -810,7 +855,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // wait until topics get re-assigned and validate assignment
     validateGroupAssignment(consumerPollers, subscriptions,
-        s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added ${numOfConsumersToAdd} consumer(s)")
+      s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added ${numOfConsumersToAdd} consumer(s)")
   }
 
   /**
@@ -844,7 +889,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}", 1000L)
 
     validateGroupAssignment(consumerPollers, subscriptions,
-        s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription")
+      s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription")
   }
 
   def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 0d401f7..c4a2bd7 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -87,8 +87,9 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
         yield ("value" + i).getBytes
 
       // make sure the returned messages are correct
+      val now = System.currentTimeMillis()
       val responses = for (message <- messages)
-        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, null, message))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, now, null, message))
       val futures = responses.toList
       for ((future, offset) <- futures zip (0 until numRecords)) {
         assertEquals(offset.toLong, future.get.offset)
@@ -101,7 +102,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
 
       var index = 0
       for (message <- messages) {
-        assertEquals(new Message(bytes = message), messageSet(index).message)
+        assertEquals(new Message(bytes = message, now, Message.MagicValue_V1), messageSet(index).message)
         assertEquals(index.toLong, messageSet(index).offset)
         index += 1
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index e4b8854..fafc4b0 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -287,4 +287,5 @@ class RequestResponseSerializationTest extends JUnitSuite {
     // new response should have 4 bytes more than the old response since delayTime is an INT32
     assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 7e6e765..587abd5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -17,12 +17,14 @@
 
 package kafka.coordinator
 
+import org.apache.kafka.common.record.Record
 import org.junit.Assert._
 
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
-import kafka.message.MessageSet
+import kafka.message.{Message, MessageSet}
 import kafka.server.{ReplicaManager, KafkaConfig}
 import kafka.utils._
+import org.apache.kafka.common.utils.SystemTime
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
@@ -87,7 +89,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
-    groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager)
+    groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
     groupCoordinator.startup()
 
     // add the partition into the owned partition list
@@ -833,9 +835,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
-          new PartitionResponse(Errors.NONE.code, 0L)
+          new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
@@ -909,9 +912,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
-          new PartitionResponse(Errors.NONE.code, 0L)
+          new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
@@ -922,6 +926,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
     EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index a8092de..69218ba 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -261,7 +261,8 @@ class CleanerTest extends JUnitSuite {
       log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
     
     // forward offset and append message to next segment at offset Int.MaxValue
-    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(Int.MaxValue-1), new Message("hello".getBytes, "hello".getBytes))
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(Int.MaxValue-1),
+      new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1))
     log.append(messageSet, assignOffsets = false)
     log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
     assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
@@ -448,13 +449,19 @@ class CleanerTest extends JUnitSuite {
   def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
   
   def message(key: Int, value: Int) = 
-    new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
+                                         bytes = value.toString.getBytes,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
 
   def unkeyedMessage(value: Int) =
     new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
 
   def deleteMessage(key: Int) =
-    new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=null))
+    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
+                                         bytes = null,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
   
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 95085f4..0179166 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -200,4 +200,72 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
     assertEquals(oldposition, tempReopen.length)
   }
 
+  @Test
+  def testMessageFormatConversion() {
+
+    // Prepare messages.
+    val offsets = Seq(0L, 2L)
+    val messagesV0 = Seq(new Message("hello".getBytes, "k1".getBytes, Message.NoTimestamp, Message.MagicValue_V0),
+      new Message("goodbye".getBytes, "k2".getBytes, Message.NoTimestamp, Message.MagicValue_V0))
+    val messageSetV0 = new ByteBufferMessageSet(
+      compressionCodec = NoCompressionCodec,
+      offsetSeq = offsets,
+      messages = messagesV0:_*)
+    val compressedMessageSetV0 = new ByteBufferMessageSet(
+      compressionCodec = DefaultCompressionCodec,
+      offsetSeq = offsets,
+      messages = messagesV0:_*)
+
+    val messagesV1 = Seq(new Message("hello".getBytes, "k1".getBytes, 1L, Message.MagicValue_V1),
+                         new Message("goodbye".getBytes, "k2".getBytes, 2L, Message.MagicValue_V1))
+    val messageSetV1 = new ByteBufferMessageSet(
+      compressionCodec = NoCompressionCodec,
+      offsetSeq = offsets,
+      messages = messagesV1:_*)
+    val compressedMessageSetV1 = new ByteBufferMessageSet(
+      compressionCodec = DefaultCompressionCodec,
+      offsetSeq = offsets,
+      messages = messagesV1:_*)
+
+    // Down conversion
+    // down conversion for non-compressed messages
+    var fileMessageSet = new FileMessageSet(tempFile())
+    fileMessageSet.append(messageSetV1)
+    fileMessageSet.flush()
+    var convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V0)
+    verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V0)
+
+    // down conversion for compressed messages
+    fileMessageSet = new FileMessageSet(tempFile())
+    fileMessageSet.append(compressedMessageSetV1)
+    fileMessageSet.flush()
+    convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V0)
+    verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V0)
+
+    // Up conversion. In reality we only do down conversion, but up conversion should work as well.
+    // up conversion for non-compressed messages
+    fileMessageSet = new FileMessageSet(tempFile())
+    fileMessageSet.append(messageSetV0)
+    fileMessageSet.flush()
+    convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1)
+    verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1)
+
+    // up conversion for compressed messages
+    fileMessageSet = new FileMessageSet(tempFile())
+    fileMessageSet.append(compressedMessageSetV0)
+    fileMessageSet.flush()
+    convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1)
+    verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1)
+
+    def verifyConvertedMessageSet(convertedMessageSet: MessageSet, magicByte: Byte) {
+      var i = 0
+      for (messageAndOffset <- convertedMessageSet) {
+        assertEquals("magic byte should be 1", magicByte, messageAndOffset.message.magic)
+        assertEquals("offset should not change", offsets(i), messageAndOffset.offset)
+        assertEquals("key should not change", messagesV0(i).key, messageAndOffset.message.key)
+        assertEquals("payload should not change", messagesV0(i).payload, messageAndOffset.message.payload)
+        i += 1
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index de3d7a3..6b91611 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -99,7 +99,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       if (entry.message.compressionCodec == NoCompressionCodec)
         Stream.cons(entry, Stream.empty).iterator
       else
-        ByteBufferMessageSet.deepIterator(entry.message)
+        ByteBufferMessageSet.deepIterator(entry)
     }) yield {
       val key = TestUtils.readString(messageAndOffset.message.key).toInt
       val value = TestUtils.readString(messageAndOffset.message.payload).toInt

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 51cd62c..1be9e65 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -61,6 +61,7 @@ class LogConfigTest {
         case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
         case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
         case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
+        case LogConfig.MessageFormatVersionProp => assertPropertyInvalid(name, "")
         case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
       }
     })
@@ -70,7 +71,7 @@ class LogConfigTest {
     values.foreach((value) => {
       val props = new Properties
       props.setProperty(name, value.toString)
-      intercept[ConfigException] {
+      intercept[Exception] {
         LogConfig(props)
       }
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 46bfbed..91a4449 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,6 +20,7 @@ package kafka.log
 import java.io._
 import java.util.Properties
 
+import kafka.api.ApiVersion
 import kafka.common._
 import kafka.server.OffsetCheckpoint
 import kafka.utils._

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 47908e7..426b5e8 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.util.Properties
 import java.util.concurrent.atomic._
 import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+import kafka.api.ApiVersion
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
@@ -132,6 +133,8 @@ class LogTest extends JUnitSuite {
 
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    // We need to use magic value 1 here because the test is message size sensitive.
+    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
     // create a log
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
@@ -160,6 +163,8 @@ class LogTest extends JUnitSuite {
   def testAppendAndReadWithSequentialOffsets() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+    // We need to use magic value 1 here because the test is message size sensitive.
+    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
 
@@ -264,7 +269,8 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages) {
       val messages = log.read(offset, 1024*1024).messageSet
       assertEquals("Offsets not equal", offset, messages.head.offset)
-      assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, messages.head.message)
+      assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message,
+        messages.head.message.toFormatVersion(messageSets(i).head.message.magic))
       offset = messages.head.offset + 1
     }
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
@@ -290,7 +296,7 @@ class LogTest extends JUnitSuite {
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
 
-    def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head.message)
+    def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head)
 
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
@@ -343,6 +349,8 @@ class LogTest extends JUnitSuite {
     val configSegmentSize = messageSet.sizeInBytes - 1
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
+    // We need to use magic value 1 here because the test is message size sensitive.
+    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString())
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     try {
@@ -355,8 +363,8 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testCompactedTopicConstraints() {
-    val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes)
-    val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key ="another key".getBytes)
+    val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes, Message.NoTimestamp, Message.CurrentMagicValue)
+    val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key ="another key".getBytes, Message.NoTimestamp, Message.CurrentMagicValue)
     val unkeyedMessage = new Message(bytes = "this message does not have a key".getBytes)
 
     val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
@@ -404,7 +412,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testMessageSizeCheck() {
     val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
-    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes))
+    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change (I need more bytes)".getBytes))
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1