You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/06/11 17:35:59 UTC
[kafka] branch 2.0 updated: KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new bde02e7 KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled
bde02e7 is described below
commit bde02e760315eb10abe3697d08226b2cd6ba88fc
Author: Jon Lee <jo...@linkedin.com>
AuthorDate: Mon Jun 11 10:35:30 2018 -0700
KAFKA-6946; Keep the session id for incremental fetch when fetch responses are throttled
Currently, a throttled fetch response is returned with INVALID_SESSION_ID, which causes the current fetch session to be dropped if an incremental fetch is in progress. This patch fixes this by returning the correct session id in the throttled response.
Author: Jon Lee <jo...@linkedin.com>
Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Dong Lin <li...@gmail.com>
Closes #5164 from jonlee2/KAFKA-6946
(cherry picked from commit 7d85785d3de7640ca0ee61e7110a32286e328343)
Signed-off-by: Dong Lin <li...@gmail.com>
---
.../src/main/scala/kafka/server/FetchSession.scala | 21 ++++++++++++++
core/src/main/scala/kafka/server/KafkaApis.scala | 3 +-
.../scala/unit/kafka/server/FetchSessionTest.scala | 33 ++++++++++++++--------
3 files changed, 43 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 7a47780..68f79ca 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -290,6 +290,12 @@ trait FetchContext extends Logging {
def partitionsToLogString(partitions: util.Collection[TopicPartition]): String =
FetchSession.partitionsToLogString(partitions, isTraceEnabled)
+
+ /**
+ * Return an empty throttled response due to quota violation.
+ */
+ def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] =
+ new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, INVALID_SESSION_ID)
}
/**
@@ -474,6 +480,21 @@ class IncrementalFetchContext(private val time: Time,
}
}
}
+
+ override def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] = {
+ session.synchronized {
+ // Check to make sure that the session epoch didn't change in between
+ // creating this fetch context and generating this response.
+ val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+ if (session.epoch != expectedEpoch) {
+ info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
+ s"got ${session.epoch}. Possible duplicate request.")
+ new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new FetchSession.RESP_MAP, throttleTimeMs, session.id)
+ } else {
+ new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, session.id)
+ }
+ }
+ }
}
case class LastUsedKey(val lastUsedMs: Long,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7a39c12..874a209 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -656,8 +656,7 @@ class KafkaApis(val requestChannel: RequestChannel,
quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
}
// If throttling is required, return an empty response.
- unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition,
- FetchResponse.PartitionData[Records]](), maxThrottleTimeMs, INVALID_SESSION_ID)
+ unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
} else {
// Get the actual response. This will update the fetch context.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 84efa6b..b79692d 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -201,25 +201,34 @@ class FetchSessionTest {
assertEquals(Errors.INVALID_FETCH_SESSION_EPOCH,
context6.updateAndGenerateResponseData(respData2).error())
+ // Test generating a throttled response for the incremental fetch session
+ val reqData7 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+ val context7 = fetchManager.newContext(
+ new JFetchMetadata(resp2.sessionId(), 2), reqData7, EMPTY_PART_LIST, false)
+ val resp7 = context7.getThrottledResponse(100)
+ assertEquals(Errors.NONE, resp7.error())
+ assertEquals(resp2.sessionId(), resp7.sessionId())
+ assertEquals(100, resp7.throttleTimeMs())
+
// Close the incremental fetch session.
val prevSessionId = resp5.sessionId
var nextSessionId = prevSessionId
do {
- val reqData7 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
- reqData7.put(new TopicPartition("bar", 0), new FetchRequest.PartitionData(0, 0, 100))
- reqData7.put(new TopicPartition("bar", 1), new FetchRequest.PartitionData(10, 0, 100))
- val context7 = fetchManager.newContext(
- new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData7, EMPTY_PART_LIST, false)
- assertEquals(classOf[SessionlessFetchContext], context7.getClass)
+ val reqData8 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+ reqData8.put(new TopicPartition("bar", 0), new FetchRequest.PartitionData(0, 0, 100))
+ reqData8.put(new TopicPartition("bar", 1), new FetchRequest.PartitionData(10, 0, 100))
+ val context8 = fetchManager.newContext(
+ new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData8, EMPTY_PART_LIST, false)
+ assertEquals(classOf[SessionlessFetchContext], context8.getClass)
assertEquals(0, cache.size())
- val respData7 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
- respData7.put(new TopicPartition("bar", 0),
+ val respData8 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+ respData8.put(new TopicPartition("bar", 0),
new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
- respData7.put(new TopicPartition("bar", 1),
+ respData8.put(new TopicPartition("bar", 1),
new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
- val resp7 = context7.updateAndGenerateResponseData(respData7)
- assertEquals(Errors.NONE, resp7.error())
- nextSessionId = resp7.sessionId()
+ val resp8 = context8.updateAndGenerateResponseData(respData8)
+ assertEquals(Errors.NONE, resp8.error())
+ nextSessionId = resp8.sessionId()
} while (nextSessionId == prevSessionId)
}
--
To stop receiving notification emails like this one, please contact
lindong@apache.org.