You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/19 16:56:50 UTC

[3/5] kafka git commit: KAFKA-3025; Added timetamp to Message and use relative offset.

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index d4ce498..fe31ad4 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -31,6 +31,8 @@ import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.network.TransportLayer
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.mutable.ArrayBuffer
+
 /**
  * An on-disk message set. An optional start and end position can be applied to the message set
  * which will allow slicing a subset of the file.
@@ -139,7 +141,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
       if(offset >= targetOffset)
         return OffsetPosition(offset, position)
       val messageSize = buffer.getInt()
-      if(messageSize < Message.MessageOverhead)
+      if(messageSize < Message.MinMessageOverhead)
         throw new IllegalStateException("Invalid message size: " + messageSize)
       position += MessageSet.LogOverhead + messageSize
     }
@@ -172,6 +174,63 @@ class FileMessageSet private[kafka](@volatile var file: File,
   }
 
   /**
+    * This method is called before we write messages to socket use zero-copy transfer. We need to
+    * make sure all the messages in the message set has expected magic value
+    * @param expectedMagicValue the magic value expected
+    * @return true if all messages has expected magic value, false otherwise
+    */
+  override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
+    var location = start
+    val offsetAndSizeBuffer = ByteBuffer.allocate(MessageSet.LogOverhead)
+    val crcAndMagicByteBuffer = ByteBuffer.allocate(Message.CrcLength + Message.MagicLength)
+    while(location < end) {
+      offsetAndSizeBuffer.rewind()
+      channel.read(offsetAndSizeBuffer, location)
+      if (offsetAndSizeBuffer.hasRemaining)
+        return true
+      offsetAndSizeBuffer.rewind()
+      offsetAndSizeBuffer.getLong // skip offset field
+      val messageSize = offsetAndSizeBuffer.getInt
+      if(messageSize < Message.MinMessageOverhead)
+        throw new IllegalStateException("Invalid message size: " + messageSize)
+      crcAndMagicByteBuffer.rewind()
+      channel.read(crcAndMagicByteBuffer, location + MessageSet.LogOverhead)
+      if (crcAndMagicByteBuffer.get(Message.MagicOffset) != expectedMagicValue)
+        return false
+      location += (MessageSet.LogOverhead + messageSize)
+    }
+    true
+  }
+
+  /**
+   * Convert this message set to use specified message format.
+   */
+  def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = {
+    val offsets = new ArrayBuffer[Long]
+    val newMessages = new ArrayBuffer[Message]
+    this.iterator().foreach(messageAndOffset => {
+      val message = messageAndOffset.message
+      if (message.compressionCodec == NoCompressionCodec) {
+        newMessages += messageAndOffset.message.toFormatVersion(toMagicValue)
+        offsets += messageAndOffset.offset
+      } else {
+        // File message set only has shallow iterator. We need to do deep iteration here if needed.
+        val deepIter = ByteBufferMessageSet.deepIterator(messageAndOffset)
+        for (innerMessageAndOffset <- deepIter) {
+          newMessages += innerMessageAndOffset.message.toFormatVersion(toMagicValue)
+          offsets += innerMessageAndOffset.offset
+        }
+      }
+    })
+
+    // We use the offset seq to assign offsets so the offset of the messages does not change.
+    new ByteBufferMessageSet(
+      compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
+      offsetSeq = offsets.toSeq,
+      newMessages: _*)
+  }
+
+  /**
    * Get a shallow iterator over the messages in the set.
    */
   override def iterator() = iterator(Int.MaxValue)
@@ -200,7 +259,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
         sizeOffsetBuffer.rewind()
         val offset = sizeOffsetBuffer.getLong()
         val size = sizeOffsetBuffer.getInt()
-        if(size < Message.MinHeaderSize)
+        if(size < Message.MinMessageOverhead)
           return allDone()
         if(size > maxMessageSize)
           throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 32c194d..f8c0b77 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -28,26 +28,35 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
 import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+import org.apache.kafka.common.record.TimestampType
 
 import scala.collection.JavaConversions
 
 import com.yammer.metrics.core.Gauge
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
 }
 
 /**
  * Struct to hold various quantities we compute about each message set before appending to the log
  * @param firstOffset The first offset in the message set
  * @param lastOffset The last offset in the message set
- * @param shallowCount The number of shallow messages
- * @param validBytes The number of valid bytes
+ * @param timestamp The log append time (if used) of the message set, otherwise Message.NoTimestamp
  * @param sourceCodec The source codec used in the message set (send by the producer)
  * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
+ * @param shallowCount The number of shallow messages
+ * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  */
-case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
+case class LogAppendInfo(var firstOffset: Long,
+                         var lastOffset: Long,
+                         var timestamp: Long,
+                         sourceCodec: CompressionCodec,
+                         targetCodec: CompressionCodec,
+                         shallowCount: Int,
+                         validBytes: Int,
+                         offsetsMonotonic: Boolean)
 
 
 /**
@@ -325,13 +334,23 @@ class Log(val dir: File,
         if (assignOffsets) {
           // assign offsets to the message set
           val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
+          val now = SystemTime.milliseconds
           try {
-            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config
-              .compact)
+            validMessages = validMessages.validateMessagesAndAssignOffsets(offset,
+                                                                           now,
+                                                                           appendInfo.sourceCodec,
+                                                                           appendInfo.targetCodec,
+                                                                           config.compact,
+                                                                           config.messageFormatVersion,
+                                                                           config.messageTimestampType,
+                                                                           config.messageTimestampDifferenceMaxMs)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
           appendInfo.lastOffset = offset.get - 1
+          // If log append time is used, we put the timestamp assigned to the messages in the append info.
+          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
+            appendInfo.timestamp = now
         } else {
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
@@ -436,7 +455,7 @@ class Log(val dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
 
-    LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d5c247c..a3aff15 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -370,7 +370,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.lastModified > deleteHorizonMs
         info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
             .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes)
+        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion)
       }
 
       // trim excess index
@@ -401,10 +401,14 @@ private[log] class Cleaner(val id: Int,
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
    * @param retainDeletes Should delete tombstones be retained while cleaning this segment
-   *
+   * @param messageFormatVersion the message format version to use after compaction
    */
-  private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
-                             dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
+  private[log] def cleanInto(topicAndPartition: TopicAndPartition,
+                             source: LogSegment,
+                             dest: LogSegment,
+                             map: OffsetMap,
+                             retainDeletes: Boolean,
+                             messageFormatVersion: Byte) {
     var position = 0
     while (position < source.log.sizeInBytes) {
       checkDone(topicAndPartition)
@@ -420,19 +424,34 @@ private[log] class Cleaner(val id: Int,
         stats.readMessage(size)
         if (entry.message.compressionCodec == NoCompressionCodec) {
           if (shouldRetainMessage(source, map, retainDeletes, entry)) {
-            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+            val convertedMessage = entry.message.toFormatVersion(messageFormatVersion)
+            ByteBufferMessageSet.writeMessage(writeBuffer, convertedMessage, entry.offset)
             stats.recopyMessage(size)
           }
           messagesRead += 1
         } else {
-          val messages = ByteBufferMessageSet.deepIterator(entry.message)
+          // We use absolute offset to decide whether retain the message or not. This is handled by
+          // deep iterator.
+          val messages = ByteBufferMessageSet.deepIterator(entry)
+          var numberOfInnerMessages = 0
+          var formatConversionNeeded = false
           val retainedMessages = messages.filter(messageAndOffset => {
             messagesRead += 1
+            numberOfInnerMessages += 1
+            if (messageAndOffset.message.magic != messageFormatVersion)
+              formatConversionNeeded = true
             shouldRetainMessage(source, map, retainDeletes, messageAndOffset)
           }).toSeq
 
-          if (retainedMessages.nonEmpty)
-            compressMessages(writeBuffer, entry.message.compressionCodec, retainedMessages)
+          // There is no messages compacted out and no message format conversion, write the original message set back
+          if (retainedMessages.size == numberOfInnerMessages && !formatConversionNeeded)
+            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+          else if (retainedMessages.nonEmpty) {
+            val convertedRetainedMessages = retainedMessages.map(messageAndOffset => {
+              new MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion), messageAndOffset.offset)
+            })
+            compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, convertedRetainedMessages)
+          }
         }
       }
 
