You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/05/05 23:55:27 UTC
kafka git commit: HOTFIX: follow-up on KAFKA-725 to remove the check
and return empty response instead of throw exceptions
Repository: kafka
Updated Branches:
refs/heads/trunk 6856c5c21 -> 4a076a03b
HOTFIX: follow-up on KAFKA-725 to remove the check and return empty response instead of throw exceptions
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Stig D�ssing, Ismael Juma, Jun Rao
Closes #1327 from guozhangwang/K725r
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a076a03
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a076a03
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a076a03
Branch: refs/heads/trunk
Commit: 4a076a03bee376853713f4b5784b66b18ad5535c
Parents: 6856c5c
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu May 5 16:55:23 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu May 5 16:55:23 2016 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 6 +--
core/src/main/scala/kafka/log/LogSegment.scala | 40 ++++++++++----------
.../scala/kafka/server/ReplicaManager.scala | 8 +---
.../src/test/scala/unit/kafka/log/LogTest.scala | 20 ++++++----
.../unit/kafka/server/ReplicaManagerTest.scala | 18 +++++----
5 files changed, 50 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8465b64..e0ad73d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -483,14 +483,14 @@ class Log(val dir: File,
}
/**
- * Read messages from the log
+ * Read messages from the log.
*
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
- * @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
+ * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
*
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
- * @return The fetch data information including fetch starting offset metadata and messages read
+ * @return The fetch data information including fetch starting offset metadata and messages read.
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 3a4bbc8..37f7579 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -113,7 +113,7 @@ class LogSegment(val log: FileMessageSet,
* @param startOffset A lower bound on the first offset to include in the message set we read
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
- * @param maxPosition An optional maximum position in the log segment that should be exposed for read.
+ * @param maxPosition The maximum position in the log segment that should be exposed for read
*
* @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
* or null if the startOffset is larger than the largest offset in this log
@@ -137,24 +137,26 @@ class LogSegment(val log: FileMessageSet,
return FetchDataInfo(offsetMetadata, MessageSet.Empty)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
- val length =
- maxOffset match {
- case None =>
- // no max offset, just read until the max position
- min((maxPosition - startPosition.position).toInt, maxSize)
- case Some(offset) => {
- // there is a max offset, translate it to a file position and use that to calculate the max read size
- if(offset < startOffset)
- throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
- val mapping = translateOffset(offset, startPosition.position)
- val endPosition =
- if(mapping == null)
- logSize // the max offset is off the end of the log, use the end of the file
- else
- mapping.position
- min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
- }
- }
+ val length = maxOffset match {
+ case None =>
+ // no max offset, just read until the max position
+ min((maxPosition - startPosition.position).toInt, maxSize)
+ case Some(offset) =>
+ // there is a max offset, translate it to a file position and use that to calculate the max read size;
+ // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
+ // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
+ // offset between new leader's high watermark and the log end offset, we want to return an empty response.
+ if(offset < startOffset)
+ return FetchDataInfo(offsetMetadata, MessageSet.Empty)
+ val mapping = translateOffset(offset, startPosition.position)
+ val endPosition =
+ if(mapping == null)
+ logSize // the max offset is off the end of the log, use the end of the file
+ else
+ mapping.position
+ min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
+ }
+
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 888912b..534de27 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -522,12 +522,8 @@ class ReplicaManager(val config: KafkaConfig,
getReplicaOrException(topic, partition)
// decide whether to only fetch committed data (i.e. messages below high watermark)
- val maxOffsetOpt = if (readOnlyCommitted) {
- val maxOffset = localReplica.highWatermark.messageOffset
- if(offset > maxOffset)
- throw new OffsetOutOfRangeException("Request for offset %d beyond high watermark %d when reading from only committed offsets".format(offset, maxOffset))
- Some(maxOffset)
- }
+ val maxOffsetOpt = if (readOnlyCommitted)
+ Some(localReplica.highWatermark.messageOffset)
else
None
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8c973a4..796f5c3 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -222,7 +222,7 @@ class LogTest extends JUnitSuite {
// now manually truncate off all but one message from the first segment to create a gap in the messages
log.logSegments.head.truncateTo(1)
- assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset)
+ assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, log.read(1, 200, None).messageSet.head.offset)
}
/**
@@ -235,23 +235,29 @@ class LogTest extends JUnitSuite {
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
val logProps = new Properties()
+
+ // set up replica log starting with offset 1024 and with one message (at offset 1024)
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
- assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
+
+ assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
+
try {
- log.read(0, 1025)
- fail("Expected exception on invalid read.")
+ log.read(0, 1000)
+ fail("Reading below the log start offset should throw OffsetOutOfRangeException")
} catch {
- case e: OffsetOutOfRangeException => "This is good."
+ case e: OffsetOutOfRangeException => // This is good.
}
+
try {
log.read(1026, 1000)
- fail("Expected exception on invalid read.")
+ fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException")
} catch {
case e: OffsetOutOfRangeException => // This is good.
}
- assertEquals("Reading from maxOffset should produce 0 byte read.", 0, log.read(1024, 1000, Some(1024)).messageSet.sizeInBytes)
+
+ assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).messageSet.sizeInBytes)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a076a03/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 2cdf924..5739856 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
import kafka.cluster.Broker
import kafka.common.TopicAndPartition
-import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
@@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{MockTime => JMockTime}
import org.apache.kafka.common.{Node, TopicPartition}
import org.easymock.EasyMock
-import org.junit.Assert.{assertEquals, assertTrue, assertFalse}
+import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{Test, Before, After}
import scala.collection.JavaConverters._
@@ -189,7 +189,7 @@ class ReplicaManagerTest {
}
@Test
- def testFetchBeyondHighWatermarkNotAllowedForConsumer() {
+ def testFetchBeyondHighWatermarkReturnEmptyResponse() {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.put("broker.id", Int.box(0))
@@ -218,7 +218,7 @@ class ReplicaManagerTest {
def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
- // Append a message.
+ // Append a couple of messages.
for(i <- 1 to 2)
rm.appendMessages(
timeout = 1000,
@@ -229,8 +229,10 @@ class ReplicaManagerTest {
var fetchCallbackFired = false
var fetchError = 0
+ var fetchedMessages: MessageSet = null
def fetchCallback(responseStatus: Map[TopicAndPartition, FetchResponsePartitionData]) = {
fetchError = responseStatus.values.head.error
+ fetchedMessages = responseStatus.values.head.messages
fetchCallbackFired = true
}
@@ -238,25 +240,27 @@ class ReplicaManagerTest {
rm.fetchMessages(
timeout = 1000,
replicaId = 1,
- fetchMinBytes = 1,
+ fetchMinBytes = 0,
fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
responseCallback = fetchCallback)
assertTrue(fetchCallbackFired)
assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+ assertTrue("Should return some data", fetchedMessages.iterator.hasNext)
fetchCallbackFired = false
// Fetch a message above the high watermark as a consumer
rm.fetchMessages(
timeout = 1000,
replicaId = -1,
- fetchMinBytes = 1,
+ fetchMinBytes = 0,
fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
responseCallback = fetchCallback)
assertTrue(fetchCallbackFired)
- assertEquals("Should give OffsetOutOfRangeException", Errors.OFFSET_OUT_OF_RANGE.code, fetchError)
+ assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+ assertEquals("Should return empty response", MessageSet.Empty, fetchedMessages)
} finally {
rm.shutdown(checkpointHW = false)
}