You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Joel Koshy <jj...@gmail.com> on 2015/07/16 07:33:09 UTC
Re: kafka git commit: Hot fix for LIKAFKA-3492; force offset
commit/fetches to go to kafka regardless of request version
My bad - I accidentally pushed this to Apache instead of our internal
LinkedIn repo! I will delete this hotfix branch.
On Thu, Jul 16, 2015 at 05:26:55AM +0000, jjkoshy@apache.org wrote:
> Repository: kafka
> Updated Branches:
> refs/heads/hotfix [created] a098de48e
>
>
> Hot fix for LIKAFKA-3492; force offset commit/fetches to go to kafka regardless of request version
>
>
> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
> Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a098de48
> Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a098de48
> Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a098de48
>
> Branch: refs/heads/hotfix
> Commit: a098de48e61bd0e713e88f3429f84cd57b5fb97d
> Parents: 9f80665
> Author: Joel Koshy <jj...@gmail.com>
> Authored: Wed Jul 15 22:26:17 2015 -0700
> Committer: Joel Koshy <jj...@gmail.com>
> Committed: Wed Jul 15 22:26:17 2015 -0700
>
> ----------------------------------------------------------------------
> core/src/main/scala/kafka/server/KafkaApis.scala | 12 +++++++-----
> .../test/scala/unit/kafka/server/OffsetCommitTest.scala | 2 +-
> 2 files changed, 8 insertions(+), 6 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/main/scala/kafka/server/KafkaApis.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
> index d63bc18..528d759 100644
> --- a/core/src/main/scala/kafka/server/KafkaApis.scala
> +++ b/core/src/main/scala/kafka/server/KafkaApis.scala
> @@ -159,7 +159,8 @@ class KafkaApis(val requestChannel: RequestChannel,
> requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
> }
>
> - if (offsetCommitRequest.versionId == 0) {
> + // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go to zookeeper)
> + /*if (offsetCommitRequest.versionId == 0) {
> // for version 0 always store offsets to ZK
> val responseInfo = offsetCommitRequest.requestInfo.map {
> case (topicAndPartition, metaAndError) => {
> @@ -181,7 +182,7 @@ class KafkaApis(val requestChannel: RequestChannel,
> }
>
> sendResponseCallback(responseInfo)
> - } else {
> + } else {*/
> // for version 1 and beyond store offsets in offset manager
>
> // compute the retention time based on the request version:
> @@ -222,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
> offsetCommitRequest.groupGenerationId,
> offsetData,
> sendResponseCallback)
> - }
> + //}
> }
>
> /**
> @@ -473,7 +474,8 @@ class KafkaApis(val requestChannel: RequestChannel,
> def handleOffsetFetchRequest(request: RequestChannel.Request) {
> val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
>
> - val response = if (offsetFetchRequest.versionId == 0) {
> + // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go to zookeeper)
> + val response = /*if (offsetFetchRequest.versionId == 0) {
> // version 0 reads offsets from ZK
> val responseInfo = offsetFetchRequest.requestInfo.map( topicAndPartition => {
> val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
> @@ -495,7 +497,7 @@ class KafkaApis(val requestChannel: RequestChannel,
> })
>
> OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
> - } else {
> + } else */ {
> // version 1 reads offsets from Kafka
> val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
> metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> index 528525b..b4a882b 100755
> --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> @@ -235,7 +235,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
> versionId = 0
> )
> assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
> - assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
> + //assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
>
> // committed offset should exist with fetch version 0
> assertEquals(1L, simpleConsumer.fetchOffsets(OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)), versionId = 0)).requestInfo.get(topicPartition).get.offset)
>