@@ -452,7 +471,10 @@ private[log] class Cleaner(val id: Int,
     restoreBuffers()
   }
 
-  private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messages: Seq[MessageAndOffset]) {
+  private def compressMessages(buffer: ByteBuffer,
+                               compressionCodec: CompressionCodec,
+                               messageFormatVersion: Byte,
+                               messages: Seq[MessageAndOffset]) {
     val messagesIterable = messages.toIterable.map(_.message)
     if (messages.isEmpty) {
       MessageSet.Empty.sizeInBytes
@@ -461,15 +483,24 @@ private[log] class Cleaner(val id: Int,
         ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset)
       MessageSet.messageSetSize(messagesIterable)
     } else {
+      val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages.map(_.message))
+      val firstAbsoluteOffset = messages.head.offset
       var offset = -1L
+      val timestampType = messages.head.message.timestampType
       val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 2, 1024), 1 << 16))
-      messageWriter.write(codec = compressionCodec) { outputStream =>
+      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream =>
         val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
         try {
           for (messageOffset <- messages) {
             val message = messageOffset.message
             offset = messageOffset.offset
-            output.writeLong(offset)
+            // Use inner offset when magic value is greater than 0
+            if (messageFormatVersion > Message.MagicValue_V0) {
+              // The offset of the messages are absolute offset, compute the inner offset.
+              val innerOffset = messageOffset.offset - firstAbsoluteOffset
+              output.writeLong(innerOffset)
+            } else
+              output.writeLong(offset)
             output.writeInt(message.size)
             output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 7fc7e33..a8fffbd 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -18,12 +18,13 @@
 package kafka.log
 
 import java.util.Properties
+
+import kafka.api.ApiVersion
+import kafka.message.{BrokerCompressionCodec, Message}
 import kafka.server.KafkaConfig
-import org.apache.kafka.common.utils.Utils
-import scala.collection._
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
-import kafka.message.BrokerCompressionCodec
-import kafka.message.Message
+import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.utils.Utils
 
 object Defaults {
   val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -44,6 +45,9 @@ object Defaults {
   val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
   val CompressionType = kafka.server.Defaults.CompressionType
   val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
+  val MessageFormatVersion = kafka.server.Defaults.MessageFormatVersion
+  val MessageTimestampType = kafka.server.Defaults.MessageTimestampType
+  val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.MessageTimestampDifferenceMaxMs
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
@@ -69,6 +73,9 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
   val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
   val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
   val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
+  val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp)).messageFormatVersion
+  val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
+  val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp)
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -101,6 +108,9 @@ object LogConfig {
   val MinInSyncReplicasProp = "min.insync.replicas"
   val CompressionTypeProp = "compression.type"
   val PreAllocateEnableProp = "preallocate"
+  val MessageFormatVersionProp = KafkaConfig.MessageFormatVersionProp
+  val MessageTimestampTypeProp = KafkaConfig.MessageTimestampTypeProp
+  val MessageTimestampDifferenceMaxMsProp = KafkaConfig.MessageTimestampDifferenceMaxMsProp
 
   val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
   val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
@@ -125,16 +135,18 @@ object LogConfig {
     "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " +
     "no compression; and 'producer' which means retain the original compression codec set by the producer."
   val PreAllocateEnableDoc ="Should pre allocate file when create new segment?"
+  val MessageFormatVersionDoc = KafkaConfig.MessageFormatVersionDoc
+  val MessageTimestampTypeDoc = KafkaConfig.MessageTimestampTypeDoc
+  val MessageTimestampDifferenceMaxMsDoc = KafkaConfig.MessageTimestampDifferenceMaxMsDoc
 
   private val configDef = {
-    import ConfigDef.Range._
-    import ConfigDef.ValidString._
-    import ConfigDef.Type._
-    import ConfigDef.Importance._
-    import java.util.Arrays.asList
+    import org.apache.kafka.common.config.ConfigDef.Importance._
+    import org.apache.kafka.common.config.ConfigDef.Range._
+    import org.apache.kafka.common.config.ConfigDef.Type._
+    import org.apache.kafka.common.config.ConfigDef.ValidString._
 
     new ConfigDef()
-      .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinHeaderSize), MEDIUM, SegmentSizeDoc)
+      .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinMessageOverhead), MEDIUM, SegmentSizeDoc)
       .define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM, SegmentMsDoc)
       .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc)
       .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc)
@@ -158,12 +170,15 @@ object LogConfig {
       .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
       .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable,
         MEDIUM, PreAllocateEnableDoc)
+      .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc)
+      .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, MEDIUM, MessageTimestampTypeDoc)
+      .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc)
   }
 
   def apply(): LogConfig = LogConfig(new Properties())
 
   def configNames() = {
-    import JavaConversions._
+    import scala.collection.JavaConversions._
     configDef.names().toList.sorted
   }
 
