You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Joel Koshy <jj...@gmail.com> on 2015/07/16 07:33:09 UTC

Re: kafka git commit: Hot fix for LIKAFKA-3492; force offset commit/fetches to go to kafka regardless of request version

My bad - I accidentally pushed this to the Apache repo instead of our internal
LinkedIn repo! I will delete this branch.

On Thu, Jul 16, 2015 at 05:26:55AM +0000, jjkoshy@apache.org wrote:
> Repository: kafka
> Updated Branches:
>   refs/heads/hotfix [created] a098de48e
> 
> 
> Hot fix for LIKAFKA-3492; force offset commit/fetches to go to kafka regardless of request version
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
> Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a098de48
> Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a098de48
> Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a098de48
> 
> Branch: refs/heads/hotfix
> Commit: a098de48e61bd0e713e88f3429f84cd57b5fb97d
> Parents: 9f80665
> Author: Joel Koshy <jj...@gmail.com>
> Authored: Wed Jul 15 22:26:17 2015 -0700
> Committer: Joel Koshy <jj...@gmail.com>
> Committed: Wed Jul 15 22:26:17 2015 -0700
> 
> ----------------------------------------------------------------------
>  core/src/main/scala/kafka/server/KafkaApis.scala        | 12 +++++++-----
>  .../test/scala/unit/kafka/server/OffsetCommitTest.scala |  2 +-
>  2 files changed, 8 insertions(+), 6 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/main/scala/kafka/server/KafkaApis.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
> index d63bc18..528d759 100644
> --- a/core/src/main/scala/kafka/server/KafkaApis.scala
> +++ b/core/src/main/scala/kafka/server/KafkaApis.scala
> @@ -159,7 +159,8 @@ class KafkaApis(val requestChannel: RequestChannel,
>        requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
>      }
>  
> -    if (offsetCommitRequest.versionId == 0) {
> +    // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go to zookeeper)
> +    /*if (offsetCommitRequest.versionId == 0) {
>        // for version 0 always store offsets to ZK
>        val responseInfo = offsetCommitRequest.requestInfo.map {
>          case (topicAndPartition, metaAndError) => {
> @@ -181,7 +182,7 @@ class KafkaApis(val requestChannel: RequestChannel,
>        }
>  
>        sendResponseCallback(responseInfo)
> -    } else {
> +    } else {*/
>        // for version 1 and beyond store offsets in offset manager
>  
>        // compute the retention time based on the request version:
> @@ -222,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
>          offsetCommitRequest.groupGenerationId,
>          offsetData,
>          sendResponseCallback)
> -    }
> +    //}
>    }
>  
>    /**
> @@ -473,7 +474,8 @@ class KafkaApis(val requestChannel: RequestChannel,
>    def handleOffsetFetchRequest(request: RequestChannel.Request) {
>      val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
>  
> -    val response = if (offsetFetchRequest.versionId == 0) {
> +    // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go to zookeeper)
> +    val response = /*if (offsetFetchRequest.versionId == 0) {
>        // version 0 reads offsets from ZK
>        val responseInfo = offsetFetchRequest.requestInfo.map( topicAndPartition => {
>          val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
> @@ -495,7 +497,7 @@ class KafkaApis(val requestChannel: RequestChannel,
>        })
>  
>        OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
> -    } else {
> +    } else */ {
>        // version 1 reads offsets from Kafka
>        val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
>          metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
> 
> http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> index 528525b..b4a882b 100755
> --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> @@ -235,7 +235,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
>        versionId = 0
>      )
>      assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
> -    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
> +    //assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
>  
>      // committed offset should exist with fetch version 0
>      assertEquals(1L, simpleConsumer.fetchOffsets(OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)), versionId = 0)).requestInfo.get(topicPartition).get.offset)
>