You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/11/06 08:53:35 UTC

[kafka] branch trunk updated: MINOR: Always return partitions with diverging epochs in fetch response (#9567)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8e211eb  MINOR: Always return partitions with diverging epochs in fetch response (#9567)
8e211eb is described below

commit 8e211eb72f9a45897cc37fed394a38096aa47feb
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Nov 6 08:51:14 2020 +0000

    MINOR: Always return partitions with diverging epochs in fetch response (#9567)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>
---
 .../src/main/scala/kafka/server/FetchSession.scala |  4 ++
 .../scala/unit/kafka/server/FetchSessionTest.scala | 55 ++++++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 140fba0..c6280f3 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -154,6 +154,10 @@ class CachedPartition(val topic: String,
         highWatermark = -1
       mustRespond = true
     }
+    if (respData.divergingEpoch.isPresent) {
+      // Partitions with diverging epoch are always included in response to trigger truncation.
+      mustRespond = true
+    }
     mustRespond
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 5ef3105..9d99fb5 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -21,10 +21,12 @@ import java.util.{Collections, Optional}
 
 import kafka.utils.MockTime
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INVALID_SESSION_ID}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.rules.Timeout
 import org.junit.{Rule, Test}
@@ -643,4 +645,57 @@ class FetchSessionTest {
     assertTrue(resp2.responseData().isEmpty)
     assertEquals(0, cache.size)
   }
+
+  @Test
+  def testDivergingEpoch(): Unit = {
+    val time = new MockTime()
+    val cache = new FetchSessionCache(10, 1000)
+    val fetchManager = new FetchManager(time, cache)
+    val tp1 = new TopicPartition("foo", 1)
+    val tp2 = new TopicPartition("bar", 2)
+
+    val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+    reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
+    reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4)))
+
+    // Full fetch context returns all partitions in the response
+    val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData, EMPTY_PART_LIST, isFollower = false)
+    assertEquals(classOf[FullFetchContext], context1.getClass)
+    val respData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+    respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
+      105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(), null))
+    val divergingEpoch = Optional.of(new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(90))
+    respData.put(tp2, new FetchResponse.PartitionData(Errors.NONE,
+      105, 105, 0, Optional.empty(), Collections.emptyList(), divergingEpoch, null))
+    val resp1 = context1.updateAndGenerateResponseData(respData)
+    assertEquals(Errors.NONE, resp1.error)
+    assertNotEquals(INVALID_SESSION_ID, resp1.sessionId)
+    assertEquals(Utils.mkSet(tp1, tp2), resp1.responseData.keySet)
+
+    // Incremental fetch context returns partitions with a diverging epoch even if none
+    // of the other conditions for inclusion in the response are met.
+    val context2 = fetchManager.newContext(new JFetchMetadata(resp1.sessionId, 1), reqData, EMPTY_PART_LIST, isFollower = false)
+    assertEquals(classOf[IncrementalFetchContext], context2.getClass)
+    val resp2 = context2.updateAndGenerateResponseData(respData)
+    assertEquals(Errors.NONE, resp2.error)
+    assertEquals(resp1.sessionId, resp2.sessionId)
+    assertEquals(Collections.singleton(tp2), resp2.responseData.keySet)
+
+    // All partitions with a diverging epoch should be returned.
+    respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
+      105, 105, 0, Optional.empty(), Collections.emptyList(), divergingEpoch, null))
+    val resp3 = context2.updateAndGenerateResponseData(respData)
+    assertEquals(Errors.NONE, resp3.error)
+    assertEquals(resp1.sessionId, resp3.sessionId)
+    assertEquals(Utils.mkSet(tp1, tp2), resp3.responseData.keySet)
+
+    // Partitions that meet other conditions should be returned regardless of whether
+    // divergingEpoch is set or not.
+    respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
+      110, 110, 0, Optional.empty(), Collections.emptyList(), Optional.empty(), null))
+    val resp4 = context2.updateAndGenerateResponseData(respData)
+    assertEquals(Errors.NONE, resp4.error)
+    assertEquals(resp1.sessionId, resp4.sessionId)
+    assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData.keySet)
+  }
 }