@@ -182,7 +197,7 @@ object LogConfig {
    * Check that property names are valid
    */
   def validateNames(props: Properties) {
-    import JavaConversions._
+    import scala.collection.JavaConversions._
     val names = configDef.names()
     for(name <- props.keys)
       require(names.contains(name), "Unknown configuration \"%s\".".format(name))

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index aa37d52..9fc68a4 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -184,7 +184,7 @@ class LogSegment(val log: FileMessageSet,
               case NoCompressionCodec =>
                 entry.offset
               case _ =>
-                ByteBufferMessageSet.deepIterator(entry.message).next().offset
+                ByteBufferMessageSet.deepIterator(entry).next().offset
           }
           index.append(startOffset, validBytes)
           lastIndexEntry = validBytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 5a32de8..2867c78 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -23,28 +23,49 @@ import kafka.common.KafkaException
 import java.nio.ByteBuffer
 import java.nio.channels._
 import java.io._
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 object ByteBufferMessageSet {
 
-  private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = {
+  private def create(offsetAssignor: OffsetAssigner,
+                     compressionCodec: CompressionCodec,
+                     wrapperMessageTimestamp: Option[Long],
+                     timestampType: TimestampType,
+                     messages: Message*): ByteBuffer = {
     if(messages.size == 0) {
       MessageSet.Empty.buffer
     } else if(compressionCodec == NoCompressionCodec) {
       val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
       for(message <- messages)
-        writeMessage(buffer, message, offsetCounter.getAndIncrement)
+        writeMessage(buffer, message, offsetAssignor.nextAbsoluteOffset)
       buffer.rewind()
       buffer
     } else {
+      val magicAndTimestamp = wrapperMessageTimestamp match {
+        case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
+        case None => MessageSet.magicAndLargestTimestamp(messages)
+      }
       var offset = -1L
       val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-      messageWriter.write(codec = compressionCodec) { outputStream =>
+      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
         val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
         try {
           for (message <- messages) {
-            offset = offsetCounter.getAndIncrement
-            output.writeLong(offset)
+            offset = offsetAssignor.nextAbsoluteOffset
+            if (message.magic != magicAndTimestamp.magic)
+              throw new IllegalArgumentException("Messages in the message set must have same magic value")
+            // Use inner offset if magic value is greater than 0
+            if (magicAndTimestamp.magic > Message.MagicValue_V0)
+              output.writeLong(offsetAssignor.toInnerOffset(offset))
+            else
+              output.writeLong(offset)
             output.writeInt(message.size)
             output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
           }
@@ -59,40 +80,93 @@ object ByteBufferMessageSet {
     }
   }
 
-  /** Deep iterator that decompresses the message sets in-place. */
-  def deepIterator(wrapperMessage: Message): Iterator[MessageAndOffset] = {
+  /** Deep iterator that decompresses the message sets and adjusts timestamp and offset if needed. */
+  def deepIterator(wrapperMessageAndOffset: MessageAndOffset): Iterator[MessageAndOffset] = {
+
+    import Message._
+
     new IteratorTemplate[MessageAndOffset] {
 
+      val wrapperMessageOffset = wrapperMessageAndOffset.offset
+      val wrapperMessage = wrapperMessageAndOffset.message
+      val wrapperMessageTimestampOpt: Option[Long] =
+        if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None
+      val wrapperMessageTimestampTypeOpt: Option[TimestampType] =
+        if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None
+      if (wrapperMessage.payload == null)
+        throw new KafkaException("Message payload is null: " + wrapperMessage)
       val inputStream: InputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
       val compressed: DataInputStream = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
+      var lastInnerOffset = -1L
 
-      override def makeNext(): MessageAndOffset = {
+      val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
+        var innerMessageAndOffsets = new mutable.Queue[MessageAndOffset]()
         try {
-          // read the offset
-          val offset = compressed.readLong()
-          // read record size
-          val size = compressed.readInt()
-
-          if (size < Message.MinHeaderSize)
-            throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")
-
-          // read the record into an intermediate record buffer
-          // and hence has to do extra copy
-          val bufferArray = new Array[Byte](size)
-          compressed.readFully(bufferArray, 0, size)
-          val buffer = ByteBuffer.wrap(bufferArray)
-
-          val newMessage = new Message(buffer)
-
-          // the decompressed message should not be a wrapper message since we do not allow nested compression
-          new MessageAndOffset(newMessage, offset)
+          while (true) {
+            innerMessageAndOffsets += readMessageFromStream()
+          }
         } catch {
           case eofe: EOFException =>
             compressed.close()
-            allDone()
           case ioe: IOException =>
             throw new KafkaException(ioe)
         }
+        Some(innerMessageAndOffsets)
+      } else {
+        None
+      }
+
+      private def readMessageFromStream() = {
+        // read the offset
+        val innerOffset = compressed.readLong()
+        // read record size
+        val size = compressed.readInt()
+
+        if (size < MinMessageOverhead)
+          throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")
+
+        // read the record into an intermediate record buffer
+        // and hence has to do extra copy
+        val bufferArray = new Array[Byte](size)
+        compressed.readFully(bufferArray, 0, size)
+        val buffer = ByteBuffer.wrap(bufferArray)
+
+        // Override the timestamp if necessary
+        val newMessage = new Message(buffer, wrapperMessageTimestampOpt, wrapperMessageTimestampTypeOpt)
+
+        // Inner message and wrapper message must have same magic value
+        if (newMessage.magic != wrapperMessage.magic)
+          throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " +
+            s"but inner message has magic value ${newMessage.magic}")
+        lastInnerOffset = innerOffset
+        new MessageAndOffset(newMessage, innerOffset)
+      }
+
+      override def makeNext(): MessageAndOffset = {
+        messageAndOffsets match {
+          // Using inner offset and timestamps
+          case Some(innerMessageAndOffsets) =>
+            if (innerMessageAndOffsets.isEmpty)
+              allDone()
+            else {
+              val messageAndOffset = innerMessageAndOffsets.dequeue()
+              val message = messageAndOffset.message
+              val relativeOffset = messageAndOffset.offset - lastInnerOffset
+              val absoluteOffset = wrapperMessageOffset + relativeOffset
+              new MessageAndOffset(message, absoluteOffset)
+            }
+          // Not using inner offset and timestamps
+          case None =>
+            try {
+              readMessageFromStream()
+            } catch {
+              case eofe: EOFException =>
+                compressed.close()
+                allDone()
+              case ioe: IOException =>
+                throw new KafkaException(ioe)
+            }
+        }
       }
     }
   }
@@ -111,6 +185,20 @@ object ByteBufferMessageSet {
   }
 }
 
+private class OffsetAssigner(offsets: Seq[Long]) {
+  val index = new AtomicInteger(0)
+
+  def this(offsetCounter: AtomicLong, size: Int) {
+    this((offsetCounter.get() to offsetCounter.get + size).toSeq)
+    offsetCounter.addAndGet(size)
+  }
+
+  def nextAbsoluteOffset = offsets(index.getAndIncrement)
+
+  def toInnerOffset(offset: Long) = offset - offsets(0)
+
+}
+
 /**
  * A sequence of messages stored in a byte buffer
  *
@@ -120,22 +208,87 @@ object ByteBufferMessageSet {
  *
  * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
  *
+ *
+ * When message format v1 is used, there will be following message format changes.
+ * - For non-compressed messages, with message v1 we are adding timestamp and timestamp type attribute. The offsets of
+ *   the messages remain absolute offsets.
+ * - For Compressed messages, with message v1 we are adding timestamp, timestamp type attribute bit and using
+ *   inner offsets (IO) for inner messages of compressed messages (see offset calculation details below). Timestamp type
+ *   attribute is only set in wrapper messages. Inner messages always have CreateTime as timestamp type in attributes.
+ *
+ * The way timestamp set is following:
+ * For non-compressed messages: timestamp and timestamp type attribute in the messages are set and used.
+ * For compressed messages:
+ * 1. Wrapper messages' timestamp type attribute is set to proper value
+ * 2. Wrapper messages' timestamp is set to:
+ *    - the max timestamp of inner messages if CreateTime is used
+ *    - the current server time if wrapper message's timestamp = LogAppendTime.
+ *      In this case the wrapper message timestamp is used and all the timestamps of inner messages are ignored.
+ * 3. Inner messages' timestamp will be:
+ *    - used when wrapper message's timestamp type is CreateTime
+ *    - ignored when wrapper message's timestamp type is LogAppendTime
+ * 4. Inner messages' timestamp type will always be ignored. However, producer must set the inner message timestamp
+ *    type to CreateTime, otherwise the messages will be rejected by broker.
+ *
+ *
+ * The way absolute offset calculated is the following:
+ * Ideally the conversion from relative offset(RO) to absolute offset(AO) should be:
+ *
+ * AO = AO_Of_Last_Inner_Message + RO
+ *
+ * However, note that the message sets sent by producers are compressed in a streaming way.
+ * And the relative offset of an inner message compared with the last inner message is not known until
+ * the last inner message is written.
+ * Unfortunately we are not able to change the previously written messages after the last message is written to
+ * the message set when stream compressing is used.
+ *
+ * To solve this issue, we use the following solution:
+ *
+ * 1. When the producer creates a message set, it simply writes all the messages into a compressed message set with
+ *    offset 0, 1, ... (inner offset).
+ * 2. The broker will set the offset of the wrapper message to the absolute offset of the last message in the
+ *    message set.
+ * 3. When a consumer sees the message set, it first decompresses the entire message set to find out the inner
+ *    offset (IO) of the last inner message. Then it computes RO and AO of previous messages:
+ *
+ *    RO = IO_of_a_message - IO_of_the_last_message
+ *    AO = AO_Of_Last_Inner_Message + RO
+ *
+ * 4. This solution works for compacted message set as well
+ *
  */
 class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {
   private var shallowValidByteCount = -1
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*))
+    this(ByteBufferMessageSet.create(new OffsetAssigner(new AtomicLong(0), messages.size), compressionCodec,
+      None, TimestampType.CREATE_TIME, messages:_*))
   }
 
   def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) {
-    this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages:_*))
+    this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, messages.size), compressionCodec,
+      None, TimestampType.CREATE_TIME, messages:_*))
+  }
+
+  def this(compressionCodec: CompressionCodec, offsetSeq: Seq[Long], messages: Message*) {
+    this(ByteBufferMessageSet.create(new OffsetAssigner(offsetSeq), compressionCodec,
+      None, TimestampType.CREATE_TIME, messages:_*))
   }
 
   def this(messages: Message*) {
     this(NoCompressionCodec, new AtomicLong(0), messages: _*)
   }
 
