You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/26 03:08:28 UTC
kafka git commit: MINOR: Validate inner message compression attribute
Repository: kafka
Updated Branches:
refs/heads/trunk af1699bcc -> 62945280a
MINOR: Validate inner message compression attribute
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #976 from ijuma/validate-inner-message-compression-attribute
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62945280
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62945280
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62945280
Branch: refs/heads/trunk
Commit: 62945280a7fcfd69840a115fec16698bd8254323
Parents: af1699b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Feb 25 18:08:20 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 25 18:08:20 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/message/ByteBufferMessageSet.scala | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/62945280/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 856f971..6f38715 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -420,11 +420,12 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
var maxTimestamp = Message.NoTimestamp
- val expectedInnerOffset = new AtomicLong(0)
+ val expectedInnerOffset = new LongRef(0)
val validatedMessages = new mutable.ArrayBuffer[Message]
this.internalIterator(isShallow = false).foreach { messageAndOffset =>
val message = messageAndOffset.message
validateMessageKey(message, compactedTopic)
+
if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
// No in place assignment situation 3
// Validate the timestamp
@@ -435,12 +436,17 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
maxTimestamp = math.max(maxTimestamp, message.timestamp)
}
+ if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
+ throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
+ s"compression attribute set: $message")
+
// No in place assignment situation 4
if (message.magic != messageFormatVersion)
inPlaceAssignment = false
validatedMessages += message.toFormatVersion(messageFormatVersion)
}
+
if (!inPlaceAssignment) {
// Cannot do in place assignment.
val wrapperMessageTimestamp = {