You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/09 17:51:47 UTC
kafka git commit: HOTFIX: KAFKA-3160 follow-up,
catch decompression errors in constructor
Repository: kafka
Updated Branches:
refs/heads/trunk 2caf872c2 -> 4331bf4ff
HOTFIX: KAFKA-3160 follow-up, catch decompression errors in constructor
After testing KAFKA-3160 a bit more, I found that the error code was not being set properly in ProduceResponse. This happened because the validation error is raised in the CompressionFactory constructor, which was not wrapped in a try / catch.
ijuma junrao
(This contribution is my original work and I license the work under Apache 2.0.)
Author: Dana Powers <da...@gmail.com>
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>, Gwen Shapira <cs...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1344 from dpkp/decompress_error_code
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4331bf4f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4331bf4f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4331bf4f
Branch: refs/heads/trunk
Commit: 4331bf4ff2d0e7ab1a24ea29382897162c1ed91c
Parents: 2caf872
Author: Dana Powers <da...@gmail.com>
Authored: Mon May 9 18:40:54 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon May 9 18:40:54 2016 +0100
----------------------------------------------------------------------
.../kafka/message/ByteBufferMessageSet.scala | 13 +--
.../kafka/message/InvalidMessageException.scala | 3 +-
.../unit/kafka/server/ProduceRequestTest.scala | 86 ++++++++++++++++++++
3 files changed, 95 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4331bf4f/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 677355a..a116d4b 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -23,11 +23,7 @@ import java.nio.ByteBuffer
import java.nio.channels._
import java.io._
import java.util.ArrayDeque
-import java.util.concurrent.atomic.AtomicLong
-import scala.collection.JavaConverters._
-
-import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.errors.InvalidTimestampException
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.Utils
@@ -96,7 +92,12 @@ object ByteBufferMessageSet {
if (wrapperMessage.payload == null)
throw new KafkaException(s"Message payload is null: $wrapperMessage")
val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
- val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
+ val compressed = try {
+ new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
+ } catch {
+ case ioe: IOException =>
+ throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
+ }
var lastInnerOffset = -1L
val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
@@ -108,7 +109,7 @@ object ByteBufferMessageSet {
case eofe: EOFException =>
compressed.close()
case ioe: IOException =>
- throw new CorruptRecordException(ioe)
+ throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe)
}
Some(innerMessageAndOffsets)
} else None
http://git-wip-us.apache.org/repos/asf/kafka/blob/4331bf4f/core/src/main/scala/kafka/message/InvalidMessageException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/InvalidMessageException.scala b/core/src/main/scala/kafka/message/InvalidMessageException.scala
index df22516..ef83500 100644
--- a/core/src/main/scala/kafka/message/InvalidMessageException.scala
+++ b/core/src/main/scala/kafka/message/InvalidMessageException.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
* Because ByteBufferMessageSet.scala and Message.scala are used in both server and client code having
* InvalidMessageException extend CorruptRecordException allows us to change server code without affecting the client.
*/
-class InvalidMessageException(message: String) extends CorruptRecordException(message) {
+class InvalidMessageException(message: String, throwable: Throwable) extends CorruptRecordException(message, throwable) {
+ def this(message: String) = this(message, null) // forward the caller's message (no cause); passing null here would silently drop it
def this() = this(null)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4331bf4f/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
new file mode 100644
index 0000000..67f7d41
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.message.{ByteBufferMessageSet, LZ4CompressionCodec, Message}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+ * Subclasses of `BaseProduceSendRequestTest` exercise the producer and produce request/response. This class
+ * complements those classes with tests that require lower-level access to the protocol.
+ */
+class ProduceRequestTest extends BaseRequestTest {
+
+ @Test
+ def testSimpleProduceRequest() { // a single well-formed, uncompressed message must be accepted with Errors.NONE
+ val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
+ val messageBuffer = new ByteBufferMessageSet(new Message("value".getBytes, "key".getBytes,
+ System.currentTimeMillis(), 1: Byte)).buffer
+ val topicPartition = new TopicPartition("topic", partition)
+ val partitionRecords = Map(topicPartition -> messageBuffer)
+ val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava)) // NOTE(review): -1 and 3000 are presumably acks and timeout (ms) -- confirm against ProduceRequest
+ assertEquals(1, produceResponse.responses.size)
+ val (tp, partitionResponse) = produceResponse.responses.asScala.head
+ assertEquals(topicPartition, tp)
+ assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+ assertEquals(0, partitionResponse.baseOffset)
+ assertEquals(-1, partitionResponse.timestamp)
+ }
+
+ /* Creates `topic` via TestUtils.createTopic and returns a pair of partition id and leader id for the first partition whose leader is defined and not -1; fails the test if there is none */
+ private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = {
+ val partitionToLeader = TestUtils.createTopic(zkUtils, topic, 3, 2, servers) // NOTE(review): 3 and 2 are presumably partition count and replication factor -- confirm against TestUtils
+ partitionToLeader.collectFirst {
+ case (partition, Some(leader)) if leader != -1 => (partition, leader)
+ }.getOrElse(fail(s"No leader elected for topic $topic"))
+ }
+
+ @Test
+ def testCorruptLz4ProduceRequest() { // a corrupted LZ4 message set must be rejected with Errors.CORRUPT_MESSAGE
+ val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
+ val messageBuffer = new ByteBufferMessageSet(LZ4CompressionCodec, new Message("value".getBytes, "key".getBytes,
+ System.currentTimeMillis(), 1: Byte)).buffer
+ // Change the lz4 checksum value so that it doesn't match the contents (NOTE(review): index 40 is hard-coded and presumably tied to the exact message layout built above)
+ messageBuffer.array.update(40, 0)
+ val topicPartition = new TopicPartition("topic", partition)
+ val partitionRecords = Map(topicPartition -> messageBuffer)
+ val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
+ assertEquals(1, produceResponse.responses.size)
+ val (tp, partitionResponse) = produceResponse.responses.asScala.head
+ assertEquals(topicPartition, tp)
+ assertEquals(Errors.CORRUPT_MESSAGE.code, partitionResponse.errorCode) // the decompression failure must surface as an error code in the partition response
+ assertEquals(-1, partitionResponse.baseOffset)
+ assertEquals(-1, partitionResponse.timestamp)
+ }
+
+ private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = { // sends `request` to the broker whose config.brokerId equals `leaderId` and returns the parsed response
+ val socket = connect(s = servers.find(_.config.brokerId == leaderId).map(_.socketServer).getOrElse {
+ fail(s"Could not find broker with id $leaderId")
+ })
+ val response = send(socket, request, ApiKeys.PRODUCE, ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)) // NOTE(review): the socket returned by connect is not closed in this method
+ ProduceResponse.parse(response)
+ }
+
+}