+  // This constructor is only used internally
+  private[kafka] def this(compressionCodec: CompressionCodec,
+                          offsetCounter: AtomicLong,
+                          wrapperMessageTimestamp: Option[Long],
+                          timestampType: TimestampType,
+                          messages: Message*) {
+    this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, messages.size), compressionCodec,
+      wrapperMessageTimestamp, timestampType, messages:_*))
+  }
+
   def getBuffer = buffer
 
   private def shallowValidBytes: Int = {
@@ -162,6 +315,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
     written
   }
 
+  override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
+    for (messageAndOffset <- shallowIterator) {
+      if (messageAndOffset.message.magic != expectedMagicValue)
+        return false
+    }
+    true
+  }
+
   /** default iterator that iterates over decompressed messages */
   override def iterator: Iterator[MessageAndOffset] = internalIterator()
 
@@ -182,7 +343,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           return allDone()
         val offset = topIter.getLong()
         val size = topIter.getInt()
-        if(size < Message.MinHeaderSize)
+        if(size < Message.MinMessageOverhead)
           throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")
 
         // we have an incomplete message
@@ -194,7 +355,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
         message.limit(size)
         topIter.position(topIter.position + size)
         val newMessage = new Message(message)
-
         if(isShallow) {
           new MessageAndOffset(newMessage, offset)
         } else {
@@ -203,7 +363,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
               innerIter = null
               new MessageAndOffset(newMessage, offset)
             case _ =>
-              innerIter = ByteBufferMessageSet.deepIterator(newMessage)
+              innerIter = ByteBufferMessageSet.deepIterator(new MessageAndOffset(newMessage, offset))
               if(!innerIter.hasNext)
                 innerIter = null
               makeNext()
@@ -226,48 +386,205 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   }
 
   /**
-   * Update the offsets for this message set and do further validation on messages. This method attempts to do an
-   * in-place conversion if there is no compression, but otherwise recopies the messages
+   * Update the offsets for this message set and do further validation on messages including:
+   * 1. Messages for compacted topics must have keys
+   * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
+   *    starting from 0.
+   * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
+   *
+   * This method will convert the messages in the following scenarios:
+   * A. Magic value of a message = 0 and messageFormatVersion is 1
+   * B. Magic value of a message = 1 and messageFormatVersion is 0
+   *
+   * If no format conversion or value overwriting is required for messages, this method will perform in-place
+   * operations and avoid re-compression.
    */
   private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
+                                                      now: Long = System.currentTimeMillis(),
                                                       sourceCodec: CompressionCodec,
                                                       targetCodec: CompressionCodec,
-                                                      compactedTopic: Boolean = false): ByteBufferMessageSet = {
-    if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
-      // do in-place validation and offset assignment
-      var messagePosition = 0
-      buffer.mark()
-      while(messagePosition < sizeInBytes - MessageSet.LogOverhead) {
-        buffer.position(messagePosition)
-        buffer.putLong(offsetCounter.getAndIncrement())
-        val messageSize = buffer.getInt()
-        val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength
-        if (compactedTopic && positionAfterKeySize < sizeInBytes) {
-          buffer.position(buffer.position() + Message.KeySizeOffset)
-          val keySize = buffer.getInt()
-          if (keySize <= 0) {
-            buffer.reset()
-            throw new InvalidMessageException("Compacted topic cannot accept message without key.")
-          }
-        }
-        messagePosition += MessageSet.LogOverhead + messageSize
+                                                      compactedTopic: Boolean = false,
+                                                      messageFormatVersion: Byte = Message.CurrentMagicValue,
+                                                      messageTimestampType: TimestampType,
+                                                      messageTimestampDiffMaxMs: Long): ByteBufferMessageSet = {
+    if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
+      // check the magic value
+      if (!magicValueInAllWrapperMessages(messageFormatVersion)) {
+        // Message format conversion
+        convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
+          messageFormatVersion)
+      } else {
+        // Do in-place validation, offset assignment and maybe set timestamp
+        validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
+          messageTimestampDiffMaxMs)
       }
-      buffer.reset()
-      this
+
     } else {
-      // We need to deep-iterate over the message-set if any of these are true:
-      // (i) messages are compressed
-      // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec
-      val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
-        if (compactedTopic && !messageAndOffset.message.hasKey)
-          throw new InvalidMessageException("Compacted topic cannot accept message without key.")
-
-        messageAndOffset.message
+      // Deal with compressed messages
+      // We cannot do in place assignment in one of the following situations:
+      // 1. Source and target compression codec are different
+      // 2. When magic value to use is 0 because offsets need to be overwritten
+      // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
+      // 4. Message format conversion is needed.
+
+      // No in place assignment situation 1 and 2
+      var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
+
+      var maxTimestamp = Message.NoTimestamp
+      val expectedInnerOffset = new AtomicLong(0)
+      val validatedMessages = new ListBuffer[Message]
+      this.internalIterator(isShallow = false).foreach(messageAndOffset => {
+        val message = messageAndOffset.message
+        validateMessageKey(message, compactedTopic)
+        if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
+          // No in place assignment situation 3
+          // Validate the timestamp
+          validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)
+          // Check if we need to overwrite offset
+          if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement)
+            inPlaceAssignment = false
+          maxTimestamp = math.max(maxTimestamp, message.timestamp)
+        }
+
+        // No in place assignment situation 4
+        if (message.magic != messageFormatVersion)
+          inPlaceAssignment = false
+
+        validatedMessages += message.toFormatVersion(messageFormatVersion)
       })
