Posted to commits@kafka.apache.org by ju...@apache.org on 2014/10/09 17:05:51 UTC

git commit: kafka-1670; Corrupt log files for segment.bytes values close to Int.MaxInt; patched by Sriharsha Chintalapani; reviewed by Jay Kreps and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 68b9f7716 -> c940470e3


kafka-1670; Corrupt log files for segment.bytes values close to Int.MaxInt; patched by Sriharsha Chintalapani; reviewed by Jay Kreps and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c940470e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c940470e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c940470e

Branch: refs/heads/trunk
Commit: c940470e32916e2dbbe8f95bd295950a3681c5b7
Parents: 68b9f77
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Thu Oct 9 08:05:32 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Oct 9 08:05:32 2014 -0700

----------------------------------------------------------------------
 .../errors/RecordBatchTooLargeException.java    | 39 ++++++++++++++++++++
 .../apache/kafka/common/protocol/Errors.java    |  3 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |  5 ++-
 .../MessageSetSizeTooLargeException.scala       | 22 +++++++++++
 core/src/main/scala/kafka/log/Log.scala         | 27 +++++++++++---
 .../scala/unit/kafka/log/LogManagerTest.scala   |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 34 ++++++++++++-----
 .../scala/unit/kafka/server/LogOffsetTest.scala |  8 ++--
 8 files changed, 117 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

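Why this corrupts logs: segment sizes and index positions are tracked as 32-bit Ints, and the old roll check (segment.size > config.segmentSize) only rolled a segment after it had already grown past segment.bytes. With segment.bytes close to Int.MaxValue, one more append could push the segment past Int.MaxValue and wrap the position negative. A minimal standalone Scala sketch of the arithmetic (not Kafka code; the byte counts are made-up):

    val segmentSize  = Int.MaxValue        // segment.bytes configured near Int.MaxValue
    val segmentBytes = Int.MaxValue - 50   // active segment, not yet "full" under the old check
    val batchBytes   = 200                 // incoming message set

    println(segmentBytes > segmentSize)               // old check: false, so the append proceeds
    println(segmentBytes + batchBytes)                // Int overflow: a negative position
    println(segmentBytes > segmentSize - batchBytes)  // new check: true, roll before appending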

http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java
new file mode 100644
index 0000000..f3f3f27
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This record batch is larger than the maximum allowable size
+ */
+public class RecordBatchTooLargeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RecordBatchTooLargeException() {
+        super();
+    }
+
+    public RecordBatchTooLargeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RecordBatchTooLargeException(String message) {
+        super(message);
+    }
+
+    public RecordBatchTooLargeException(Throwable cause) {
+        super(cause);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d434f42..d5f5de3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -43,7 +43,8 @@ public enum Errors {
     OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
     // TODO: errorCode 14, 15, 16
-    INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic."));
+    INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
+    RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included a message batch larger than the configured segment size on the server."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();

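How the new code surfaces on the client, as a script-style Scala sketch; Errors.forCode and Errors.exception() are assumed accessors on this enum (verify against this revision):

    import org.apache.kafka.common.protocol.Errors

    val error = Errors.forCode(18.toShort)            // code 18 taken from a produce response
    require(error == Errors.RECORD_LIST_TOO_LARGE)
    println(error.exception().getClass.getSimpleName) // RecordBatchTooLargeException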
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 3fae791..a190607 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -19,7 +19,6 @@ package kafka.common
 
 import kafka.message.InvalidMessageException
 import java.nio.ByteBuffer
-import java.lang.Throwable
 import scala.Predef._
 
 /**
@@ -47,6 +46,7 @@ object ErrorMapping {
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
   val InvalidTopicCode : Short = 17
+  val MessageSetSizeTooLargeCode: Short = 18
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -65,7 +65,8 @@ object ErrorMapping {
       classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
       classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
       classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
-      classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode
+      classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
+      classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

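The broker-side direction of the same mapping, again as a hedged sketch; ErrorMapping.codeFor is assumed to exist in this revision (it follows the accessor pattern this object uses around exceptionToCode):

    import kafka.common.{ErrorMapping, MessageSetSizeTooLargeException}

    val code = ErrorMapping.codeFor(
      classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]])
    println(code) // 18, i.e. MessageSetSizeTooLargeCode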
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala b/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
new file mode 100644
index 0000000..94a616e
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
@@ -0,0 +1,22 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.common
+
+class MessageSetSizeTooLargeException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0ddf97b..a123cdc 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -252,9 +252,6 @@ class Log(val dir: File,
       lock synchronized {
         appendInfo.firstOffset = nextOffsetMetadata.messageOffset
 
-        // maybe roll the log if this segment is full
-        val segment = maybeRoll()
-
         if(assignOffsets) {
           // assign offsets to the message set
           val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
@@ -282,6 +279,16 @@ class Log(val dir: File,
           }
         }
 
+        // check that the message set size does not exceed config.segmentSize
+        if(validMessages.sizeInBytes > config.segmentSize) {
+          throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
+            .format(validMessages.sizeInBytes, config.segmentSize))
+        }
+
+
+        // maybe roll the log if this segment is full
+        val segment = maybeRoll(validMessages.sizeInBytes)
+
         // now append to the log
         segment.append(appendInfo.firstOffset, validMessages)
 
@@ -489,12 +496,20 @@ class Log(val dir: File,
   def logEndOffset: Long = nextOffsetMetadata.messageOffset
 
   /**
-   * Roll the log over to a new empty log segment if necessary
+   * Roll the log over to a new empty log segment if necessary.
+   *
+   * @param messagesSize The message set size in bytes
+   * The log segment will be rolled if one of the following conditions is met:
+   * <ol>
+   * <li> The logSegment is full
+   * <li> The max time allowed for the segment (segmentMs) has elapsed
+   * <li> The index is full
+   * </ol>
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
-  private def maybeRoll(): LogSegment = {
+  private def maybeRoll(messagesSize: Int): LogSegment = {
     val segment = activeSegment
-    if (segment.size > config.segmentSize || 
+    if (segment.size > config.segmentSize - messagesSize ||
         segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
         segment.index.isFull) {
       debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."

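Condensed, the two guards this hunk adds to the append path (a simplified restatement of the code above, not the full Log implementation):

    import kafka.common.MessageSetSizeTooLargeException

    def shouldRoll(segmentBytes: Int, messageSetBytes: Int, segmentSize: Int): Boolean = {
      // Guard 1: a message set larger than a whole segment can never fit anywhere.
      if (messageSetBytes > segmentSize)
        throw new MessageSetSizeTooLargeException(
          "Message set size is %d bytes which exceeds the maximum configured segment size of %d."
            .format(messageSetBytes, segmentSize))
      // Guard 2: roll *before* appending if this set would push the segment past
      // segmentSize. The subtraction form never computes segmentBytes + messageSetBytes,
      // which could itself overflow Int when segmentSize is near Int.MaxValue; after
      // guard 1, segmentSize - messageSetBytes is non-negative, so it is safe.
      segmentBytes > segmentSize - messageSetBytes
    }

    println(shouldRoll(segmentBytes = 700, messageSetBytes = 300, segmentSize = 1000)) // false: fits exactly
    println(shouldRoll(segmentBytes = 701, messageSetBytes = 300, segmentSize = 1000)) // true: would exceed

A segment can therefore reach exactly segment.bytes but never exceed it, which keeps every file position representable as an Int.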
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 59bd8a9..90cd530 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -114,7 +114,7 @@ class LogManagerTest extends JUnit3Suite {
     val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
     logManager.shutdown()
 
-    val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
+    val config = logConfig.copy(segmentSize = 10 * setSize, retentionSize = 5L * 10L * setSize + 10L)
     logManager = createLogManager()
     logManager.startup
 

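The 10 * (setSize - 1) -> 10 * setSize changes here (and in LogTest below) follow directly from the new roll rule; a worked example with a hypothetical set size:

    val setSize   = 30 // hypothetical size of one test message set, in bytes
    val msgPerSeg = 10
    // The old rule rolled only after segment.size exceeded the cap, so a cap of
    // 10 * (setSize - 1) = 290 bytes still accumulated 10 sets (300 bytes) per segment.
    val oldCap = msgPerSeg * (setSize - 1) // 290: under the new rule only 9 sets fit
    // The new rule rolls before an append would exceed the cap, so holding 10 sets
    // per segment now requires a cap of at least 10 * setSize = 300.
    val newCap = msgPerSeg * setSize       // 300: exactly 10 sets per segment again
    println((oldCap, newCap))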
http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 577d102..a0cbd3b 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -18,15 +18,13 @@
 package kafka.log
 
 import java.io._
-import java.util.ArrayList
 import java.util.concurrent.atomic._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message._
-import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
+import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException}
 import kafka.utils._
-import scala.Some
 import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
@@ -239,7 +237,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
-    val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
     
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@@ -286,7 +284,26 @@ class LogTest extends JUnitSuite {
   }
 
   /**
-   * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the 
+   * A message set's size shouldn't exceed config.segmentSize; check that this is properly enforced by
+   * appending a message set larger than the config.segmentSize setting and checking that an exception is thrown.
+   */
+  @Test
+  def testMessageSetSizeCheck() {
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message("You".getBytes), new Message("bethe".getBytes))
+    // append messages to log
+    val configSegmentSize = messageSet.sizeInBytes - 1
+    val log = new Log(logDir, logConfig.copy(segmentSize = configSegmentSize), recoveryPoint = 0L, time.scheduler, time = time)
+
+    try {
+      log.append(messageSet)
+      fail("message set should throw MessageSetSizeTooLargeException.")
+    } catch {
+      case e: MessageSetSizeTooLargeException => // this is good
+    }
+  }
+
+  /**
+   * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
    * setting and checking that an exception is thrown.
    */
   @Test
@@ -305,10 +322,9 @@ class LogTest extends JUnitSuite {
       log.append(second)
       fail("Second message set should throw MessageSizeTooLargeException.")
     } catch {
-        case e: MessageSizeTooLargeException => // this is good
+      case e: MessageSizeTooLargeException => // this is good
     }
   }
-  
   /**
    * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
    */
@@ -375,7 +391,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+    val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
 
     // create a log
     val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
@@ -429,7 +445,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+    val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
     val config = logConfig.copy(segmentSize = segmentSize)
     val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c940470e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9556ed9..c06ee75 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     log.flush()
 
     val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
-    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
@@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
       replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), consumerOffsets)
 
     // try to fetch using latest offset
     val fetchResponse = simpleConsumer.fetch(
@@ -155,14 +155,14 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
 
     val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
-    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), consumerOffsets)
   }
 
   @Test