Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/19 16:56:50 UTC
[3/5] kafka git commit: KAFKA-3025;
Added timestamp to Message and use relative offsets.
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index d4ce498..fe31ad4 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -31,6 +31,8 @@ import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.network.TransportLayer
import org.apache.kafka.common.utils.Utils
+import scala.collection.mutable.ArrayBuffer
+
/**
* An on-disk message set. An optional start and end position can be applied to the message set
* which will allow slicing a subset of the file.
@@ -139,7 +141,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
if(offset >= targetOffset)
return OffsetPosition(offset, position)
val messageSize = buffer.getInt()
- if(messageSize < Message.MessageOverhead)
+ if(messageSize < Message.MinMessageOverhead)
throw new IllegalStateException("Invalid message size: " + messageSize)
position += MessageSet.LogOverhead + messageSize
}
@@ -172,6 +174,63 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
/**
+ * This method is called before we write messages to the socket using zero-copy transfer. We need to
+ * make sure all the messages in the message set have the expected magic value.
+ * @param expectedMagicValue the magic value expected
+ * @return true if all messages have the expected magic value, false otherwise
+ */
+ override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
+ var location = start
+ val offsetAndSizeBuffer = ByteBuffer.allocate(MessageSet.LogOverhead)
+ val crcAndMagicByteBuffer = ByteBuffer.allocate(Message.CrcLength + Message.MagicLength)
+ while(location < end) {
+ offsetAndSizeBuffer.rewind()
+ channel.read(offsetAndSizeBuffer, location)
+ if (offsetAndSizeBuffer.hasRemaining)
+ return true // partial trailing entry at the end of the file; nothing more to check
+ offsetAndSizeBuffer.rewind()
+ offsetAndSizeBuffer.getLong // skip offset field
+ val messageSize = offsetAndSizeBuffer.getInt
+ if(messageSize < Message.MinMessageOverhead)
+ throw new IllegalStateException("Invalid message size: " + messageSize)
+ crcAndMagicByteBuffer.rewind()
+ channel.read(crcAndMagicByteBuffer, location + MessageSet.LogOverhead)
+ if (crcAndMagicByteBuffer.get(Message.MagicOffset) != expectedMagicValue)
+ return false
+ location += (MessageSet.LogOverhead + messageSize)
+ }
+ true
+ }
+
+ /**
+ * Convert this message set to use the specified message format.
+ */
+ def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = {
+ val offsets = new ArrayBuffer[Long]
+ val newMessages = new ArrayBuffer[Message]
+ this.iterator().foreach(messageAndOffset => {
+ val message = messageAndOffset.message
+ if (message.compressionCodec == NoCompressionCodec) {
+ newMessages += messageAndOffset.message.toFormatVersion(toMagicValue)
+ offsets += messageAndOffset.offset
+ } else {
+ // FileMessageSet only has a shallow iterator, so we do the deep iteration here when needed.
+ val deepIter = ByteBufferMessageSet.deepIterator(messageAndOffset)
+ for (innerMessageAndOffset <- deepIter) {
+ newMessages += innerMessageAndOffset.message.toFormatVersion(toMagicValue)
+ offsets += innerMessageAndOffset.offset
+ }
+ }
+ })
+
+ // We use the offset seq to assign offsets so the offsets of the messages do not change.
+ new ByteBufferMessageSet(
+ compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
+ offsetSeq = offsets.toSeq,
+ newMessages: _*)
+ }
+
+ /**
* Get a shallow iterator over the messages in the set.
*/
override def iterator() = iterator(Int.MaxValue)
@@ -200,7 +259,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
sizeOffsetBuffer.rewind()
val offset = sizeOffsetBuffer.getLong()
val size = sizeOffsetBuffer.getInt()
- if(size < Message.MinHeaderSize)
+ if(size < Message.MinMessageOverhead)
return allDone()
if(size > maxMessageSize)
throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
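For readers tracing the scan above: each shallow entry in a FileMessageSet is an 8-byte offset followed by a 4-byte size (MessageSet.LogOverhead = 12 bytes) and then the message itself, whose magic byte sits right after the 4-byte CRC. A minimal sketch of the same magic-value scan over an in-memory buffer (a hypothetical helper, not part of this patch):

    import java.nio.ByteBuffer

    object MagicScanSketch {
      val LogOverhead = 12 // 8-byte offset + 4-byte size precede each message
      val MagicOffset = 4  // the magic byte follows the 4-byte CRC

      // Mirrors magicValueInAllWrapperMessages: true if every shallow message
      // in the buffer carries the expected magic value.
      def allMagicValuesMatch(entries: ByteBuffer, expectedMagic: Byte): Boolean = {
        var pos = 0
        while (pos + LogOverhead <= entries.limit) {
          val messageSize = entries.getInt(pos + 8) // skip the 8-byte offset
          if (entries.get(pos + LogOverhead + MagicOffset) != expectedMagic)
            return false
          pos += LogOverhead + messageSize
        }
        true
      }
    }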
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 32c194d..f8c0b77 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -28,26 +28,35 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import java.util.concurrent.atomic._
import java.text.NumberFormat
import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+import org.apache.kafka.common.record.TimestampType
import scala.collection.JavaConversions
import com.yammer.metrics.core.Gauge
object LogAppendInfo {
- val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
+ val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
}
/**
* Struct to hold various quantities we compute about each message set before appending to the log
* @param firstOffset The first offset in the message set
* @param lastOffset The last offset in the message set
- * @param shallowCount The number of shallow messages
- * @param validBytes The number of valid bytes
+ * @param timestamp The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param sourceCodec The source codec used in the message set (sent by the producer)
* @param targetCodec The target codec of the message set (after applying the broker compression configuration if any)
+ * @param shallowCount The number of shallow messages
+ * @param validBytes The number of valid bytes
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
*/
-case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
+case class LogAppendInfo(var firstOffset: Long,
+ var lastOffset: Long,
+ var timestamp: Long,
+ sourceCodec: CompressionCodec,
+ targetCodec: CompressionCodec,
+ shallowCount: Int,
+ validBytes: Int,
+ offsetsMonotonic: Boolean)
/**
@@ -325,13 +334,23 @@ class Log(val dir: File,
if (assignOffsets) {
// assign offsets to the message set
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
+ val now = SystemTime.milliseconds
try {
- validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config
- .compact)
+ validMessages = validMessages.validateMessagesAndAssignOffsets(offset,
+ now,
+ appendInfo.sourceCodec,
+ appendInfo.targetCodec,
+ config.compact,
+ config.messageFormatVersion,
+ config.messageTimestampType,
+ config.messageTimestampDifferenceMaxMs)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
appendInfo.lastOffset = offset.get - 1
+ // If log append time is used, we put the timestamp assigned to the messages in the append info.
+ if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
+ appendInfo.timestamp = now
} else {
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
@@ -436,7 +455,7 @@ class Log(val dir: File,
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
- LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
/**
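The net effect of the Log.scala changes: the broker now captures `now` once per append and, when the log is configured with LogAppendTime, records it both in every message and in the returned LogAppendInfo. A hedged one-method condensation of that decision (the helper name is illustrative, not from the patch):

    import kafka.message.Message
    import org.apache.kafka.common.record.TimestampType

    // Sketch: which timestamp ends up in LogAppendInfo after an append.
    def appendInfoTimestamp(timestampType: TimestampType, now: Long): Long =
      if (timestampType == TimestampType.LOG_APPEND_TIME)
        now // broker clock, shared by every message in the batch
      else
        Message.NoTimestamp // CreateTime: per-message timestamps are kept as sent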
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d5c247c..a3aff15 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -370,7 +370,7 @@ private[log] class Cleaner(val id: Int,
val retainDeletes = old.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
.format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
- cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes)
+ cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion)
}
// trim excess index
@@ -401,10 +401,14 @@ private[log] class Cleaner(val id: Int,
* @param dest The cleaned log segment
* @param map The key=>offset mapping
* @param retainDeletes Should delete tombstones be retained while cleaning this segment
- *
+ * @param messageFormatVersion The message format version to use after compaction
*/
- private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
- dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
+ private[log] def cleanInto(topicAndPartition: TopicAndPartition,
+ source: LogSegment,
+ dest: LogSegment,
+ map: OffsetMap,
+ retainDeletes: Boolean,
+ messageFormatVersion: Byte) {
var position = 0
while (position < source.log.sizeInBytes) {
checkDone(topicAndPartition)
@@ -420,19 +424,34 @@ private[log] class Cleaner(val id: Int,
stats.readMessage(size)
if (entry.message.compressionCodec == NoCompressionCodec) {
if (shouldRetainMessage(source, map, retainDeletes, entry)) {
- ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+ val convertedMessage = entry.message.toFormatVersion(messageFormatVersion)
+ ByteBufferMessageSet.writeMessage(writeBuffer, convertedMessage, entry.offset)
stats.recopyMessage(size)
}
messagesRead += 1
} else {
- val messages = ByteBufferMessageSet.deepIterator(entry.message)
+ // We use the absolute offset to decide whether to retain the message. This is handled by
+ // the deep iterator.
+ val messages = ByteBufferMessageSet.deepIterator(entry)
+ var numberOfInnerMessages = 0
+ var formatConversionNeeded = false
val retainedMessages = messages.filter(messageAndOffset => {
messagesRead += 1
+ numberOfInnerMessages += 1
+ if (messageAndOffset.message.magic != messageFormatVersion)
+ formatConversionNeeded = true
shouldRetainMessage(source, map, retainDeletes, messageAndOffset)
}).toSeq
- if (retainedMessages.nonEmpty)
- compressMessages(writeBuffer, entry.message.compressionCodec, retainedMessages)
+ // If no messages were compacted out and no message format conversion is needed, write the original message set back
+ if (retainedMessages.size == numberOfInnerMessages && !formatConversionNeeded)
+ ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+ else if (retainedMessages.nonEmpty) {
+ val convertedRetainedMessages = retainedMessages.map(messageAndOffset => {
+ new MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion), messageAndOffset.offset)
+ })
+ compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, convertedRetainedMessages)
+ }
}
}
@@ -452,7 +471,10 @@ private[log] class Cleaner(val id: Int,
restoreBuffers()
}
- private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messages: Seq[MessageAndOffset]) {
+ private def compressMessages(buffer: ByteBuffer,
+ compressionCodec: CompressionCodec,
+ messageFormatVersion: Byte,
+ messages: Seq[MessageAndOffset]) {
val messagesIterable = messages.toIterable.map(_.message)
if (messages.isEmpty) {
MessageSet.Empty.sizeInBytes
@@ -461,15 +483,24 @@ private[log] class Cleaner(val id: Int,
ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset)
MessageSet.messageSetSize(messagesIterable)
} else {
+ val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages.map(_.message))
+ val firstAbsoluteOffset = messages.head.offset
var offset = -1L
+ val timestampType = messages.head.message.timestampType
val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 2, 1024), 1 << 16))
- messageWriter.write(codec = compressionCodec) { outputStream =>
+ messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream =>
val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
try {
for (messageOffset <- messages) {
val message = messageOffset.message
offset = messageOffset.offset
- output.writeLong(offset)
+ // Use inner offset when magic value is greater than 0
+ if (messageFormatVersion > Message.MagicValue_V0) {
+ // The offsets of the messages are absolute offsets; compute the inner offset.
+ val innerOffset = messageOffset.offset - firstAbsoluteOffset
+ output.writeLong(innerOffset)
+ } else
+ output.writeLong(offset)
output.writeInt(message.size)
output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 7fc7e33..a8fffbd 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -18,12 +18,13 @@
package kafka.log
import java.util.Properties
+
+import kafka.api.ApiVersion
+import kafka.message.{BrokerCompressionCodec, Message}
import kafka.server.KafkaConfig
-import org.apache.kafka.common.utils.Utils
-import scala.collection._
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
-import kafka.message.BrokerCompressionCodec
-import kafka.message.Message
+import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.utils.Utils
object Defaults {
val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -44,6 +45,9 @@ object Defaults {
val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
val CompressionType = kafka.server.Defaults.CompressionType
val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
+ val MessageFormatVersion = kafka.server.Defaults.MessageFormatVersion
+ val MessageTimestampType = kafka.server.Defaults.MessageTimestampType
+ val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.MessageTimestampDifferenceMaxMs
}
case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
@@ -69,6 +73,9 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
+ val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp)).messageFormatVersion
+ val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
+ val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp)
def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -101,6 +108,9 @@ object LogConfig {
val MinInSyncReplicasProp = "min.insync.replicas"
val CompressionTypeProp = "compression.type"
val PreAllocateEnableProp = "preallocate"
+ val MessageFormatVersionProp = KafkaConfig.MessageFormatVersionProp
+ val MessageTimestampTypeProp = KafkaConfig.MessageTimestampTypeProp
+ val MessageTimestampDifferenceMaxMsProp = KafkaConfig.MessageTimestampDifferenceMaxMsProp
val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
@@ -125,16 +135,18 @@ object LogConfig {
"standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to " +
"no compression; and 'producer' which means retain the original compression codec set by the producer."
val PreAllocateEnableDoc = "Should we preallocate the file when creating a new segment?"
+ val MessageFormatVersionDoc = KafkaConfig.MessageFormatVersionDoc
+ val MessageTimestampTypeDoc = KafkaConfig.MessageTimestampTypeDoc
+ val MessageTimestampDifferenceMaxMsDoc = KafkaConfig.MessageTimestampDifferenceMaxMsDoc
private val configDef = {
- import ConfigDef.Range._
- import ConfigDef.ValidString._
- import ConfigDef.Type._
- import ConfigDef.Importance._
- import java.util.Arrays.asList
+ import org.apache.kafka.common.config.ConfigDef.Importance._
+ import org.apache.kafka.common.config.ConfigDef.Range._
+ import org.apache.kafka.common.config.ConfigDef.Type._
+ import org.apache.kafka.common.config.ConfigDef.ValidString._
new ConfigDef()
- .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinHeaderSize), MEDIUM, SegmentSizeDoc)
+ .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinMessageOverhead), MEDIUM, SegmentSizeDoc)
.define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM, SegmentMsDoc)
.define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc)
.define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc)
@@ -158,12 +170,15 @@ object LogConfig {
.define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
.define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable,
MEDIUM, PreAllocateEnableDoc)
+ .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc)
+ .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, MEDIUM, MessageTimestampTypeDoc)
+ .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc)
}
def apply(): LogConfig = LogConfig(new Properties())
def configNames() = {
- import JavaConversions._
+ import scala.collection.JavaConversions._
configDef.names().toList.sorted
}
@@ -182,7 +197,7 @@ object LogConfig {
* Check that property names are valid
*/
def validateNames(props: Properties) {
- import JavaConversions._
+ import scala.collection.JavaConversions._
val names = configDef.names()
for(name <- props.keys)
require(names.contains(name), "Unknown configuration \"%s\".".format(name))
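Taken together, the three new LogConfig entries let the message format and timestamp policy be configured per log. A hedged usage sketch (the string keys are the topic-level names these props resolve to; the version string is illustrative):

    import java.util.Properties
    import kafka.log.LogConfig

    val props = new Properties()
    props.put("message.format.version", "0.10.0")               // MessageFormatVersionProp
    props.put("message.timestamp.type", "LogAppendTime")        // CreateTime or LogAppendTime
    props.put("message.timestamp.difference.max.ms", "3600000") // max CreateTime drift: 1h
    val config = LogConfig(props)
    // config.messageFormatVersion, config.messageTimestampType and
    // config.messageTimestampDifferenceMaxMs now drive validation in Log.append.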
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index aa37d52..9fc68a4 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -184,7 +184,7 @@ class LogSegment(val log: FileMessageSet,
case NoCompressionCodec =>
entry.offset
case _ =>
- ByteBufferMessageSet.deepIterator(entry.message).next().offset
+ ByteBufferMessageSet.deepIterator(entry).next().offset
}
index.append(startOffset, validBytes)
lastIndexEntry = validBytes
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 5a32de8..2867c78 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -23,28 +23,49 @@ import kafka.common.KafkaException
import java.nio.ByteBuffer
import java.nio.channels._
import java.io._
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
object ByteBufferMessageSet {
- private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = {
+ private def create(offsetAssignor: OffsetAssigner,
+ compressionCodec: CompressionCodec,
+ wrapperMessageTimestamp: Option[Long],
+ timestampType: TimestampType,
+ messages: Message*): ByteBuffer = {
if(messages.size == 0) {
MessageSet.Empty.buffer
} else if(compressionCodec == NoCompressionCodec) {
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for(message <- messages)
- writeMessage(buffer, message, offsetCounter.getAndIncrement)
+ writeMessage(buffer, message, offsetAssignor.nextAbsoluteOffset)
buffer.rewind()
buffer
} else {
+ val magicAndTimestamp = wrapperMessageTimestamp match {
+ case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
+ case None => MessageSet.magicAndLargestTimestamp(messages)
+ }
var offset = -1L
val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
- messageWriter.write(codec = compressionCodec) { outputStream =>
+ messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
try {
for (message <- messages) {
- offset = offsetCounter.getAndIncrement
- output.writeLong(offset)
+ offset = offsetAssignor.nextAbsoluteOffset
+ if (message.magic != magicAndTimestamp.magic)
+ throw new IllegalArgumentException("Messages in the message set must have same magic value")
+ // Use inner offset if magic value is greater than 0
+ if (magicAndTimestamp.magic > Message.MagicValue_V0)
+ output.writeLong(offsetAssignor.toInnerOffset(offset))
+ else
+ output.writeLong(offset)
output.writeInt(message.size)
output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
}
@@ -59,40 +80,93 @@ object ByteBufferMessageSet {
}
}
- /** Deep iterator that decompresses the message sets in-place. */
- def deepIterator(wrapperMessage: Message): Iterator[MessageAndOffset] = {
+ /** Deep iterator that decompresses the message sets and adjusts timestamp and offset if needed. */
+ def deepIterator(wrapperMessageAndOffset: MessageAndOffset): Iterator[MessageAndOffset] = {
+
+ import Message._
+
new IteratorTemplate[MessageAndOffset] {
+ val wrapperMessageOffset = wrapperMessageAndOffset.offset
+ val wrapperMessage = wrapperMessageAndOffset.message
+ val wrapperMessageTimestampOpt: Option[Long] =
+ if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None
+ val wrapperMessageTimestampTypeOpt: Option[TimestampType] =
+ if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None
+ if (wrapperMessage.payload == null)
+ throw new KafkaException("Message payload is null: " + wrapperMessage)
val inputStream: InputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
val compressed: DataInputStream = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
+ var lastInnerOffset = -1L
- override def makeNext(): MessageAndOffset = {
+ val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
+ val innerMessageAndOffsets = new mutable.Queue[MessageAndOffset]()
try {
- // read the offset
- val offset = compressed.readLong()
- // read record size
- val size = compressed.readInt()
-
- if (size < Message.MinHeaderSize)
- throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")
-
- // read the record into an intermediate record buffer
- // and hence has to do extra copy
- val bufferArray = new Array[Byte](size)
- compressed.readFully(bufferArray, 0, size)
- val buffer = ByteBuffer.wrap(bufferArray)
-
- val newMessage = new Message(buffer)
-
- // the decompressed message should not be a wrapper message since we do not allow nested compression
- new MessageAndOffset(newMessage, offset)
+ while (true) {
+ innerMessageAndOffsets += readMessageFromStream()
+ }
} catch {
case eofe: EOFException =>
compressed.close()
- allDone()
case ioe: IOException =>
throw new KafkaException(ioe)
}
+ Some(innerMessageAndOffsets)
+ } else {
+ None
+ }
+
+ private def readMessageFromStream() = {
+ // read the offset
+ val innerOffset = compressed.readLong()
+ // read record size
+ val size = compressed.readInt()
+
+ if (size < MinMessageOverhead)
+ throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")
+
+ // read the record into an intermediate record buffer
+ // and hence we have to do an extra copy
+ val bufferArray = new Array[Byte](size)
+ compressed.readFully(bufferArray, 0, size)
+ val buffer = ByteBuffer.wrap(bufferArray)
+
+ // Override the timestamp if necessary
+ val newMessage = new Message(buffer, wrapperMessageTimestampOpt, wrapperMessageTimestampTypeOpt)
+
+ // Inner message and wrapper message must have same magic value
+ if (newMessage.magic != wrapperMessage.magic)
+ throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " +
+ s"but inner message has magic value ${newMessage.magic}")
+ lastInnerOffset = innerOffset
+ new MessageAndOffset(newMessage, innerOffset)
+ }
+
+ override def makeNext(): MessageAndOffset = {
+ messageAndOffsets match {
+ // Using inner offset and timestamps
+ case Some(innerMessageAndOffsets) =>
+ if (innerMessageAndOffsets.isEmpty)
+ allDone()
+ else {
+ val messageAndOffset = innerMessageAndOffsets.dequeue()
+ val message = messageAndOffset.message
+ val relativeOffset = messageAndOffset.offset - lastInnerOffset
+ val absoluteOffset = wrapperMessageOffset + relativeOffset
+ new MessageAndOffset(message, absoluteOffset)
+ }
+ // Not using inner offset and timestamps
+ case None =>
+ try {
+ readMessageFromStream()
+ } catch {
+ case eofe: EOFException =>
+ compressed.close()
+ allDone()
+ case ioe: IOException =>
+ throw new KafkaException(ioe)
+ }
+ }
}
}
}
@@ -111,6 +185,20 @@ object ByteBufferMessageSet {
}
}
+private class OffsetAssigner(offsets: Seq[Long]) {
+ val index = new AtomicInteger(0)
+
+ def this(offsetCounter: AtomicLong, size: Int) {
+ this((offsetCounter.get() until offsetCounter.get() + size).toSeq)
+ offsetCounter.addAndGet(size)
+ }
+
+ def nextAbsoluteOffset = offsets(index.getAndIncrement)
+
+ def toInnerOffset(offset: Long) = offset - offsets(0)
+
+}
+
/**
* A sequence of messages stored in a byte buffer
*
@@ -120,22 +208,87 @@ object ByteBufferMessageSet {
*
* Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
*
+ *
+ * When message format v1 is used, the following message format changes apply:
+ * - For non-compressed messages, message v1 adds a timestamp and a timestamp type attribute. The offsets of
+ * the messages remain absolute offsets.
+ * - For compressed messages, message v1 adds a timestamp, a timestamp type attribute bit, and inner
+ * offsets (IO) for the inner messages of compressed messages (see offset calculation details below). The timestamp type
+ * attribute is only set in wrapper messages. Inner messages always have CreateTime as the timestamp type in their attributes.
+ *
+ * Timestamps are set as follows:
+ * For non-compressed messages: the timestamp and timestamp type attribute in the messages are set and used.
+ * For compressed messages:
+ * 1. The wrapper message's timestamp type attribute is set to the proper value.
+ * 2. The wrapper message's timestamp is set to:
+ * - the max timestamp of the inner messages if CreateTime is used
+ * - the current server time if the wrapper message's timestamp type is LogAppendTime.
+ * In this case the wrapper message timestamp is used and all the timestamps of inner messages are ignored.
+ * 3. The inner messages' timestamps will be:
+ * - used when the wrapper message's timestamp type is CreateTime
+ * - ignored when the wrapper message's timestamp type is LogAppendTime
+ * 4. The inner messages' timestamp type is always ignored. However, the producer must set the inner message timestamp
+ * type to CreateTime, otherwise the messages will be rejected by the broker.
+ *
+ * Absolute offsets are calculated as follows:
+ * Ideally the conversion from relative offset (RO) to absolute offset (AO) should be:
+ *
+ * AO = AO_Of_Last_Inner_Message + RO
+ *
+ * However, note that the message sets sent by producers are compressed in a streaming way, so
+ * the relative offset of an inner message compared with the last inner message is not known until
+ * the last inner message is written, and with stream compression we cannot go back and change
+ * previously written messages after the last message is written to the message set.
+ *
+ * To solve this issue, we use the following approach:
+ *
+ * 1. When the producer creates a message set, it simply writes all the messages into a compressed message set with
+ * offsets 0, 1, ... (inner offsets).
+ * 2. The broker will set the offset of the wrapper message to the absolute offset of the last message in the
+ * message set.
+ * 3. When a consumer sees the message set, it first decompresses the entire message set to find out the inner
+ * offset (IO) of the last inner message. Then it computes the RO and AO of the previous messages:
+ *
+ * RO = IO_of_a_message - IO_of_the_last_message
+ * AO = AO_Of_Last_Inner_Message + RO
+ *
+ * 4. This solution works for compacted message sets as well.
+ *
*/
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {
private var shallowValidByteCount = -1
def this(compressionCodec: CompressionCodec, messages: Message*) {
- this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*))
+ this(ByteBufferMessageSet.create(new OffsetAssigner(new AtomicLong(0), messages.size), compressionCodec,
+ None, TimestampType.CREATE_TIME, messages:_*))
}
def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) {
- this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages:_*))
+ this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, messages.size), compressionCodec,
+ None, TimestampType.CREATE_TIME, messages:_*))
+ }
+
+ def this(compressionCodec: CompressionCodec, offsetSeq: Seq[Long], messages: Message*) {
+ this(ByteBufferMessageSet.create(new OffsetAssigner(offsetSeq), compressionCodec,
+ None, TimestampType.CREATE_TIME, messages:_*))
}
def this(messages: Message*) {
this(NoCompressionCodec, new AtomicLong(0), messages: _*)
}
+ // This constructor is only used internally
+ private[kafka] def this(compressionCodec: CompressionCodec,
+ offsetCounter: AtomicLong,
+ wrapperMessageTimestamp: Option[Long],
+ timestampType: TimestampType,
+ messages: Message*) {
+ this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, messages.size), compressionCodec,
+ wrapperMessageTimestamp, timestampType, messages:_*))
+ }
+
def getBuffer = buffer
private def shallowValidBytes: Int = {
@@ -162,6 +315,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
written
}
+ override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
+ for (messageAndOffset <- shallowIterator) {
+ if (messageAndOffset.message.magic != expectedMagicValue)
+ return false
+ }
+ true
+ }
+
/** default iterator that iterates over decompressed messages */
override def iterator: Iterator[MessageAndOffset] = internalIterator()
@@ -182,7 +343,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
return allDone()
val offset = topIter.getLong()
val size = topIter.getInt()
- if(size < Message.MinHeaderSize)
+ if(size < Message.MinMessageOverhead)
throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")
// we have an incomplete message
@@ -194,7 +355,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
message.limit(size)
topIter.position(topIter.position + size)
val newMessage = new Message(message)
-
if(isShallow) {
new MessageAndOffset(newMessage, offset)
} else {
@@ -203,7 +363,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
innerIter = null
new MessageAndOffset(newMessage, offset)
case _ =>
- innerIter = ByteBufferMessageSet.deepIterator(newMessage)
+ innerIter = ByteBufferMessageSet.deepIterator(new MessageAndOffset(newMessage, offset))
if(!innerIter.hasNext)
innerIter = null
makeNext()
@@ -226,48 +386,205 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
}
/**
- * Update the offsets for this message set and do further validation on messages. This method attempts to do an
- * in-place conversion if there is no compression, but otherwise recopies the messages
+ * Update the offsets for this message set and do further validation on messages including:
+ * 1. Messages for compacted topics must have keys
+ * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
+ * starting from 0.
+ * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
+ *
+ * This method will convert the messages in the following scenarios:
+ * A. Magic value of a message = 0 and messageFormatVersion is 1
+ * B. Magic value of a message = 1 and messageFormatVersion is 0
+ *
+ * If no format conversion or value overwriting is required for messages, this method will perform in-place
+ * operations and avoid re-compression.
*/
private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
+ now: Long = System.currentTimeMillis(),
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
- compactedTopic: Boolean = false): ByteBufferMessageSet = {
- if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
- // do in-place validation and offset assignment
- var messagePosition = 0
- buffer.mark()
- while(messagePosition < sizeInBytes - MessageSet.LogOverhead) {
- buffer.position(messagePosition)
- buffer.putLong(offsetCounter.getAndIncrement())
- val messageSize = buffer.getInt()
- val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength
- if (compactedTopic && positionAfterKeySize < sizeInBytes) {
- buffer.position(buffer.position() + Message.KeySizeOffset)
- val keySize = buffer.getInt()
- if (keySize <= 0) {
- buffer.reset()
- throw new InvalidMessageException("Compacted topic cannot accept message without key.")
- }
- }
- messagePosition += MessageSet.LogOverhead + messageSize
+ compactedTopic: Boolean = false,
+ messageFormatVersion: Byte = Message.CurrentMagicValue,
+ messageTimestampType: TimestampType,
+ messageTimestampDiffMaxMs: Long): ByteBufferMessageSet = {
+ if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
+ // check the magic value
+ if (!magicValueInAllWrapperMessages(messageFormatVersion)) {
+ // Message format conversion
+ convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
+ messageFormatVersion)
+ } else {
+ // Do in-place validation, offset assignment and maybe set timestamp
+ validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
+ messageTimestampDiffMaxMs)
}
- buffer.reset()
- this
+
} else {
- // We need to deep-iterate over the message-set if any of these are true:
- // (i) messages are compressed
- // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec
- val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
- if (compactedTopic && !messageAndOffset.message.hasKey)
- throw new InvalidMessageException("Compacted topic cannot accept message without key.")
-
- messageAndOffset.message
+ // Deal with compressed messages
+ // We cannot do in-place assignment in any of the following situations:
+ // 1. The source and target compression codecs are different
+ // 2. The magic value to use is 0, because offsets need to be overwritten
+ // 3. The magic value to use is above 0, but some fields of the inner messages need to be overwritten
+ // 4. Message format conversion is needed
+
+ // No in-place assignment for situations 1 and 2
+ var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
+
+ var maxTimestamp = Message.NoTimestamp
+ val expectedInnerOffset = new AtomicLong(0)
+ val validatedMessages = new ListBuffer[Message]
+ this.internalIterator(isShallow = false).foreach(messageAndOffset => {
+ val message = messageAndOffset.message
+ validateMessageKey(message, compactedTopic)
+ if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
+ // No in-place assignment for situation 3
+ // Validate the timestamp
+ validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)
+ // Check if we need to overwrite offset
+ if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement)
+ inPlaceAssignment = false
+ maxTimestamp = math.max(maxTimestamp, message.timestamp)
+ }
+
+ // No in-place assignment for situation 4
+ if (message.magic != messageFormatVersion)
+ inPlaceAssignment = false
+
+ validatedMessages += message.toFormatVersion(messageFormatVersion)
})
- new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
+ if (!inPlaceAssignment) {
+ // Cannot do in place assignment.
+ val wrapperMessageTimestamp = {
+ if (messageFormatVersion == Message.MagicValue_V0)
+ Some(Message.NoTimestamp)
+ else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
+ Some(maxTimestamp)
+ else // Log append time
+ Some(now)
+ }
+
+ new ByteBufferMessageSet(compressionCodec = targetCodec,
+ offsetCounter = offsetCounter,
+ wrapperMessageTimestamp = wrapperMessageTimestamp,
+ timestampType = messageTimestampType,
+ messages = validatedMessages.toBuffer: _*)
+ } else {
+ // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
+ buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
+ // validate the messages
+ validatedMessages.foreach(_.ensureValid())
+
+ var crcUpdateNeeded = true
+ val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset
+ val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
+ val timestamp = buffer.getLong(timestampOffset)
+ val attributes = buffer.get(attributeOffset)
+ if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
+ // We don't need to recompute crc if the timestamp is not updated.
+ crcUpdateNeeded = false
+ else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
+ // Set timestamp type and timestamp
+ buffer.putLong(timestampOffset, now)
+ buffer.put(attributeOffset, TimestampType.setTimestampType(attributes, TimestampType.LOG_APPEND_TIME))
+ }
+
+ if (crcUpdateNeeded) {
+ // need to recompute the crc value
+ buffer.position(MessageSet.LogOverhead)
+ val wrapperMessage = new Message(buffer.slice())
+ Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum())
+ }
+ buffer.rewind()
+ this
+ }
+ }
+ }
+
+ // This method avoids extra memory copies: it reads from the original message set and writes the converted
+ // messages directly into the new message set buffer, so we don't need to allocate memory for each
+ // individual message during message format conversion.
+ private def convertNonCompressedMessages(offsetCounter: AtomicLong,
+ compactedTopic: Boolean,
+ now: Long,
+ timestampType: TimestampType,
+ messageTimestampDiffMaxMs: Long,
+ toMagicValue: Byte): ByteBufferMessageSet = {
+ val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).foldLeft(0)(
+ (sizeDiff, messageAndOffset) => sizeDiff + Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue))
+ val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
+ var newMessagePosition = 0
+ this.internalIterator(isShallow = true).foreach {messageAndOffset =>
+ val message = messageAndOffset.message
+ validateMessageKey(message, compactedTopic)
+ validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs)
+ newBuffer.position(newMessagePosition)
+ // write offset.
+ newBuffer.putLong(offsetCounter.getAndIncrement)
+ // Write new message size
+ val newMessageSize = message.size + Message.headerSizeDiff(message.magic, toMagicValue)
+ newBuffer.putInt(newMessageSize)
+ // Create new message buffer
+ val newMessageBuffer = newBuffer.slice()
+ newMessageBuffer.limit(newMessageSize)
+ // Convert message
+ message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType)
+
+ newMessagePosition += MessageSet.LogOverhead + newMessageSize
}
+ newBuffer.rewind()
+ new ByteBufferMessageSet(newBuffer)
}
+ private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: AtomicLong,
+ now: Long,
+ compactedTopic: Boolean,
+ timestampType: TimestampType,
+ timestampDiffMaxMs: Long): ByteBufferMessageSet = {
+ // do in-place validation and offset assignment
+ var messagePosition = 0
+ buffer.mark()
+ while (messagePosition < sizeInBytes - MessageSet.LogOverhead) {
+ buffer.position(messagePosition)
+ buffer.putLong(offsetCounter.getAndIncrement())
+ val messageSize = buffer.getInt()
+ val messageBuffer = buffer.slice()
+ messageBuffer.limit(messageSize)
+ val message = new Message(messageBuffer)
+ validateMessageKey(message, compactedTopic)
+ if (message.magic > Message.MagicValue_V0) {
+ validateTimestamp(message, now, timestampType, timestampDiffMaxMs)
+ if (timestampType == TimestampType.LOG_APPEND_TIME) {
+ message.buffer.putLong(Message.TimestampOffset, now)
+ message.buffer.put(Message.AttributesOffset, TimestampType.setTimestampType(message.attributes, TimestampType.LOG_APPEND_TIME))
+ Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum())
+ }
+ }
+ messagePosition += MessageSet.LogOverhead + messageSize
+ }
+ buffer.reset()
+ this
+ }
+
+ private def validateMessageKey(message: Message, compactedTopic: Boolean) {
+ if (compactedTopic && !message.hasKey)
+ throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+ }
+
+ /**
+ * Validate the timestamp of a message.
+ * If the message uses CreateTime, check that the timestamp is within the acceptable range.
+ */
+ private def validateTimestamp(message: Message,
+ now: Long,
+ timestampType: TimestampType,
+ timestampDiffMaxMs: Long) {
+ if (timestampType == TimestampType.CREATE_TIME && math.abs(message.timestamp - now) > timestampDiffMaxMs)
+ throw new InvalidTimestampException(s"Timestamp ${message.timestamp} of message is out of range. " +
+ s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}")
+ if (message.timestampType == TimestampType.LOG_APPEND_TIME)
+ throw new InvalidTimestampException(s"Invalid timestamp type in message $message. Producer should not set " +
+ s"timestamp type to LogAppendTime.")
+ }
/**
* The total number of bytes in this message set, including any partial trailing messages
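To make the offset scheme in the class comment concrete: suppose a producer compresses three messages with inner offsets 0, 1, 2, and the broker assigns the batch absolute offsets 50, 51, 52, stamping the wrapper with the last one. The consumer-side reconstruction performed by deepIterator's makeNext then reduces to:

    // Plain arithmetic mirroring makeNext for magic > 0 (values are made up).
    val wrapperOffset = 52L                 // AO of the last inner message
    val innerOffsets = Seq(0L, 1L, 2L)      // IOs written by the producer
    val lastInnerOffset = innerOffsets.last
    val absoluteOffsets = innerOffsets.map { io =>
      val relativeOffset = io - lastInnerOffset // RO = IO - IO_of_last_message
      wrapperOffset + relativeOffset            // AO = AO_of_last_message + RO
    }
    // absoluteOffsets == Seq(50, 51, 52)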
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 999b115..51aa11a 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -18,6 +18,9 @@
package kafka.message
import java.nio._
+
+import org.apache.kafka.common.record.TimestampType
+
import scala.math._
import kafka.utils._
import org.apache.kafka.common.utils.Utils
@@ -26,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
* Constants related to messages
*/
object Message {
-
+
/**
* The current offset and size for all the fixed-length fields
*/
@@ -36,83 +39,144 @@ object Message {
val MagicLength = 1
val AttributesOffset = MagicOffset + MagicLength
val AttributesLength = 1
- val KeySizeOffset = AttributesOffset + AttributesLength
+ // Only message format version 1 has the timestamp field.
+ val TimestampOffset = AttributesOffset + AttributesLength
+ val TimestampLength = 8
+ val KeySizeOffset_V0 = AttributesOffset + AttributesLength
+ val KeySizeOffset_V1 = TimestampOffset + TimestampLength
val KeySizeLength = 4
- val KeyOffset = KeySizeOffset + KeySizeLength
+ val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
+ val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
val ValueSizeLength = 4
-
- /** The amount of overhead bytes in a message */
- val MessageOverhead = KeyOffset + ValueSizeLength
-
+
+ private val MessageHeaderSizeMap = Map (
+ 0.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength),
+ 1.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + TimestampLength + KeySizeLength + ValueSizeLength))
+
/**
- * The minimum valid size for the message header
+ * The amount of overhead bytes in a message.
+ * This value is only used to check whether a message size is valid, so the minimum possible message size is
+ * used here, which is that of a message in format V0 with an empty key and value.
*/
- val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength
+ val MinMessageOverhead = KeyOffset_V0 + ValueSizeLength
/**
- * The current "magic" value
+ * The "magic" value
+ * When the magic value is 0, the message uses absolute offsets and does not have a timestamp field.
+ * When the magic value is 1, the message uses relative offsets and has a timestamp field.
*/
- val CurrentMagicValue: Byte = 0
+ val MagicValue_V0: Byte = 0
+ val MagicValue_V1: Byte = 1
+ val CurrentMagicValue: Byte = 1
/**
* Specifies the mask for the compression code. 3 bits to hold the compression codec.
* 0 is reserved to indicate no compression
*/
val CompressionCodeMask: Int = 0x07
+ /**
+ * Specifies the mask for the timestamp type: 1 bit at the 4th least significant bit.
+ * 0 for CreateTime, 1 for LogAppendTime
+ */
+ val TimestampTypeMask: Byte = 0x08
+ val TimestampTypeAttributeBitOffset: Int = 3
/**
* Compression code for uncompressed messages
*/
val NoCompression: Int = 0
+ /**
+ * Indicates that the timestamp is not defined; this is the timestamp of messages with "magic" value 0.
+ */
+ val NoTimestamp: Long = -1
+
+ /**
+ * Gives the header size difference between two message format versions.
+ */
+ def headerSizeDiff(fromMagicValue: Byte, toMagicValue: Byte): Int =
+ MessageHeaderSizeMap(toMagicValue) - MessageHeaderSizeMap(fromMagicValue)
+
+
}
/**
* A message. The format of an N byte message is the following:
*
* 1. 4 byte CRC32 of the message
- * 2. 1 byte "magic" identifier to allow format changes, value is 0 currently
- * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
- * 4. 4 byte key length, containing length K
- * 5. K byte key
- * 6. 4 byte payload length, containing length V
- * 7. V byte payload
+ * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
+ * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
+ * bit 0 ~ 2 : Compression codec.
+ * 0 : no compression
+ * 1 : gzip
+ * 2 : snappy
+ * 3 : lz4
+ * bit 3 : Timestamp type
+ * 0 : create time
+ * 1 : log append time
+ * bit 4 ~ 7 : reserved
+ * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
+ * 5. 4 byte key length, containing length K
+ * 6. K byte key
+ * 7. 4 byte payload length, containing length V
+ * 8. V byte payload
*
* Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents.
+ * @param buffer the byte buffer of this message.
+ * @param wrapperMessageTimestamp the wrapper message timestamp, defined only when the message is an inner message
+ * of a compressed message.
+ * @param wrapperMessageTimestampType the wrapper message timestamp type, defined only when the message is an inner
+ * message of a compressed message.
*/
-class Message(val buffer: ByteBuffer) {
+class Message(val buffer: ByteBuffer,
+ private val wrapperMessageTimestamp: Option[Long] = None,
+ private val wrapperMessageTimestampType: Option[TimestampType] = None) {
import kafka.message.Message._
-
+
/**
* A constructor to create a Message
* @param bytes The payload of the message
- * @param codec The compression codec used on the contents of the message (if any)
* @param key The key of the message (null, if none)
+ * @param timestamp The timestamp of the message.
+ * @param timestampType The timestamp type of the message.
+ * @param codec The compression codec used on the contents of the message (if any)
* @param payloadOffset The offset into the payload array used to extract payload
* @param payloadSize The size of the payload to use
+ * @param magicValue The magic value to use
*/
def this(bytes: Array[Byte],
- key: Array[Byte],
+ key: Array[Byte],
+ timestamp: Long,
+ timestampType: TimestampType,
codec: CompressionCodec,
payloadOffset: Int,
- payloadSize: Int) = {
- this(ByteBuffer.allocate(Message.CrcLength +
- Message.MagicLength +
- Message.AttributesLength +
+ payloadSize: Int,
+ magicValue: Byte) = {
+ this(ByteBuffer.allocate(Message.CrcLength +
+ Message.MagicLength +
+ Message.AttributesLength +
+ (if (magicValue == Message.MagicValue_V0) 0
+ else Message.TimestampLength) +
Message.KeySizeLength +
(if(key == null) 0 else key.length) +
Message.ValueSizeLength +
(if(bytes == null) 0
else if(payloadSize >= 0) payloadSize
else bytes.length - payloadOffset)))
+ validateTimestampAndMagicValue(timestamp, magicValue)
// skip crc, we will fill that in at the end
buffer.position(MagicOffset)
- buffer.put(CurrentMagicValue)
+ buffer.put(magicValue)
var attributes: Byte = 0
- if (codec.codec > 0)
- attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
+ if (codec.codec > 0) {
+ attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
+ attributes = TimestampType.setTimestampType(attributes, timestampType)
+ }
buffer.put(attributes)
+ // Only put timestamp when "magic" value is greater than 0
+ if (magic > MagicValue_V0)
+ buffer.putLong(timestamp)
if(key == null) {
buffer.putInt(-1)
} else {
@@ -126,22 +190,25 @@ class Message(val buffer: ByteBuffer) {
if(bytes != null)
buffer.put(bytes, payloadOffset, size)
buffer.rewind()
-
+
// now compute the checksum and fill it in
Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
}
- def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) =
- this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1)
+ def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
+ this(bytes = bytes, key = key, timestamp = timestamp, timestampType = TimestampType.CREATE_TIME, codec = codec, payloadOffset = 0, payloadSize = -1, magicValue = magicValue)
- def this(bytes: Array[Byte], codec: CompressionCodec) =
- this(bytes = bytes, key = null, codec = codec)
+ def this(bytes: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
+ this(bytes = bytes, key = null, timestamp = timestamp, codec = codec, magicValue = magicValue)
- def this(bytes: Array[Byte], key: Array[Byte]) =
- this(bytes = bytes, key = key, codec = NoCompressionCodec)
+ def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, magicValue: Byte) =
+ this(bytes = bytes, key = key, timestamp = timestamp, codec = NoCompressionCodec, magicValue = magicValue)
- def this(bytes: Array[Byte]) =
- this(bytes = bytes, key = null, codec = NoCompressionCodec)
+ def this(bytes: Array[Byte], timestamp: Long, magicValue: Byte) =
+ this(bytes = bytes, key = null, timestamp = timestamp, codec = NoCompressionCodec, magicValue = magicValue)
+
+ def this(bytes: Array[Byte]) =
+ this(bytes = bytes, key = null, timestamp = Message.NoTimestamp, codec = NoCompressionCodec, magicValue = Message.CurrentMagicValue)
/**
* Compute the checksum of the message from the message contents
@@ -171,11 +238,19 @@ class Message(val buffer: ByteBuffer) {
* The complete serialized size of this message in bytes (including crc, header attributes, etc)
*/
def size: Int = buffer.limit
-
+
+ /**
+ * The position where the key size is stored.
+ */
+ def keySizeOffset = {
+ if (magic == MagicValue_V0) KeySizeOffset_V0
+ else KeySizeOffset_V1
+ }
+
/**
* The length of the key in bytes
*/
- def keySize: Int = buffer.getInt(Message.KeySizeOffset)
+ def keySize: Int = buffer.getInt(keySizeOffset)
/**
* Does the message have a key?
@@ -185,7 +260,10 @@ class Message(val buffer: ByteBuffer) {
/**
* The position where the payload size is stored
*/
- private def payloadSizeOffset = Message.KeyOffset + max(0, keySize)
+ def payloadSizeOffset = {
+ if (magic == MagicValue_V0) KeyOffset_V0 + max(0, keySize)
+ else KeyOffset_V1 + max(0, keySize)
+ }
/**
* The length of the message value in bytes
@@ -206,6 +284,33 @@ class Message(val buffer: ByteBuffer) {
* The attributes stored with this message
*/
def attributes: Byte = buffer.get(AttributesOffset)
+
+ /**
+ * The timestamp of the message, only available when the "magic" value is greater than 0
+ * When magic > 0, The timestamp of a message is determined in the following way:
+ * 1. wrapperMessageTimestampType = None and wrapperMessageTimestamp is None - Uncompressed message, timestamp and timestamp type are in the message.
+ * 2. wrapperMessageTimestampType = LogAppendTime and wrapperMessageTimestamp is defined - Compressed message using LogAppendTime
+ * 3. wrapperMessageTimestampType = CreateTime and wrapperMessageTimestamp is defined - Compressed message using CreateTime
+ */
+ def timestamp: Long = {
+ if (magic == MagicValue_V0)
+ Message.NoTimestamp
+ // Case 2
+ else if (wrapperMessageTimestampType.exists(_ == TimestampType.LOG_APPEND_TIME) && wrapperMessageTimestamp.isDefined)
+ wrapperMessageTimestamp.get
+ else // case 1, 3
+ buffer.getLong(Message.TimestampOffset)
+ }
+
+ /**
+ * The timestamp type of the message
+ */
+ def timestampType = {
+ if (magic == MagicValue_V0)
+ TimestampType.NO_TIMESTAMP_TYPE
+ else
+ wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes))
+ }
/**
* The compression codec used with this message
@@ -221,8 +326,54 @@ class Message(val buffer: ByteBuffer) {
/**
* A ByteBuffer containing the message key
*/
- def key: ByteBuffer = sliceDelimited(KeySizeOffset)
-
+ def key: ByteBuffer = sliceDelimited(keySizeOffset)
+
+ /**
+ * Convert the message to the specified format version.
+ */
+ def toFormatVersion(toMagicValue: Byte): Message = {
+ if (magic == toMagicValue)
+ this
+ else {
+ val byteBuffer = ByteBuffer.allocate(size + Message.headerSizeDiff(magic, toMagicValue))
+ // Copy bytes from old messages to new message
+ convertToBuffer(toMagicValue, byteBuffer)
+ new Message(byteBuffer)
+ }
+ }
+
+ def convertToBuffer(toMagicValue: Byte,
+ byteBuffer: ByteBuffer,
+ now: Long = NoTimestamp,
+ timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes))) {
+ if (byteBuffer.remaining() < size + headerSizeDiff(magic, toMagicValue))
+ throw new IndexOutOfBoundsException("The byte buffer does not have enough capacity to hold the new message " +
+ "format version " + toMagicValue)
+ if (toMagicValue == Message.MagicValue_V1) {
+ // Up-conversion: leave room for the CRC and update the magic byte
+ byteBuffer.position(Message.MagicOffset)
+ byteBuffer.put(Message.MagicValue_V1)
+ byteBuffer.put(TimestampType.setTimestampType(attributes, timestampType))
+ // Up-conversion, insert the timestamp field
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ byteBuffer.putLong(now)
+ else
+ byteBuffer.putLong(Message.NoTimestamp)
+ byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V0, size - Message.KeySizeOffset_V0)
+ } else {
+ // Down-conversion: leave room for the CRC (computed below) and update the magic byte
+ byteBuffer.position(Message.MagicOffset)
+ byteBuffer.put(Message.MagicValue_V0)
+ byteBuffer.put(TimestampType.setTimestampType(attributes, TimestampType.CREATE_TIME))
+ // Down-conversion, skip the timestamp field
+ byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V1, size - Message.KeySizeOffset_V1)
+ }
+ // update the CRC value
+ val newMessage = new Message(byteBuffer)
+ Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum())
+ byteBuffer.rewind()
+ }
+
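
A quick usage sketch of the conversion path above, assuming the auxiliary constructor shown at the top of this section (bytes/key/timestamp/codec/magicValue); the 8-byte difference is the timestamp field that exists only in V1:

    import kafka.message.{Message, NoCompressionCodec}

    val v1 = new Message(bytes = "value".getBytes("UTF-8"),
                         key = null,
                         timestamp = System.currentTimeMillis(),
                         codec = NoCompressionCodec,
                         magicValue = Message.MagicValue_V1)
    val v0 = v1.toFormatVersion(Message.MagicValue_V0)
    // Down-conversion drops the 8-byte timestamp field and recomputes the CRC.
    assert(v0.size == v1.size - 8)
    assert(v0.timestamp == Message.NoTimestamp)  // magic 0 reports no timestamp
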
/**
* Read a size-delimited byte buffer starting at the given offset
*/
@@ -240,9 +391,25 @@ class Message(val buffer: ByteBuffer) {
}
}
- override def toString(): String =
- "Message(magic = %d, attributes = %d, crc = %d, key = %s, payload = %s)".format(magic, attributes, checksum, key, payload)
-
+ /**
+ * Validate the timestamp and "magic" value
+ */
+ private def validateTimestampAndMagicValue(timestamp: Long, magic: Byte) {
+ if (magic != MagicValue_V0 && magic != MagicValue_V1)
+ throw new IllegalArgumentException("Invalid magic value " + magic)
+ if (timestamp < 0 && timestamp != NoTimestamp)
+ throw new IllegalArgumentException("Invalid message timestamp " + timestamp)
+ if (magic == MagicValue_V0 && timestamp != NoTimestamp)
+ throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
+ }
+
+ override def toString(): String = {
+ if (magic == MagicValue_V0)
+ s"Message(magic = $magic, attributes = $attributes, crc = $checksum, key = $key, payload = $payload)"
+ else
+ s"Message(magic = $magic, attributes = $attributes, $timestampType = $timestamp, crc = $checksum, key = $key, payload = $payload)"
+ }
+
override def equals(any: Any): Boolean = {
any match {
case that: Message => this.buffer.equals(that.buffer)
@@ -251,5 +418,5 @@ class Message(val buffer: ByteBuffer) {
}
override def hashCode(): Int = buffer.hashCode
-
+
}
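
To make the validation rules above concrete, here is a sketch of how construction should behave, assuming the constructor invokes validateTimestampAndMagicValue (as the write path in this patch suggests):

    import kafka.message.{Message, NoCompressionCodec}
    import scala.util.Try

    // A magic-0 message may not carry a timestamp.
    val ok = Try(new Message(bytes = "v".getBytes("UTF-8"), key = null,
                             timestamp = Message.NoTimestamp,
                             codec = NoCompressionCodec,
                             magicValue = Message.MagicValue_V0))  // Success
    val bad = Try(new Message(bytes = "v".getBytes("UTF-8"), key = null,
                              timestamp = 12345L,
                              codec = NoCompressionCodec,
                              magicValue = Message.MagicValue_V0)) // Failure(IllegalArgumentException)
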
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/MessageAndMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
index 26b75c8..ac9ef77 100755
--- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala
+++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
@@ -18,12 +18,17 @@
package kafka.message
import kafka.serializer.Decoder
+import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.Utils
-case class MessageAndMetadata[K, V](topic: String, partition: Int,
- private val rawMessage: Message, offset: Long,
+case class MessageAndMetadata[K, V](topic: String,
+ partition: Int,
+ private val rawMessage: Message,
+ offset: Long,
+ timestamp: Long = Message.NoTimestamp,
+ timestampType: TimestampType = TimestampType.CREATE_TIME,
keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {
-
+
/**
* Return the decoded message key and payload
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index 28b56e6..014788a 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -54,8 +54,26 @@ object MessageSet {
*/
def entrySize(message: Message): Int = LogOverhead + message.size
+ /**
+ * Validate the "magic" values of messages are the same in a compressed message set and return the magic value of
+ * and the max timestamp of the inner messages.
+ */
+ def magicAndLargestTimestamp(messages: Seq[Message]): MagicAndTimestamp = {
+ val firstMagicValue = messages.head.magic
+ var largestTimestamp: Long = Message.NoTimestamp
+ for (message <- messages) {
+ if (message.magic != firstMagicValue)
+ throw new IllegalStateException("Messages in the same message set must have the same magic value")
+ if (firstMagicValue > Message.MagicValue_V0)
+ largestTimestamp = math.max(largestTimestamp, message.timestamp)
+ }
+ MagicAndTimestamp(firstMagicValue, largestTimestamp)
+ }
+
}
+case class MagicAndTimestamp(magic: Byte, timestamp: Long)
+
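
A usage sketch for the new helper (constructor arguments as elsewhere in this patch; the values are illustrative):

    import kafka.message._

    val batch = Seq(
      new Message(bytes = "a".getBytes("UTF-8"), key = null, timestamp = 100L,
                  codec = NoCompressionCodec, magicValue = Message.MagicValue_V1),
      new Message(bytes = "b".getBytes("UTF-8"), key = null, timestamp = 300L,
                  codec = NoCompressionCodec, magicValue = Message.MagicValue_V1))

    val MagicAndTimestamp(magic, largest) = MessageSet.magicAndLargestTimestamp(batch)
    // magic == Message.MagicValue_V1, largest == 300L.
    // A batch mixing magic values throws IllegalStateException instead.
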
/**
* A set of messages with offsets. A message set has a fixed serialized form, though the container
* for the bytes could be either in-memory or on disk. The format of each message is
@@ -70,7 +88,12 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
* Less than the complete amount may be written, but no more than maxSize can be. The number
* of bytes written is returned */
def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
-
+
+ /**
+ * Check if all the wrapper messages in the message set have the expected magic value
+ */
+ def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean
+
/**
* Provides an iterator over the message/offset pairs in this set
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/message/MessageWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageWriter.scala b/core/src/main/scala/kafka/message/MessageWriter.scala
index 0c6040e..660772c 100755
--- a/core/src/main/scala/kafka/message/MessageWriter.scala
+++ b/core/src/main/scala/kafka/message/MessageWriter.scala
@@ -20,19 +20,31 @@ package kafka.message
import java.io.{InputStream, OutputStream}
import java.nio.ByteBuffer
+import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.Crc32
class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize) {
import Message._
- def write(key: Array[Byte] = null, codec: CompressionCodec)(writePayload: OutputStream => Unit): Unit = {
+ def write(key: Array[Byte] = null,
+ codec: CompressionCodec,
+ timestamp: Long,
+ timestampType: TimestampType,
+ magicValue: Byte)(writePayload: OutputStream => Unit): Unit = {
withCrc32Prefix {
- write(CurrentMagicValue)
+ // write magic value
+ write(magicValue)
+ // write attributes
var attributes: Byte = 0
if (codec.codec > 0)
attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
+ if (magicValue > MagicValue_V0)
+ attributes = TimestampType.setTimestampType(attributes, timestampType)
write(attributes)
+ // write the timestamp
+ if (magicValue > MagicValue_V0)
+ writeLong(timestamp)
// write the key
if (key == null) {
writeInt(-1)
@@ -61,6 +73,17 @@ class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize)
out.write(value)
}
+ private def writeLong(value: Long): Unit = {
+ write((value >>> 56).toInt)
+ write((value >>> 48).toInt)
+ write((value >>> 40).toInt)
+ write((value >>> 32).toInt)
+ write((value >>> 24).toInt)
+ write((value >>> 16).toInt)
+ write((value >>> 8).toInt)
+ write(value.toInt)
+ }
+
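
The new writeLong emits the eight bytes of a long most-significant byte first, i.e. big-endian, which matches ByteBuffer's default byte order. A self-contained equivalence check (a sketch, not part of the patch):

    import java.nio.ByteBuffer

    def longToBytes(value: Long): Array[Byte] =
      (0 until 8).map(i => ((value >>> (56 - 8 * i)) & 0xff).toByte).toArray

    val v = 0x0102030405060708L
    assert(longToBytes(v).sameElements(ByteBuffer.allocate(8).putLong(v).array))
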
private def withCrc32Prefix(writeData: => Unit): Unit = {
// get a writer for CRC value
val crcWriter = reserve(CrcLength)
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 4e67ba4..7abe48a 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -21,7 +21,7 @@ import kafka.common._
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
-import kafka.utils.{CoreUtils, Logging, SystemTime}
+import kafka.utils._
import org.apache.kafka.common.errors.{LeaderNotAvailableException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.protocol.Errors
import scala.util.Random
@@ -36,8 +36,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val encoder: Encoder[V],
private val keyEncoder: Encoder[K],
private val producerPool: ProducerPool,
- private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
+ private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
+ private val time: Time = SystemTime)
extends EventHandler[K,V] with Logging {
+
val isSync = ("sync" == config.producerType)
val correlationId = new AtomicInteger(0)
@@ -129,9 +131,22 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
events.foreach{e =>
try {
if(e.hasKey)
- serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
+ serializedMessages += new KeyedMessage[K,Message](
+ topic = e.topic,
+ key = e.key,
+ partKey = e.partKey,
+ message = new Message(key = keyEncoder.toBytes(e.key),
+ bytes = encoder.toBytes(e.message),
+ timestamp = time.milliseconds,
+ magicValue = Message.MagicValue_V1))
else
- serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message)))
+ serializedMessages += new KeyedMessage[K,Message](
+ topic = e.topic,
+ key = e.key,
+ partKey = e.partKey,
+ message = new Message(bytes = encoder.toBytes(e.message),
+ timestamp = time.milliseconds,
+ magicValue = Message.MagicValue_V1))
} catch {
case t: Throwable =>
producerStats.serializationErrorRate.mark()
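
The new time parameter exists so tests can pin the timestamps stamped onto messages. A minimal sketch, assuming kafka.utils.Time exposes milliseconds, nanoseconds, and sleep (FixedTime is hypothetical):

    import kafka.utils.Time

    class FixedTime(now: Long) extends Time {
      def milliseconds: Long = now
      def nanoseconds: Long = now * 1000000L
      def sleep(ms: Long): Unit = ()
    }

    // new DefaultEventHandler(config, partitioner, encoder, keyEncoder, pool,
    //                         time = new FixedTime(42L))
    // would stamp every produced message with timestamp 42.
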
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b3873a6..8b688b9 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -104,7 +104,7 @@ abstract class AbstractFetcherThread(name: String,
} catch {
case t: Throwable =>
if (isRunning.get) {
- warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
+ warn(s"Error in fetch $fetchRequest", t)
inLock(partitionMapLock) {
partitionsWithError ++= partitionMap.keys
// an error occurred while fetching partitions; sleep a while
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index bc599a0..9343fde 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -19,8 +19,10 @@ package kafka.server
import java.util.Properties
+import kafka.api.ApiVersion
import kafka.common.TopicAndPartition
import kafka.log.{Log, LogConfig, LogManager}
+import kafka.utils.Logging
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.protocol.ApiKeys
@@ -38,12 +40,22 @@ trait ConfigHandler {
* The TopicConfigHandler will process topic config changes in ZK.
* The callback provides the topic name and the full properties set read from ZK
*/
-class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler {
+class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig) extends ConfigHandler with Logging {
def processConfigChanges(topic : String, topicConfig : Properties) {
val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case (topicAndPartition, log) => topicAndPartition.topic }
.mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => v.map(_._2) }
+ // Validate the compatibility of the message format version.
+ Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)) match {
+ case Some(versionString) =>
+ if (!kafkaConfig.interBrokerProtocolVersion.onOrAfter(ApiVersion(versionString))) {
+ topicConfig.remove(LogConfig.MessageFormatVersionProp)
+ warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for $topic because $versionString " +
+ s"is not compatible with Kafka inter broker protocol version ${kafkaConfig.interBrokerProtocolVersion}")
+ }
+ case _ =>
+ }
if (logsByTopic.contains(topic)) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
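
The compatibility rule above in isolation: a per-topic LogConfig.MessageFormatVersionProp override is only honored when the broker's inter-broker protocol version is at least that new. A sketch (version strings are illustrative):

    import kafka.api.ApiVersion

    val interBrokerVersion = ApiVersion("0.9.0")  // broker still on the old protocol
    val requestedFormat = ApiVersion("0.10.0")    // topic asks for the new format
    if (!interBrokerVersion.onOrAfter(requestedFormat))
      println("override ignored; a warning is logged and the property is removed")
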
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f2e9533..bd02630 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import kafka.common._
import kafka.controller.KafkaController
import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
import kafka.log._
-import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
import kafka.network._
import kafka.network.RequestChannel.{Session, Response}
import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
@@ -35,7 +35,7 @@ import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
ClusterAuthorizationException}
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ProtoUtils, ApiKeys, Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
@@ -330,7 +330,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
- val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1))
+ val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ =>
+ new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp))
var errorInResponse = false
@@ -367,7 +368,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val respHeader = new ResponseHeader(request.header.correlationId)
val respBody = request.header.apiVersion match {
case 0 => new ProduceResponse(mergedResponseStatus.asJava)
- case 1 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs)
+ case 1 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, 1)
+ case 2 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, 2)
// This case shouldn't happen unless a new version of ProducerRequest is added without
// updating this part of the code to handle it properly.
case _ => throw new IllegalArgumentException("Version %d of ProducerRequest is not handled. Code must be updated."
@@ -426,7 +428,32 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a fetch response
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
- val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus
+
+ val convertedResponseStatus =
+ // Need to down-convert messages when the consumer only understands magic value 0.
+ if (fetchRequest.versionId <= 1) {
+ responsePartitionData.map({ case (tp, data) =>
+ tp -> {
+ // We only do down-conversion when:
+ // 1. The message format version configured for the topic uses a magic value > 0, and
+ // 2. The message set contains at least one message whose magic value is > 0.
+ // This keeps message format conversion to a minimum: conversion only occurs when the new
+ // message format is in use for the topic and we see a request from an old client.
+ // Note that if the message format is changed from a higher version back to a lower version,
+ // this check might break, because some messages in the new format could be delivered to
+ // pre-0.10.0.0 consumers without down-conversion.
+ if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
+ !data.messages.magicValueInAllWrapperMessages(Message.MagicValue_V0)) {
+ trace("Down converting message to V0 for fetch request from " + fetchRequest.clientId)
+ new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
+ } else
+ data
+ }
+ })
+ } else
+ responsePartitionData
+
+ val mergedResponseStatus = convertedResponseStatus ++ unauthorizedResponseStatus
mergedResponseStatus.foreach { case (topicAndPartition, data) =>
if (data.error != Errors.NONE.code) {
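
The gate above can be read as a three-part predicate. Restated as a standalone sketch (names are hypothetical; magic value 0 corresponds to Message.MagicValue_V0):

    def needsDownConversion(fetchRequestVersion: Int,
                            topicMessageFormatMagic: Option[Byte],
                            allWrapperMessagesAreV0: Boolean): Boolean =
      fetchRequestVersion <= 1 &&               // old consumer (fetch request v0/v1)
      topicMessageFormatMagic.exists(_ > 0) &&  // topic configured with the new format
      !allWrapperMessagesAreV0                  // and the set actually contains new-format messages
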
@@ -440,6 +467,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def fetchResponseCallback(delayTimeMs: Int) {
+ trace(s"Sending fetch response to ${fetchRequest.clientId} with ${convertedResponseStatus.values.map(_.messages.sizeInBytes).sum}" +
+ s" bytes")
val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
}
@@ -453,10 +482,9 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchResponseCallback(0)
} else {
quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId,
- FetchResponse.responseSize(responsePartitionData
- .groupBy(_._1.topic),
- fetchRequest.versionId),
- fetchResponseCallback)
+ FetchResponse.responseSize(responsePartitionData.groupBy(_._1.topic),
+ fetchRequest.versionId),
+ fetchResponseCallback)
}
}