You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/09 17:51:47 UTC

kafka git commit: HOTFIX: KAFKA-3160 follow-up, catch decompression errors in constructor

Repository: kafka
Updated Branches:
  refs/heads/trunk 2caf872c2 -> 4331bf4ff


HOTFIX: KAFKA-3160 follow-up, catch decompression errors in constructor

After testing KAFKA-3160 a bit more, I found that the error code was not being set properly in ProduceResponse. This happened because the validation error is raised in the CompressionFactory constructor, which was not wrapped in a try / catch.

ijuma junrao

(This contribution is my original work and I license the work under Apache 2.0.)

Author: Dana Powers <da...@gmail.com>
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>, Gwen Shapira <cs...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1344 from dpkp/decompress_error_code


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4331bf4f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4331bf4f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4331bf4f

Branch: refs/heads/trunk
Commit: 4331bf4ff2d0e7ab1a24ea29382897162c1ed91c
Parents: 2caf872
Author: Dana Powers <da...@gmail.com>
Authored: Mon May 9 18:40:54 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon May 9 18:40:54 2016 +0100

----------------------------------------------------------------------
 .../kafka/message/ByteBufferMessageSet.scala    | 13 +--
 .../kafka/message/InvalidMessageException.scala |  3 +-
 .../unit/kafka/server/ProduceRequestTest.scala  | 86 ++++++++++++++++++++
 3 files changed, 95 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4331bf4f/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 677355a..a116d4b 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -23,11 +23,7 @@ import java.nio.ByteBuffer
 import java.nio.channels._
 import java.io._
 import java.util.ArrayDeque
-import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConverters._
-
-import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
@@ -96,7 +92,12 @@ object ByteBufferMessageSet {
       if (wrapperMessage.payload == null)
         throw new KafkaException(s"Message payload is null: $wrapperMessage")
       val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
-      val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
+      val compressed = try {
+        new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
+      } catch {
+        case ioe: IOException =>
+          throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
+      }
       var lastInnerOffset = -1L
 
       val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
@@ -108,7 +109,7 @@ object ByteBufferMessageSet {
           case eofe: EOFException =>
             compressed.close()
           case ioe: IOException =>
-            throw new CorruptRecordException(ioe)
+            throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe)
         }
         Some(innerMessageAndOffsets)
       } else None

http://git-wip-us.apache.org/repos/asf/kafka/blob/4331bf4f/core/src/main/scala/kafka/message/InvalidMessageException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/InvalidMessageException.scala b/core/src/main/scala/kafka/message/InvalidMessageException.scala
index df22516..ef83500 100644
--- a/core/src/main/scala/kafka/message/InvalidMessageException.scala
+++ b/core/src/main/scala/kafka/message/InvalidMessageException.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
  * Because ByteBufferMessageSet.scala and Message.scala are used in both server and client code having
  * InvalidMessageException extend CorruptRecordException allows us to change server code without affecting the client.
  */
-class InvalidMessageException(message: String) extends CorruptRecordException(message) {
+class InvalidMessageException(message: String, throwable: Throwable) extends CorruptRecordException(message, throwable) {
+  def this(message: String) = this(message, null) // forward the caller's message; only the cause is absent
   def this() = this(null)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4331bf4f/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
new file mode 100644
index 0000000..67f7d41
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.message.{ByteBufferMessageSet, LZ4CompressionCodec, Message}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+  * Subclasses of `BaseProduceSendRequestTest` exercise the producer and produce request/response. This class
+  * complements those classes with tests that require lower-level access to the protocol.
+  */
+class ProduceRequestTest extends BaseRequestTest {
+
+  @Test // a plain single-message produce request must succeed: no error, base offset 0, timestamp -1
+  def testSimpleProduceRequest() {
+    val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
+    val messageBuffer = new ByteBufferMessageSet(new Message("value".getBytes, "key".getBytes,
+      System.currentTimeMillis(), 1: Byte)).buffer
+    val topicPartition = new TopicPartition("topic", partition)
+    val partitionRecords = Map(topicPartition -> messageBuffer)
+    val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
+    assertEquals(1, produceResponse.responses.size)
+    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+    assertEquals(topicPartition, tp)
+    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+    assertEquals(0, partitionResponse.baseOffset)
+    assertEquals(-1, partitionResponse.timestamp)
+  }
+
+  /* creates `topic` via TestUtils.createTopic; returns (partition id, leader id) of the first partition found with a leader */
+  private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = {
+    val partitionToLeader = TestUtils.createTopic(zkUtils, topic, 3, 2, servers)
+    partitionToLeader.collectFirst {
+      case (partition, Some(leader)) if leader != -1 => (partition, leader)
+    }.getOrElse(fail(s"No leader elected for topic $topic"))
+  }
+
+  @Test // an LZ4 message set with a corrupted byte must be rejected: CORRUPT_MESSAGE, base offset -1, timestamp -1
+  def testCorruptLz4ProduceRequest() {
+    val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
+    val messageBuffer = new ByteBufferMessageSet(LZ4CompressionCodec, new Message("value".getBytes, "key".getBytes,
+      System.currentTimeMillis(), 1: Byte)).buffer
+    // Change the lz4 checksum value so that it doesn't match the contents
+    messageBuffer.array.update(40, 0)
+    val topicPartition = new TopicPartition("topic", partition)
+    val partitionRecords = Map(topicPartition -> messageBuffer)
+    val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
+    assertEquals(1, produceResponse.responses.size)
+    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+    assertEquals(topicPartition, tp)
+    assertEquals(Errors.CORRUPT_MESSAGE.code, partitionResponse.errorCode)
+    assertEquals(-1, partitionResponse.baseOffset)
+    assertEquals(-1, partitionResponse.timestamp)
+  }
+
+  /* sends `request` to the broker whose id is `leaderId` and parses the reply; the socket is closed even on failure */
+  private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = {
+    val socket = connect(s = servers.find(_.config.brokerId == leaderId).map(_.socketServer).getOrElse {
+      fail(s"Could not find broker with id $leaderId")
+    })
+    try ProduceResponse.parse(send(socket, request, ApiKeys.PRODUCE, ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)))
+    finally socket.close()
+  }
+}