-      new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
+      if (!inPlaceAssignment) {
+        // Cannot do in place assignment.
+        val wrapperMessageTimestamp = {
+          if (messageFormatVersion == Message.MagicValue_V0)
+            Some(Message.NoTimestamp)
+          else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
+            Some(maxTimestamp)
+          else // Log append time
+            Some(now)
+        }
+
+        new ByteBufferMessageSet(compressionCodec = targetCodec,
+                                 offsetCounter = offsetCounter,
+                                 wrapperMessageTimestamp = wrapperMessageTimestamp,
+                                 timestampType = messageTimestampType,
+                                 messages = validatedMessages.toBuffer: _*)
+      } else {
+        // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
+        buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
+        // validate the messages
+        validatedMessages.foreach(_.ensureValid())
+
+        var crcUpdateNeeded = true
+        val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset
+        val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
+        val timestamp = buffer.getLong(timestampOffset)
+        val attributes = buffer.get(attributeOffset)
+        if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
+            // We don't need to recompute crc if the timestamp is not updated.
+            crcUpdateNeeded = false
+        else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
+          // Set timestamp type and timestamp
+          buffer.putLong(timestampOffset, now)
+          buffer.put(attributeOffset, TimestampType.setTimestampType(attributes, TimestampType.LOG_APPEND_TIME))
+        }
+
+        if (crcUpdateNeeded) {
+          // need to recompute the crc value
+          buffer.position(MessageSet.LogOverhead)
+          val wrapperMessage = new Message(buffer.slice())
+          Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum())
+        }
+        buffer.rewind()
+        this
+      }
+    }
+  }
+
+  // We create this method to save memory copy operation. It reads from the original message set and directly
+  // writes the converted messages into new message set buffer. Hence we don't need to allocate memory for each
+  // individual message during message format conversion.
+  private def convertNonCompressedMessages(offsetCounter: AtomicLong,
+                                           compactedTopic: Boolean,
+                                           now: Long,
+                                           timestampType: TimestampType,
+                                           messageTimestampDiffMaxMs: Long,
+                                           toMagicValue: Byte): ByteBufferMessageSet = {
+    val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).foldLeft(0)(
+      (sizeDiff, messageAndOffset) => sizeDiff + Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue))
+    val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
+    var newMessagePosition = 0
+    this.internalIterator(isShallow = true).foreach {messageAndOffset =>
+      val message = messageAndOffset.message
+      validateMessageKey(message, compactedTopic)
+      validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs)
+      newBuffer.position(newMessagePosition)
+      // write offset.
+      newBuffer.putLong(offsetCounter.getAndIncrement)
+      // Write new message size
+      val newMessageSize = message.size + Message.headerSizeDiff(message.magic, toMagicValue)
+      newBuffer.putInt(newMessageSize)
+      // Create new message buffer
+      val newMessageBuffer = newBuffer.slice()
+      newMessageBuffer.limit(newMessageSize)
+      // Convert message
+      message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType)
+
+      newMessagePosition += MessageSet.LogOverhead + newMessageSize
     }
+    newBuffer.rewind()
+    new ByteBufferMessageSet(newBuffer)
   }
 
+  private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: AtomicLong,
+                                                                  now: Long,
+                                                                  compactedTopic: Boolean,
+                                                                  timestampType: TimestampType,
+                                                                  timestampDiffMaxMs: Long): ByteBufferMessageSet = {
+    // do in-place validation and offset assignment
+    var messagePosition = 0
+    buffer.mark()
+    while (messagePosition < sizeInBytes - MessageSet.LogOverhead) {
+      buffer.position(messagePosition)
+      buffer.putLong(offsetCounter.getAndIncrement())
+      val messageSize = buffer.getInt()
+      val messageBuffer = buffer.slice()
+      messageBuffer.limit(messageSize)
+      val message = new Message(messageBuffer)
+      validateMessageKey(message, compactedTopic)
+      if (message.magic > Message.MagicValue_V0) {
+        validateTimestamp(message, now, timestampType, timestampDiffMaxMs)
+        if (timestampType == TimestampType.LOG_APPEND_TIME) {
+          message.buffer.putLong(Message.TimestampOffset, now)
+          message.buffer.put(Message.AttributesOffset, TimestampType.setTimestampType(message.attributes, TimestampType.LOG_APPEND_TIME))
+          Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum())
+        }
+      }
+      messagePosition += MessageSet.LogOverhead + messageSize
+    }
+    buffer.reset()
+    this
+  }
+
+  private def validateMessageKey(message: Message, compactedTopic: Boolean) {
+    if (compactedTopic && !message.hasKey)
+      throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+  }
+
+  /**
+   * This method validates the timestamps of a message.
+   * If the message is using create time, this method checks if it is within acceptable range.
+   */
+  private def validateTimestamp(message: Message,
+                                now: Long,
+                                timestampType: TimestampType,
+                                timestampDiffMaxMs: Long) {
+    if (timestampType == TimestampType.CREATE_TIME && math.abs(message.timestamp - now) > timestampDiffMaxMs)
+      throw new InvalidTimestampException(s"Timestamp ${message.timestamp} of message is out of range. " +
+        s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}")
+    if (message.timestampType == TimestampType.LOG_APPEND_TIME)
+      throw new InvalidTimestampException(s"Invalid timestamp type in message $message. Producer should not set " +
+        s"timestamp type to LogAppendTime.")
+  }
 
   /**
    * The total number of bytes in this message set, including any partial trailing messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 999b115..51aa11a 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -18,6 +18,9 @@
 package kafka.message
 
 import java.nio._
+
+import org.apache.kafka.common.record.TimestampType
+
 import scala.math._
 import kafka.utils._
 import org.apache.kafka.common.utils.Utils
@@ -26,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
  * Constants related to messages
  */
 object Message {
-  
+
   /**
    * The current offset and size for all the fixed-length fields
    */
@@ -36,83 +39,144 @@ object Message {
   val MagicLength = 1
   val AttributesOffset = MagicOffset + MagicLength
   val AttributesLength = 1
-  val KeySizeOffset = AttributesOffset + AttributesLength
+  // Only message format version 1 has the timestamp field.
+  val TimestampOffset = AttributesOffset + AttributesLength
+  val TimestampLength = 8
+  val KeySizeOffset_V0 = AttributesOffset + AttributesLength
+  val KeySizeOffset_V1 = TimestampOffset + TimestampLength
   val KeySizeLength = 4
-  val KeyOffset = KeySizeOffset + KeySizeLength
+  val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
+  val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
   val ValueSizeLength = 4
-  
-  /** The amount of overhead bytes in a message */
-  val MessageOverhead = KeyOffset + ValueSizeLength
-  
+
+  private val MessageHeaderSizeMap = Map (
+    0.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength),
+    1.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + TimestampLength + KeySizeLength + ValueSizeLength))
+
   /**
-   * The minimum valid size for the message header
+   * The amount of overhead bytes in a message
+   * This value is only used to check if the message size is valid or not. So the minimum possible message bytes is
+   * used here, which comes from a message in message format V0 with empty key and value.
    */
-  val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength
+  val MinMessageOverhead = KeyOffset_V0 + ValueSizeLength
   
   /**
-   * The current "magic" value
+   * The "magic" value
+   * When magic value is 0, the message uses absolute offset and does not have a timestamp field.
+   * When magic value is 1, the message uses relative offset and has a timestamp field.
    */
-  val CurrentMagicValue: Byte = 0
+  val MagicValue_V0: Byte = 0
+  val MagicValue_V1: Byte = 1
+  val CurrentMagicValue: Byte = 1
 
   /**
    * Specifies the mask for the compression code. 3 bits to hold the compression codec.
    * 0 is reserved to indicate no compression
    */
   val CompressionCodeMask: Int = 0x07
+  /**
+   * Specifies the mask for timestamp type. 1 bit at the 4th least significant bit.
+   * 0 for CreateTime, 1 for LogAppendTime
+   */
+  val TimestampTypeMask: Byte = 0x08
+  val TimestampTypeAttributeBitOffset: Int = 3
 
   /**
    * Compression code for uncompressed messages
    */
   val NoCompression: Int = 0
 
