You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/26 03:08:28 UTC

kafka git commit: MINOR: Validate inner message compression attribute

Repository: kafka
Updated Branches:
  refs/heads/trunk af1699bcc -> 62945280a


MINOR: Validate inner message compression attribute

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #976 from ijuma/validate-inner-message-compression-attribute


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62945280
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62945280
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62945280

Branch: refs/heads/trunk
Commit: 62945280a7fcfd69840a115fec16698bd8254323
Parents: af1699b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Feb 25 18:08:20 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 25 18:08:20 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/message/ByteBufferMessageSet.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62945280/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 856f971..6f38715 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -420,11 +420,12 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
 
       var maxTimestamp = Message.NoTimestamp
-      val expectedInnerOffset = new AtomicLong(0)
+      val expectedInnerOffset = new LongRef(0)
       val validatedMessages = new mutable.ArrayBuffer[Message]
       this.internalIterator(isShallow = false).foreach { messageAndOffset =>
         val message = messageAndOffset.message
         validateMessageKey(message, compactedTopic)
+
         if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
           // No in place assignment situation 3
           // Validate the timestamp
@@ -435,12 +436,17 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           maxTimestamp = math.max(maxTimestamp, message.timestamp)
         }
 
+        if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
+          throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
+            s"compression attribute set: $message")
+
         // No in place assignment situation 4
         if (message.magic != messageFormatVersion)
           inPlaceAssignment = false
 
         validatedMessages += message.toFormatVersion(messageFormatVersion)
       }
+
       if (!inPlaceAssignment) {
         // Cannot do in place assignment.
         val wrapperMessageTimestamp = {