Posted to commits@kafka.apache.org by gu...@apache.org on 2016/05/05 23:55:27 UTC

kafka git commit: HOTFIX: follow-up on KAFKA-725 to remove the check and return an empty response instead of throwing exceptions

Repository: kafka
Updated Branches:
  refs/heads/trunk 6856c5c21 -> 4a076a03b


HOTFIX: follow-up on KAFKA-725 to remove the check and return an empty response instead of throwing exceptions

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Stig Døssing, Ismael Juma, Jun Rao

Closes #1327 from guozhangwang/K725r


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a076a03
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a076a03
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a076a03

Branch: refs/heads/trunk
Commit: 4a076a03bee376853713f4b5784b66b18ad5535c
Parents: 6856c5c
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu May 5 16:55:23 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu May 5 16:55:23 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |  6 +--
 core/src/main/scala/kafka/log/LogSegment.scala  | 40 ++++++++++----------
 .../scala/kafka/server/ReplicaManager.scala     |  8 +---
 .../src/test/scala/unit/kafka/log/LogTest.scala | 20 ++++++----
 .../unit/kafka/server/ReplicaManagerTest.scala  | 18 +++++----
 5 files changed, 50 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
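
A minimal, self-contained sketch (illustrative names only, not actual Kafka
classes) of the behavioral change this commit makes: a consumer fetch at an
offset above the high watermark now yields an empty message set with no
error, instead of an OffsetOutOfRangeException.

    object FetchSemanticsSketch {
      // Model the log as a vector of message payloads; the high watermark
      // marks the first offset that is not yet committed.
      def readCommitted(log: Vector[String], startOffset: Int, highWatermark: Int): Vector[String] =
        if (startOffset > highWatermark) Vector.empty   // new: empty response, no error
        else log.slice(startOffset, highWatermark)      // expose only committed messages

      def main(args: Array[String]): Unit = {
        val log = Vector("m0", "m1", "m2")   // log end offset = 3
        val hw  = 2                          // offsets 0 and 1 are committed
        println(readCommitted(log, 0, hw))   // Vector(m0, m1)
        println(readCommitted(log, 3, hw))   // Vector() -- previously an exception
      }
    }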


http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8465b64..e0ad73d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -483,14 +483,14 @@ class Log(val dir: File,
   }
 
   /**
-   * Read messages from the log
+   * Read messages from the log.
    *
    * @param startOffset The offset to begin reading at
    * @param maxLength The maximum number of bytes to read
-   * @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
+   * @param maxOffset The offset to read up to, exclusive (i.e. this offset is NOT included in the resulting message set)
    *
    * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
-   * @return The fetch data information including fetch starting offset metadata and messages read
+   * @return The fetch data information including fetch starting offset metadata and messages read.
    */
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
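
As a REPL-style aside (plain Scala, not kafka.log types), the exclusive
maxOffset contract documented above means the offset passed as maxOffset is
the first one left out of the result:

    val offsets = (0 until 5).toVector                           // offsets 0..4
    val maxOffset = 3
    assert(offsets.takeWhile(_ < maxOffset) == Vector(0, 1, 2))  // 3 itself is excluded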

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 3a4bbc8..37f7579 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -113,7 +113,7 @@ class LogSegment(val log: FileMessageSet,
    * @param startOffset A lower bound on the first offset to include in the message set we read
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxOffset An optional maximum offset for the message set we read
-   * @param maxPosition An optional maximum position in the log segment that should be exposed for read.
+   * @param maxPosition The maximum position in the log segment that should be exposed for read
    *
    * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
    *         or null if the startOffset is larger than the largest offset in this log
@@ -137,24 +137,26 @@ class LogSegment(val log: FileMessageSet,
       return FetchDataInfo(offsetMetadata, MessageSet.Empty)
 
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
-    val length =
-      maxOffset match {
-        case None =>
-          // no max offset, just read until the max position
-          min((maxPosition - startPosition.position).toInt, maxSize)
-        case Some(offset) => {
-          // there is a max offset, translate it to a file position and use that to calculate the max read size
-          if(offset < startOffset)
-            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
-          val mapping = translateOffset(offset, startPosition.position)
-          val endPosition =
-            if(mapping == null)
-              logSize // the max offset is off the end of the log, use the end of the file
-            else
-              mapping.position
-          min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
-        }
-      }
+    val length = maxOffset match {
+      case None =>
+        // no max offset, just read until the max position
+        min((maxPosition - startPosition.position).toInt, maxSize)
+      case Some(offset) =>
+        // there is a max offset, translate it to a file position and use that to calculate the max read size;
+        // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
+        // true high watermark in the previous leader for a short window. In this window, if a consumer fetches at an
+        // offset between the new leader's high watermark and the log end offset, we want to return an empty response.
+        if(offset < startOffset)
+          return FetchDataInfo(offsetMetadata, MessageSet.Empty)
+        val mapping = translateOffset(offset, startPosition.position)
+        val endPosition =
+          if(mapping == null)
+            logSize // the max offset is off the end of the log, use the end of the file
+          else
+            mapping.position
+        min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
+    }
+
     FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
   }
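
For reference, the restructured computation above can be read as the
following standalone sketch (simplified signature; OffsetPos and translate
stand in for the real kafka.log types, and None models the early return):

    import scala.math.min

    final case class OffsetPos(offset: Long, position: Int)

    // None means "answer the fetch with an empty message set". With this
    // patch, a maxOffset below the start offset falls into that case rather
    // than raising IllegalArgumentException, covering the short window after
    // a leader change when the new leader's high watermark can trail the
    // previous leader's.
    def readLength(start: OffsetPos, maxOffset: Option[Long], maxPosition: Long,
                   maxSize: Int, logSize: Int,
                   translate: Long => Option[OffsetPos]): Option[Int] =
      maxOffset match {
        case None =>
          // no max offset, just read until the max position
          Some(min((maxPosition - start.position).toInt, maxSize))
        case Some(offset) if offset < start.offset =>
          None                                   // empty fetch, no exception
        case Some(offset) =>
          // translate the max offset to a file position; a mapping off the
          // end of the log means "use the end of the file"
          val endPosition = translate(offset).map(_.position.toLong).getOrElse(logSize.toLong)
          Some(min(min(maxPosition, endPosition) - start.position, maxSize).toInt)
      }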
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 888912b..534de27 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -522,12 +522,8 @@ class ReplicaManager(val config: KafkaConfig,
             getReplicaOrException(topic, partition)
 
           // decide whether to only fetch committed data (i.e. messages below high watermark)
-          val maxOffsetOpt = if (readOnlyCommitted) {
-            val maxOffset = localReplica.highWatermark.messageOffset
-            if(offset > maxOffset)
-              throw new OffsetOutOfRangeException("Request for offset %d beyond high watermark %d when reading from only committed offsets".format(offset, maxOffset))
-            Some(maxOffset)
-          }
+          val maxOffsetOpt = if (readOnlyCommitted)
+            Some(localReplica.highWatermark.messageOffset)
           else
             None
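
In simplified standalone form (hypothetical signature, not the actual
ReplicaManager API), the bound selection now amounts to:

    // Consumers (readOnlyCommitted = true) are capped at the high watermark;
    // followers read with no bound. The out-of-range check is gone, so a
    // fetch offset above the high watermark is answered further down, by
    // Log.read/LogSegment.read, with an empty message set instead of an
    // exception.
    def maxOffsetBound(readOnlyCommitted: Boolean, highWatermark: Long): Option[Long] =
      if (readOnlyCommitted) Some(highWatermark) else None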
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8c973a4..796f5c3 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -222,7 +222,7 @@ class LogTest extends JUnitSuite {
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
 
-    assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset)
+    assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, log.read(1, 200, None).messageSet.head.offset)
   }
 
   /**
@@ -235,23 +235,29 @@ class LogTest extends JUnitSuite {
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
     val logProps = new Properties()
+
+    // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
-    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
+
+    assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
+
     try {
-      log.read(0, 1025)
-      fail("Expected exception on invalid read.")
+      log.read(0, 1000)
+      fail("Reading below the log start offset should throw OffsetOutOfRangeException")
     } catch {
-      case e: OffsetOutOfRangeException => "This is good."
+      case e: OffsetOutOfRangeException => // This is good.
     }
+
     try {
       log.read(1026, 1000)
-      fail("Expected exception on invalid read.")
+      fail("Reading beyond the log end offset should throw OffsetOutOfRangeException")
     } catch {
       case e: OffsetOutOfRangeException => // This is good.
     }
-    assertEquals("Reading from maxOffset should produce 0 byte read.", 0, log.read(1024, 1000, Some(1024)).messageSet.sizeInBytes)
+
+    assertEquals("Reading with a maxOffset below the start offset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).messageSet.sizeInBytes)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 2cdf924..5739856 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
-import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
@@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
-import org.junit.Assert.{assertEquals, assertTrue, assertFalse}
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{Test, Before, After}
 
 import scala.collection.JavaConverters._
@@ -189,7 +189,7 @@ class ReplicaManagerTest {
   }
   
   @Test
-  def testFetchBeyondHighWatermarkNotAllowedForConsumer() {
+  def testFetchBeyondHighWatermarkReturnEmptyResponse() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     props.put("broker.id", Int.box(0))
@@ -218,7 +218,7 @@ class ReplicaManagerTest {
 
       def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
       
-      // Append a message.
+      // Append a couple of messages.
       for(i <- 1 to 2)
         rm.appendMessages(
           timeout = 1000,
@@ -229,8 +229,10 @@ class ReplicaManagerTest {
       
       var fetchCallbackFired = false
       var fetchError = 0
+      var fetchedMessages: MessageSet = null
       def fetchCallback(responseStatus: Map[TopicAndPartition, FetchResponsePartitionData]) = {
         fetchError = responseStatus.values.head.error
+        fetchedMessages = responseStatus.values.head.messages
         fetchCallbackFired = true
       }
       
@@ -238,25 +240,27 @@ class ReplicaManagerTest {
       rm.fetchMessages(
         timeout = 1000,
         replicaId = 1,
-        fetchMinBytes = 1,
+        fetchMinBytes = 0,
         fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
         responseCallback = fetchCallback)
         
       
       assertTrue(fetchCallbackFired)
       assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+      assertTrue("Should return some data", fetchedMessages.iterator.hasNext)
       fetchCallbackFired = false
       
       // Fetch a message above the high watermark as a consumer
       rm.fetchMessages(
         timeout = 1000,
         replicaId = -1,
-        fetchMinBytes = 1,
+        fetchMinBytes = 0,
         fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
         responseCallback = fetchCallback)
           
         assertTrue(fetchCallbackFired)
-        assertEquals("Should give OffsetOutOfRangeException", Errors.OFFSET_OUT_OF_RANGE.code, fetchError)
+        assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+        assertEquals("Should return empty response", MessageSet.Empty, fetchedMessages)
     } finally {
       rm.shutdown(checkpointHW = false)
     }