+  /**
+   * To indicate timestamp is not defined so "magic" value 0 will be used.
+   */
+  val NoTimestamp: Long = -1
+
+  /**
+   * Give the header size difference between different message versions.
+   */
+  def headerSizeDiff(fromMagicValue: Byte, toMagicValue: Byte) : Int =
+    MessageHeaderSizeMap(toMagicValue) - MessageHeaderSizeMap(fromMagicValue)
+
+
 }
 
 /**
  * A message. The format of an N byte message is the following:
  *
  * 1. 4 byte CRC32 of the message
- * 2. 1 byte "magic" identifier to allow format changes, value is 0 currently
- * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
- * 4. 4 byte key length, containing length K
- * 5. K byte key
- * 6. 4 byte payload length, containing length V
- * 7. V byte payload
+ * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
+ * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
+ *    bit 0 ~ 2 : Compression codec.
+ *      0 : no compression
+ *      1 : gzip
+ *      2 : snappy
+ *      3 : lz4
+ *    bit 3 : Timestamp type
+ *      0 : create time
+ *      1 : log append time
+ *    bit 4 ~ 7 : reserved
+ * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
+ * 5. 4 byte key length, containing length K
+ * 6. K byte key
+ * 7. 4 byte payload length, containing length V
+ * 8. V byte payload
  *
  * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents.
+ * @param buffer the byte buffer of this message.
+ * @param wrapperMessageTimestamp the wrapper message timestamp, only not None when the message is an inner message
+ *                                of a compressed message.
+ * @param wrapperMessageTimestampType the wrapper message timestamp type, only not None when the message is an inner
+ *                                    message of a compressed message.
  */
-class Message(val buffer: ByteBuffer) {
+class Message(val buffer: ByteBuffer,
+              private val wrapperMessageTimestamp: Option[Long] = None,
+              private val wrapperMessageTimestampType: Option[TimestampType] = None) {
   
   import kafka.message.Message._
-  
+
   /**
    * A constructor to create a Message
    * @param bytes The payload of the message
-   * @param codec The compression codec used on the contents of the message (if any)
    * @param key The key of the message (null, if none)
+   * @param timestamp The timestamp of the message.
+   * @param timestampType The timestamp type of the message.
+   * @param codec The compression codec used on the contents of the message (if any)
    * @param payloadOffset The offset into the payload array used to extract payload
    * @param payloadSize The size of the payload to use
+   * @param magicValue the magic value to use
    */
   def this(bytes: Array[Byte], 
-           key: Array[Byte],            
+           key: Array[Byte],
+           timestamp: Long,
+           timestampType: TimestampType,
            codec: CompressionCodec, 
            payloadOffset: Int, 
-           payloadSize: Int) = {
-    this(ByteBuffer.allocate(Message.CrcLength + 
-                             Message.MagicLength + 
-                             Message.AttributesLength + 
+           payloadSize: Int,
+           magicValue: Byte) = {
+    this(ByteBuffer.allocate(Message.CrcLength +
+                             Message.MagicLength +
+                             Message.AttributesLength +
+                             (if (magicValue == Message.MagicValue_V0) 0
+                              else Message.TimestampLength) +
                              Message.KeySizeLength + 
                              (if(key == null) 0 else key.length) + 
                              Message.ValueSizeLength + 
                              (if(bytes == null) 0 
                               else if(payloadSize >= 0) payloadSize 
                               else bytes.length - payloadOffset)))
+    validateTimestampAndMagicValue(timestamp, magicValue)
     // skip crc, we will fill that in at the end
     buffer.position(MagicOffset)
-    buffer.put(CurrentMagicValue)
+    buffer.put(magicValue)
     var attributes: Byte = 0
-    if (codec.codec > 0)
-      attributes =  (attributes | (CompressionCodeMask & codec.codec)).toByte
+    if (codec.codec > 0) {
+      attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
+      attributes = TimestampType.setTimestampType(attributes, timestampType)
+    }
     buffer.put(attributes)
+    // Only put timestamp when "magic" value is greater than 0
+    if (magic > MagicValue_V0)
+      buffer.putLong(timestamp)
     if(key == null) {
       buffer.putInt(-1)
     } else {
@@ -126,22 +190,25 @@ class Message(val buffer: ByteBuffer) {
     if(bytes != null)
       buffer.put(bytes, payloadOffset, size)
     buffer.rewind()
-    
+
     // now compute the checksum and fill it in
     Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
   }
   
-  def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = 
-    this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1)
+  def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
+    this(bytes = bytes, key = key, timestamp = timestamp, timestampType = TimestampType.CREATE_TIME, codec = codec, payloadOffset = 0, payloadSize = -1, magicValue = magicValue)
   
-  def this(bytes: Array[Byte], codec: CompressionCodec) = 
-    this(bytes = bytes, key = null, codec = codec)
+  def this(bytes: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
+    this(bytes = bytes, key = null, timestamp = timestamp, codec = codec, magicValue = magicValue)
   
-  def this(bytes: Array[Byte], key: Array[Byte]) = 
-    this(bytes = bytes, key = key, codec = NoCompressionCodec)
+  def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, magicValue: Byte) =
+    this(bytes = bytes, key = key, timestamp = timestamp, codec = NoCompressionCodec, magicValue = magicValue)
     
-  def this(bytes: Array[Byte]) = 
-    this(bytes = bytes, key = null, codec = NoCompressionCodec)
+  def this(bytes: Array[Byte], timestamp: Long, magicValue: Byte) =
+    this(bytes = bytes, key = null, timestamp = timestamp, codec = NoCompressionCodec, magicValue = magicValue)
+
+  def this(bytes: Array[Byte]) =
+    this(bytes = bytes, key = null, timestamp = Message.NoTimestamp, codec = NoCompressionCodec, magicValue = Message.CurrentMagicValue)
     
   /**
    * Compute the checksum of the message from the message contents
@@ -171,11 +238,19 @@ class Message(val buffer: ByteBuffer) {
    * The complete serialized size of this message in bytes (including crc, header attributes, etc)
    */
   def size: Int = buffer.limit
-  
+
+  /**
+   * The position where the key size is stored.
+   */
+  def keySizeOffset = {
+    if (magic == MagicValue_V0) KeySizeOffset_V0
+    else KeySizeOffset_V1
+  }
+
   /**
    * The length of the key in bytes
    */
-  def keySize: Int = buffer.getInt(Message.KeySizeOffset)
+  def keySize: Int = buffer.getInt(keySizeOffset)
   
   /**
    * Does the message have a key?
@@ -185,7 +260,10 @@ class Message(val buffer: ByteBuffer) {
   /**
    * The position where the payload size is stored
    */
-  private def payloadSizeOffset = Message.KeyOffset + max(0, keySize)
+  def payloadSizeOffset = {
+    if (magic == MagicValue_V0) KeyOffset_V0 + max(0, keySize)
+    else KeyOffset_V1 + max(0, keySize)
+  }
   
   /**
    * The length of the message value in bytes
@@ -206,6 +284,33 @@ class Message(val buffer: ByteBuffer) {
    * The attributes stored with this message
    */
   def attributes: Byte = buffer.get(AttributesOffset)
+
+  /**
+   * The timestamp of the message, only available when the "magic" value is greater than 0
+   * When magic > 0, The timestamp of a message is determined in the following way:
+   * 1. wrapperMessageTimestampType = None and wrapperMessageTimestamp is None - Uncompressed message, timestamp and timestamp type are in the message.
+   * 2. wrapperMessageTimestampType = LogAppendTime and wrapperMessageTimestamp is defined - Compressed message using LogAppendTime
+   * 3. wrapperMessageTimestampType = CreateTime and wrapperMessageTimestamp is defined - Compressed message using CreateTime
+   */
+  def timestamp: Long = {
+    if (magic == MagicValue_V0)
+      Message.NoTimestamp
+    // Case 2
+    else if (wrapperMessageTimestampType.exists(_ == TimestampType.LOG_APPEND_TIME) && wrapperMessageTimestamp.isDefined)
+      wrapperMessageTimestamp.get
+    else // case 1, 3
+      buffer.getLong(Message.TimestampOffset)
+  }
+
+  /**
+   * The timestamp type of the message
+   */
+  def timestampType = {
+    if (magic == MagicValue_V0)
+      TimestampType.NO_TIMESTAMP_TYPE
+    else
+      wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes))
+  }
   
   /**
    * The compression codec used with this message
@@ -221,8 +326,54 @@ class Message(val buffer: ByteBuffer) {
   /**
    * A ByteBuffer containing the message key
    */
-  def key: ByteBuffer = sliceDelimited(KeySizeOffset)
-  
+  def key: ByteBuffer = sliceDelimited(keySizeOffset)
+
+  /**
+   * convert the message to specified format
+   */
+  def toFormatVersion(toMagicValue: Byte): Message = {
+    if (magic == toMagicValue)
+      this
+    else {
+      val byteBuffer = ByteBuffer.allocate(size + Message.headerSizeDiff(magic, toMagicValue))
+      // Copy bytes from old messages to new message
+      convertToBuffer(toMagicValue, byteBuffer)
+      new Message(byteBuffer)
+    }
+  }
+
+  def convertToBuffer(toMagicValue: Byte,
+                      byteBuffer: ByteBuffer,
+                      now: Long = NoTimestamp,
+                      timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes))) {
+    if (byteBuffer.remaining() < size + headerSizeDiff(magic, toMagicValue))
+      throw new IndexOutOfBoundsException("The byte buffer does not have enough capacity to hold new message format " +
+        "version " + toMagicValue)
+    if (toMagicValue == Message.MagicValue_V1) {
+      // Up-conversion, reserve CRC and update magic byte
+      byteBuffer.position(Message.MagicOffset)
+      byteBuffer.put(Message.MagicValue_V1)
+      byteBuffer.put(TimestampType.setTimestampType(attributes, timestampType))
+      // Up-conversion, insert the timestamp field
+      if (timestampType == TimestampType.LOG_APPEND_TIME)
+        byteBuffer.putLong(now)
+      else
+        byteBuffer.putLong(Message.NoTimestamp)
+      byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V0, size - Message.KeySizeOffset_V0)
+    } else {
+      // Down-conversion, reserve CRC and update magic byte
+      byteBuffer.position(Message.MagicOffset)
+      byteBuffer.put(Message.MagicValue_V0)
+      byteBuffer.put(TimestampType.setTimestampType(attributes, TimestampType.CREATE_TIME))
+      // Down-conversion, skip the timestamp field
+      byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V1, size - Message.KeySizeOffset_V1)
+    }
+    // update crc value
+    val newMessage = new Message(byteBuffer)
+    Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum())
+    byteBuffer.rewind()
+  }
+
   /**
    * Read a size-delimited byte buffer starting at the given offset
    */
