You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:57 UTC

[41/50] [abbrv] kafka git commit: KAFKA-725: Return OffsetOutOfRange error from ReplicaManager when non-follower attempts reading an offset that's above high watermark.

KAFKA-725: Return OffsetOutOfRange error from ReplicaManager when non-follower attempts reading an offset that's above high watermark.

This should make Log.read act the same when startOffset is larger than maxOffset as it would if startOffset was larger than logEndOffset. The current behavior can result in an IllegalArgumentException from LogSegment if a consumer attempts to fetch an offset above the high watermark which is present in the leader's log. It seems more correct if Log.read presents the view of the log to consumers as if it simply ended at maxOffset (high watermark).

I've described an example scenario of this happening here: https://issues.apache.org/jira/browse/KAFKA-725?focusedCommentId=15221673&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15221673

I'm not sure I understand why ReplicaManager sets maxOffset to the high watermark, and not high watermark + 1. Isn't the high watermark the last committed message, and readable by consumers?

Tests passed for me locally on the second try; it seems the first run just hit a flaky test.

Author: Stig Rohde Døssing <sd...@it-minds.dk>

Reviewers: Jiangjie Qin <be...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #1178 from srdo/KAFKA-725


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4d467c2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4d467c2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4d467c2e

Branch: refs/heads/0.10.0
Commit: 4d467c2ec275b5659c2da0ca196409dffaa3caf3
Parents: 9beafae
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Fri Apr 8 09:44:51 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 8 09:44:51 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     |   8 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   9 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 118 +++++++++++++++----
 3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4d467c2e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f050e27..22657f4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -522,8 +522,12 @@ class ReplicaManager(val config: KafkaConfig,
             getReplicaOrException(topic, partition)
 
           // decide whether to only fetch committed data (i.e. messages below high watermark)
-          val maxOffsetOpt = if (readOnlyCommitted)
-            Some(localReplica.highWatermark.messageOffset)
+          val maxOffsetOpt = if (readOnlyCommitted) {
+            val maxOffset = localReplica.highWatermark.messageOffset
+            if(offset > maxOffset)
+              throw new OffsetOutOfRangeException("Request for offset %d beyond high watermark %d when reading from only committed offsets".format(offset, maxOffset))
+            Some(maxOffset)
+          }
           else
             None
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d467c2e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4d75d53..3f6a275 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -228,6 +228,7 @@ class LogTest extends JUnitSuite {
   /**
    * Test reading at the boundary of the log, specifically
    * - reading from the logEndOffset should give an empty message set
+   * - reading from the maxOffset should give an empty message set
    * - reading beyond the log end offset should throw an OffsetOutOfRangeException
    */
   @Test
@@ -236,19 +237,21 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
-    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes)
+    log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
     try {
-      log.read(0, 1024)
+      log.read(0, 1025)
       fail("Expected exception on invalid read.")
     } catch {
       case e: OffsetOutOfRangeException => "This is good."
     }
     try {
-      log.read(1025, 1000)
+      log.read(1026, 1000)
       fail("Expected exception on invalid read.")
     } catch {
       case e: OffsetOutOfRangeException => // This is good.
     }
+    assertEquals("Reading from maxOffset should produce 0 byte read.", 0, log.read(1024, 1000, Some(1024)).messageSet.sizeInBytes)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4d467c2e/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ee14af4..c2c670e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -35,8 +35,8 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
+import org.junit.Assert.{assertEquals, assertTrue, assertFalse}
+import org.junit.{Test, Before, After}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
@@ -44,17 +44,28 @@ import scala.collection.Map
 class ReplicaManagerTest {
 
   val topic = "test-topic"
+  val time = new MockTime()
+  val jTime = new JMockTime
+  val metrics = new Metrics
+  var zkClient : ZkClient = _
+  var zkUtils : ZkUtils = _
+    
+  @Before
+  def setUp() {
+    zkClient = EasyMock.createMock(classOf[ZkClient])
+    zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+  }
+  
+  @After
+  def tearDown() {
+    metrics.close();
+  }
 
   @Test
   def testHighWaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
     try {
@@ -64,7 +75,6 @@ class ReplicaManagerTest {
     } finally {
       // shutdown the replica manager upon test completion
       rm.shutdown(false)
-      metrics.close()
     }
   }
 
@@ -73,12 +83,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
     try {
@@ -88,7 +93,6 @@ class ReplicaManagerTest {
     } finally {
       // shutdown the replica manager upon test completion
       rm.shutdown(checkpointHW = false)
-      metrics.close()
     }
   }
 
@@ -96,12 +100,7 @@ class ReplicaManagerTest {
   def testIllegalRequiredAcks() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), Option(this.getClass.getName))
     try {
@@ -116,7 +115,6 @@ class ReplicaManagerTest {
         responseCallback = callback)
     } finally {
       rm.shutdown(checkpointHW = false)
-      metrics.close()
     }
 
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
@@ -127,12 +125,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
 
@@ -192,7 +185,80 @@ class ReplicaManagerTest {
       assertTrue(fetchCallbackFired)
     } finally {
       rm.shutdown(checkpointHW = false)
-      metrics.close()
+    }
+  }
+  
+  @Test
+  def testFetchBeyondHighWatermarkNotAllowedForConsumer() {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props.put("broker.id", Int.box(0))
+    val config = KafkaConfig.fromProps(props)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false), Option(this.getClass.getName))
+    try {
+      val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1), new Broker(1, "host2", 2))
+      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+      EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+      EasyMock.replay(metadataCache)
+      
+      val brokerList : java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
+      val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
+      
+      val partition = rm.getOrCreatePartition(topic, 0)
+      partition.getOrCreateReplica(0)
+      
+      // Make this replica the leader.
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava)
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+      rm.getLeaderReplicaIfLocal(topic, 0)
+
+      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
+      
+      // Append two messages.
+      for(i <- 1 to 2)
+        rm.appendMessages(
+          timeout = 1000,
+          requiredAcks = -1,
+          internalTopicsAllowed = false,
+          messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("message %d".format(i).getBytes))),
+          responseCallback = produceCallback)
+      
+      var fetchCallbackFired = false
+      var fetchError = 0
+      def fetchCallback(responseStatus: Map[TopicAndPartition, FetchResponsePartitionData]) = {
+        fetchError = responseStatus.values.head.error
+        fetchCallbackFired = true
+      }
+      
+      // Fetch a message above the high watermark as a follower
+      rm.fetchMessages(
+        timeout = 1000,
+        replicaId = 1,
+        fetchMinBytes = 1,
+        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
+        responseCallback = fetchCallback)
+        
+      
+      assertTrue(fetchCallbackFired)
+      assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+      fetchCallbackFired = false
+      
+      // Fetch a message above the high watermark as a consumer
+      rm.fetchMessages(
+        timeout = 1000,
+        replicaId = -1,
+        fetchMinBytes = 1,
+        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
+        responseCallback = fetchCallback)
+          
+        assertTrue(fetchCallbackFired)
+        assertEquals("Should give OffsetOutOfRangeException", Errors.OFFSET_OUT_OF_RANGE.code, fetchError)
+    } finally {
+      rm.shutdown(checkpointHW = false)
     }
   }
 }