You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/06/11 17:35:59 UTC

[kafka] branch 2.0 updated: KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new bde02e7  KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled
bde02e7 is described below

commit bde02e760315eb10abe3697d08226b2cd6ba88fc
Author: Jon Lee <jo...@linkedin.com>
AuthorDate: Mon Jun 11 10:35:30 2018 -0700

    KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled
    
    Currently, a throttled fetch response is returned with INVALID_SESSION_ID, which causes the current fetch session to be dropped if an incremental fetch is in progress. This patch fixes this by returning the correct session id.
    
    Author: Jon Lee <jo...@linkedin.com>
    
    Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Dong Lin <li...@gmail.com>
    
    Closes #5164 from jonlee2/KAFKA-6946
    
    (cherry picked from commit 7d85785d3de7640ca0ee61e7110a32286e328343)
    Signed-off-by: Dong Lin <li...@gmail.com>
---
 .../src/main/scala/kafka/server/FetchSession.scala | 21 ++++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   |  3 +-
 .../scala/unit/kafka/server/FetchSessionTest.scala | 33 ++++++++++++++--------
 3 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 7a47780..68f79ca 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -290,6 +290,12 @@ trait FetchContext extends Logging {
 
   def partitionsToLogString(partitions: util.Collection[TopicPartition]): String =
     FetchSession.partitionsToLogString(partitions, isTraceEnabled)
+
+  /**
+    * Return an empty throttled response due to quota violation.
+    */
+  def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] =
+    new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, INVALID_SESSION_ID)
 }
 
 /**
@@ -474,6 +480,21 @@ class IncrementalFetchContext(private val time: Time,
       }
     }
   }
+
+  override def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] = {
+    session.synchronized {
+      // Check to make sure that the session epoch didn't change in between
+      // creating this fetch context and generating this response.
+      val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+      if (session.epoch != expectedEpoch) {
+        info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
+          s"got ${session.epoch}.  Possible duplicate request.")
+        new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new FetchSession.RESP_MAP, throttleTimeMs, session.id)
+      } else {
+        new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, session.id)
+      }
+    }
+  }
 }
 
 case class LastUsedKey(val lastUsedMs: Long,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7a39c12..874a209 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -656,8 +656,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
           }
           // If throttling is required, return an empty response.
-          unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition,
-            FetchResponse.PartitionData[Records]](), maxThrottleTimeMs, INVALID_SESSION_ID)
+          unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
         } else {
           // Get the actual response. This will update the fetch context.
           unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 84efa6b..b79692d 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -201,25 +201,34 @@ class FetchSessionTest {
     assertEquals(Errors.INVALID_FETCH_SESSION_EPOCH,
       context6.updateAndGenerateResponseData(respData2).error())
 
+    // Test generating a throttled response for the incremental fetch session
+    val reqData7 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+    val context7 = fetchManager.newContext(
+      new JFetchMetadata(resp2.sessionId(), 2), reqData7, EMPTY_PART_LIST, false)
+    val resp7 = context7.getThrottledResponse(100)
+    assertEquals(Errors.NONE, resp7.error())
+    assertEquals(resp2.sessionId(), resp7.sessionId())
+    assertEquals(100, resp7.throttleTimeMs())
+
     // Close the incremental fetch session.
     val prevSessionId = resp5.sessionId
     var nextSessionId = prevSessionId
     do {
-      val reqData7 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
-      reqData7.put(new TopicPartition("bar", 0), new FetchRequest.PartitionData(0, 0, 100))
-      reqData7.put(new TopicPartition("bar", 1), new FetchRequest.PartitionData(10, 0, 100))
-      val context7 = fetchManager.newContext(
-        new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData7, EMPTY_PART_LIST, false)
-      assertEquals(classOf[SessionlessFetchContext], context7.getClass)
+      val reqData8 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+      reqData8.put(new TopicPartition("bar", 0), new FetchRequest.PartitionData(0, 0, 100))
+      reqData8.put(new TopicPartition("bar", 1), new FetchRequest.PartitionData(10, 0, 100))
+      val context8 = fetchManager.newContext(
+        new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData8, EMPTY_PART_LIST, false)
+      assertEquals(classOf[SessionlessFetchContext], context8.getClass)
       assertEquals(0, cache.size())
-      val respData7 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
-      respData7.put(new TopicPartition("bar", 0),
+      val respData8 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      respData8.put(new TopicPartition("bar", 0),
         new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
-      respData7.put(new TopicPartition("bar", 1),
+      respData8.put(new TopicPartition("bar", 1),
         new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
-      val resp7 = context7.updateAndGenerateResponseData(respData7)
-      assertEquals(Errors.NONE, resp7.error())
-      nextSessionId = resp7.sessionId()
+      val resp8 = context8.updateAndGenerateResponseData(respData8)
+      assertEquals(Errors.NONE, resp8.error())
+      nextSessionId = resp8.sessionId()
     } while (nextSessionId == prevSessionId)
   }
 

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.