@@ -240,9 +391,25 @@ class Message(val buffer: ByteBuffer) {
     }
   }
 
-  override def toString(): String = 
-    "Message(magic = %d, attributes = %d, crc = %d, key = %s, payload = %s)".format(magic, attributes, checksum, key, payload)
-  
+  /**
+   * Validate the timestamp and "magic" value
+   */
+  private def validateTimestampAndMagicValue(timestamp: Long, magic: Byte) {
+    if (magic != MagicValue_V0 && magic != MagicValue_V1)
+      throw new IllegalArgumentException("Invalid magic value " + magic)
+    if (timestamp < 0 && timestamp != NoTimestamp)
+      throw new IllegalArgumentException("Invalid message timestamp " + timestamp)
+    if (magic == MagicValue_V0 && timestamp != NoTimestamp)
+      throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
+  }
+
+  override def toString(): String = {
+    if (magic == MagicValue_V0)
+      s"Message(magic = $magic, attributes = $attributes, crc = $checksum, key = $key, payload = $payload)"
+    else
+      s"Message(magic = $magic, attributes = $attributes, $timestampType = $timestamp, crc = $checksum, key = $key, payload = $payload)"
+  }
+
   override def equals(any: Any): Boolean = {
     any match {
       case that: Message => this.buffer.equals(that.buffer)
@@ -251,5 +418,5 @@ class Message(val buffer: ByteBuffer) {
   }
   
   override def hashCode(): Int = buffer.hashCode
-  
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/MessageAndMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
index 26b75c8..ac9ef77 100755
--- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala
+++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
@@ -18,12 +18,17 @@
 package kafka.message
 
 import kafka.serializer.Decoder
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
 
-case class MessageAndMetadata[K, V](topic: String, partition: Int,
-                                    private val rawMessage: Message, offset: Long,
+case class MessageAndMetadata[K, V](topic: String,
+                                    partition: Int,
+                                    private val rawMessage: Message,
+                                    offset: Long,
+                                    timestamp: Long = Message.NoTimestamp,
+                                    timestampType: TimestampType = TimestampType.CREATE_TIME,
                                     keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {
-
+  
   /**
    * Return the decoded message key and payload
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index 28b56e6..014788a 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -54,8 +54,26 @@ object MessageSet {
    */
   def entrySize(message: Message): Int = LogOverhead + message.size
 
+  /**
+   * Validate the "magic" values of messages are the same in a compressed message set and return the magic value of
+   * and the max timestamp of the inner messages.
+   */
+  def magicAndLargestTimestamp(messages: Seq[Message]): MagicAndTimestamp = {
+    val firstMagicValue = messages.head.magic
+    var largestTimestamp: Long = Message.NoTimestamp
+    for (message <- messages) {
+      if (message.magic != firstMagicValue)
+        throw new IllegalStateException("Messages in the same message set must have same magic value")
+      if (firstMagicValue > Message.MagicValue_V0)
+        largestTimestamp = math.max(largestTimestamp, message.timestamp)
+    }
+    MagicAndTimestamp(firstMagicValue, largestTimestamp)
+  }
+
 }
 
+case class MagicAndTimestamp(magic: Byte, timestamp: Long)
+
 /**
  * A set of messages with offsets. A message set has a fixed serialized form, though the container
  * for the bytes could be either in-memory or on disk. The format of each message is
@@ -70,7 +88,12 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
     * Less than the complete amount may be written, but no more than maxSize can be. The number
     * of bytes written is returned */
   def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
-  
+
+  /**
+   * Check if all the wrapper messages in the message set have the expected magic value
+   */
+  def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean
+
   /**
    * Provides an iterator over the message/offset pairs in this set
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/MessageWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageWriter.scala b/core/src/main/scala/kafka/message/MessageWriter.scala
index 0c6040e..660772c 100755
--- a/core/src/main/scala/kafka/message/MessageWriter.scala
+++ b/core/src/main/scala/kafka/message/MessageWriter.scala
@@ -20,19 +20,31 @@ package kafka.message
 import java.io.{InputStream, OutputStream}
 import java.nio.ByteBuffer
 
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Crc32
 
 class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize) {
 
   import Message._
 
-  def write(key: Array[Byte] = null, codec: CompressionCodec)(writePayload: OutputStream => Unit): Unit = {
+  def write(key: Array[Byte] = null,
+            codec: CompressionCodec,
+            timestamp: Long,
+            timestampType: TimestampType,
+            magicValue: Byte)(writePayload: OutputStream => Unit): Unit = {
     withCrc32Prefix {
-      write(CurrentMagicValue)
+      // write magic value
+      write(magicValue)
+      // write attributes
       var attributes: Byte = 0
       if (codec.codec > 0)
         attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
+      if (magicValue > MagicValue_V0)
+        attributes = TimestampType.setTimestampType(attributes, timestampType)
       write(attributes)
+      // Write timestamp
+      if (magicValue > MagicValue_V0)
+        writeLong(timestamp)
       // write the key
       if (key == null) {
         writeInt(-1)
@@ -61,6 +73,17 @@ class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize)
     out.write(value)
   }
 
+  private def writeLong(value: Long): Unit = {
+    write((value >>> 56).toInt)
+    write((value >>> 48).toInt)
+    write((value >>> 40).toInt)
+    write((value >>> 32).toInt)
+    write((value >>> 24).toInt)
+    write((value >>> 16).toInt)
+    write((value >>> 8).toInt)
+    write(value.toInt)
+  }
+
   private def withCrc32Prefix(writeData: => Unit): Unit = {
     // get a writer for CRC value
     val crcWriter = reserve(CrcLength)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 4e67ba4..7abe48a 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -21,7 +21,7 @@ import kafka.common._
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
-import kafka.utils.{CoreUtils, Logging, SystemTime}
+import kafka.utils._
 import org.apache.kafka.common.errors.{LeaderNotAvailableException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.protocol.Errors
 import scala.util.Random
@@ -36,8 +36,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
-                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
+                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
+                               private val time: Time = SystemTime)
   extends EventHandler[K,V] with Logging {
+
   val isSync = ("sync" == config.producerType)
 
   val correlationId = new AtomicInteger(0)
@@ -129,9 +131,22 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     events.foreach{e =>
       try {
         if(e.hasKey)
-          serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
+          serializedMessages += new KeyedMessage[K,Message](
+            topic = e.topic,
+            key = e.key,
+            partKey = e.partKey,
+            message = new Message(key = keyEncoder.toBytes(e.key),
+                                  bytes = encoder.toBytes(e.message),
+                                  timestamp = time.milliseconds,
+                                  magicValue = Message.MagicValue_V1))
         else
-          serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message)))
+          serializedMessages += new KeyedMessage[K,Message](
+            topic = e.topic,
+            key = e.key,
+            partKey = e.partKey,
+            message = new Message(bytes = encoder.toBytes(e.message),
+                                  timestamp = time.milliseconds,
+                                  magicValue = Message.MagicValue_V1))
       } catch {
         case t: Throwable =>
           producerStats.serializationErrorRate.mark()

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b3873a6..8b688b9 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -104,7 +104,7 @@ abstract class AbstractFetcherThread(name: String,
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
+          warn(s"Error in fetch $fetchRequest", t)
           inLock(partitionMapLock) {
             partitionsWithError ++= partitionMap.keys
             // there is an error occurred while fetching partitions, sleep a while

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index bc599a0..9343fde 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -19,8 +19,10 @@ package kafka.server
 
 import java.util.Properties
 
+import kafka.api.ApiVersion
 import kafka.common.TopicAndPartition
 import kafka.log.{Log, LogConfig, LogManager}
+import kafka.utils.Logging
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.protocol.ApiKeys
 
@@ -38,12 +40,22 @@ trait ConfigHandler {
  * The TopicConfigHandler will process topic config changes in ZK.
  * The callback provides the topic name and the full properties set read from ZK
  */
-class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler {
+class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig) extends ConfigHandler with Logging {
 
   def processConfigChanges(topic : String, topicConfig : Properties) {
     val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
     val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case (topicAndPartition, log) => topicAndPartition.topic }
             .mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => v.map(_._2) }
+    // Validate the compatibility of message format version.
+    Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)) match {
+      case Some(versionString) =>
+        if (!kafkaConfig.interBrokerProtocolVersion.onOrAfter(ApiVersion(versionString))) {
+          topicConfig.remove(LogConfig.MessageFormatVersionProp)
+          warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for $topic because $versionString " +
+            s"is not compatible with Kafka inter broker protocol version ${kafkaConfig.interBrokerProtocolVersion}")
+        }
+      case _ =>
+    }
 
     if (logsByTopic.contains(topic)) {
       /* combine the default properties with the overrides in zk to create the new LogConfig */

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f2e9533..bd02630 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import kafka.common._
 import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
 import kafka.log._
-import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.network._
 import kafka.network.RequestChannel.{Session, Response}
 import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
@@ -35,7 +35,7 @@ import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
 ClusterAuthorizationException}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ProtoUtils, ApiKeys, Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
 DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
 LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
@@ -330,7 +330,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
-      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1))
+      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ =>
+        new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp))
 
       var errorInResponse = false
 
