You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/10/09 17:05:51 UTC
git commit: kafka-1670; Corrupt log files for segment.bytes values close to Int.MaxInt [sic: the Scala constant is Int.MaxValue]; patched by Sriharsha Chintalapani; reviewed by Jay Kreps and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 68b9f7716 -> c940470e3
kafka-1670; Corrupt log files for segment.bytes values close to Int.MaxInt; patched by Sriharsha Chintalapani; reviewed by Jay Kreps and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c940470e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c940470e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c940470e
Branch: refs/heads/trunk
Commit: c940470e32916e2dbbe8f95bd295950a3681c5b7
Parents: 68b9f77
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Thu Oct 9 08:05:32 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Oct 9 08:05:32 2014 -0700
----------------------------------------------------------------------
.../errors/RecordBatchTooLargeException.java | 39 ++++++++++++++++++++
.../apache/kafka/common/protocol/Errors.java | 3 +-
.../main/scala/kafka/common/ErrorMapping.scala | 5 ++-
.../MessageSetSizeTooLargeException.scala | 22 +++++++++++
core/src/main/scala/kafka/log/Log.scala | 27 +++++++++++---
.../scala/unit/kafka/log/LogManagerTest.scala | 2 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 34 ++++++++++++-----
.../scala/unit/kafka/server/LogOffsetTest.scala | 8 ++--
8 files changed, 117 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java
new file mode 100644
index 0000000..f3f3f27
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This record batch is larger than the maximum allowable size
+ */
+public class RecordBatchTooLargeException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public RecordBatchTooLargeException() {
+ super();
+ }
+
+ public RecordBatchTooLargeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RecordBatchTooLargeException(String message) {
+ super(message);
+ }
+
+ public RecordBatchTooLargeException(Throwable cause) {
+ super(cause);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d434f42..d5f5de3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -43,7 +43,8 @@ public enum Errors {
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
// TODO: errorCode 14, 15, 16
- INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic."));
+ INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
+ RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server."));
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 3fae791..a190607 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -19,7 +19,6 @@ package kafka.common
import kafka.message.InvalidMessageException
import java.nio.ByteBuffer
-import java.lang.Throwable
import scala.Predef._
/**
@@ -47,6 +46,7 @@ object ErrorMapping {
val ConsumerCoordinatorNotAvailableCode: Short = 15
val NotCoordinatorForConsumerCode: Short = 16
val InvalidTopicCode : Short = 17
+ val MessageSetSizeTooLargeCode: Short = 18
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -65,7 +65,8 @@ object ErrorMapping {
classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
- classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode
+ classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
+ classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala b/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
new file mode 100644
index 0000000..94a616e
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class MessageSetSizeTooLargeException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0ddf97b..a123cdc 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -252,9 +252,6 @@ class Log(val dir: File,
lock synchronized {
appendInfo.firstOffset = nextOffsetMetadata.messageOffset
- // maybe roll the log if this segment is full
- val segment = maybeRoll()
-
if(assignOffsets) {
// assign offsets to the message set
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
@@ -282,6 +279,16 @@ class Log(val dir: File,
}
}
+ // check messages set size may be exceed config.segmentSize
+ if(validMessages.sizeInBytes > config.segmentSize) {
+ throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
+ .format(validMessages.sizeInBytes, config.segmentSize))
+ }
+
+
+ // maybe roll the log if this segment is full
+ val segment = maybeRoll(validMessages.sizeInBytes)
+
// now append to the log
segment.append(appendInfo.firstOffset, validMessages)
@@ -489,12 +496,20 @@ class Log(val dir: File,
def logEndOffset: Long = nextOffsetMetadata.messageOffset
/**
- * Roll the log over to a new empty log segment if necessary
+ * Roll the log over to a new empty log segment if necessary.
+ *
+ * @param messagesSize The messages set size in bytes
+ * logSegment will be rolled if one of the following conditions met
+ * <ol>
+ * <li> The logSegment is full
+ * <li> The maxTime has elapsed
+ * <li> The index is full
+ * </ol>
* @return The currently active segment after (perhaps) rolling to a new segment
*/
- private def maybeRoll(): LogSegment = {
+ private def maybeRoll(messagesSize: Int): LogSegment = {
val segment = activeSegment
- if (segment.size > config.segmentSize ||
+ if (segment.size > config.segmentSize - messagesSize ||
segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
segment.index.isFull) {
debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 59bd8a9..90cd530 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -114,7 +114,7 @@ class LogManagerTest extends JUnit3Suite {
val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
logManager.shutdown()
- val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
+ val config = logConfig.copy(segmentSize = 10 * setSize, retentionSize = 5L * 10L * setSize + 10L)
logManager = createLogManager()
logManager.startup
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 577d102..a0cbd3b 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -18,15 +18,13 @@
package kafka.log
import java.io._
-import java.util.ArrayList
import java.util.concurrent.atomic._
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.message._
-import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
+import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException}
import kafka.utils._
-import scala.Some
import kafka.server.KafkaConfig
class LogTest extends JUnitSuite {
@@ -239,7 +237,7 @@ class LogTest extends JUnitSuite {
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
- val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@@ -286,7 +284,26 @@ class LogTest extends JUnitSuite {
}
/**
- * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
+ * MessageSet size shouldn't exceed the config.segmentSize, check that it is properly enforced by
+ * appending a message set larger than the config.segmentSize setting and checking that an exception is thrown.
+ */
+ @Test
+ def testMessageSetSizeCheck() {
+ val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
+ // append messages to log
+ val configSegmentSize = messageSet.sizeInBytes - 1
+ val log = new Log(logDir, logConfig.copy(segmentSize = configSegmentSize), recoveryPoint = 0L, time.scheduler, time = time)
+
+ try {
+ log.append(messageSet)
+ fail("message set should throw MessageSetSizeTooLargeException.")
+ } catch {
+ case e: MessageSetSizeTooLargeException => // this is good
+ }
+ }
+
+ /**
+ * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
* setting and checking that an exception is thrown.
*/
@Test
@@ -305,10 +322,9 @@ class LogTest extends JUnitSuite {
log.append(second)
fail("Second message set should throw MessageSizeTooLargeException.")
} catch {
- case e: MessageSizeTooLargeException => // this is good
+ case e: MessageSizeTooLargeException => // this is good
}
}
-
/**
* Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
*/
@@ -375,7 +391,7 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val setSize = set.sizeInBytes
val msgPerSeg = 10
- val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+ val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
// create a log
val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
@@ -429,7 +445,7 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val setSize = set.sizeInBytes
val msgPerSeg = 10
- val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+ val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
val config = logConfig.copy(segmentSize = segmentSize)
val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9556ed9..c06ee75 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
log.flush()
val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
- assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+ assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
val topicAndPartition = TopicAndPartition(topic, part)
@@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
replicaId = 0)
val consumerOffsets =
simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
- assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+ assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), consumerOffsets)
// try to fetch using latest offset
val fetchResponse = simpleConsumer.fetch(
@@ -155,14 +155,14 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
- assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+ assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
val topicAndPartition = TopicAndPartition(topic, part)
val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
val consumerOffsets =
simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
- assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+ assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), consumerOffsets)
}
@Test