@@ -367,7 +368,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           val respHeader = new ResponseHeader(request.header.correlationId)
           val respBody = request.header.apiVersion match {
             case 0 => new ProduceResponse(mergedResponseStatus.asJava)
-            case 1 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs)
+            case 1 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, 1)
+            case 2 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, 2)
             // This case shouldn't happen unless a new version of ProducerRequest is added without
             // updating this part of the code to handle it properly.
             case _ => throw new IllegalArgumentException("Version %d of ProducerRequest is not handled. Code must be updated."
@@ -426,7 +428,32 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
-      val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus
+
+      val convertedResponseStatus =
+        // Need to down-convert message when consumer only takes magic value 0.
+        if (fetchRequest.versionId <= 1) {
+          responsePartitionData.map({ case (tp, data) =>
+            tp -> {
+              // We only do down-conversion when:
+              // 1. The message format version configured for the topic is using magic value > 0, and
+              // 2. The message set contains message whose magic > 0
+              // This is to reduce the message format conversion as much as possible. The conversion will only occur
+              // when new message format is used for the topic and we see an old request.
+              // Please notice that if the message format is changed from a higher version back to lower version this
+              // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
+              // without format down conversion.
+              if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
+                  !data.messages.magicValueInAllWrapperMessages(Message.MagicValue_V0)) {
+                trace("Down converting message to V0 for fetch request from " + fetchRequest.clientId)
+                new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
+              } else
+                data
+            }
+          })
+        } else
+          responsePartitionData
+
+      val mergedResponseStatus = convertedResponseStatus ++ unauthorizedResponseStatus
 
       mergedResponseStatus.foreach { case (topicAndPartition, data) =>
         if (data.error != Errors.NONE.code) {
@@ -440,6 +467,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       def fetchResponseCallback(delayTimeMs: Int) {
+          trace(s"Sending fetch response to ${fetchRequest.clientId} with ${convertedResponseStatus.values.map(_.messages.sizeInBytes).sum}" +
+            s" bytes")
         val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)
         requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
       }
@@ -453,10 +482,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchResponseCallback(0)
       } else {
         quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId,
-                                                                   FetchResponse.responseSize(responsePartitionData
-                                                                                                      .groupBy(_._1.topic),
-                                                                                              fetchRequest.versionId),
-                                                                   fetchResponseCallback)
+                                                               FetchResponse.responseSize(responsePartitionData.groupBy(_._1.topic),
+                                                                                          fetchRequest.versionId),
+                                                               fetchResponseCallback)
       }
     }