You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/22 20:27:00 UTC

[jira] [Commented] (KAFKA-1548) Refactor the "replica_id" in requests

    [ https://issues.apache.org/jira/browse/KAFKA-1548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301924#comment-16301924 ] 

ASF GitHub Bot commented on KAFKA-1548:
---------------------------------------

guozhangwang closed pull request #2137: KAFKA-1548 Refactor the "replica_id" in requests
URL: https://github.com/apache/kafka/pull/2137
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 00897db78f0..a08a9846f81 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -185,10 +185,6 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
 
   def isFromFollower = Request.isValidBrokerId(replicaId)
 
-  def isFromOrdinaryConsumer = replicaId == Request.OrdinaryConsumerId
-
-  def isFromLowLevelConsumer = replicaId == Request.DebuggingConsumerId
-
   def numPartitions = requestInfo.size
 
   override def toString: String = {
@@ -210,7 +206,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     fetchRequest.append("; Version: " + versionId)
     fetchRequest.append("; CorrelationId: " + correlationId)
     fetchRequest.append("; ClientId: " + clientId)
-    fetchRequest.append("; ReplicaId: " + replicaId)
+    fetchRequest.append("; " + Request.describe(replicaId))
     fetchRequest.append("; MaxWait: " + maxWait + " ms")
     fetchRequest.append("; MinBytes: " + minBytes + " bytes")
     fetchRequest.append("; MaxBytes:" + maxBytes + " bytes")
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 416dd73c708..037a5d22381 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -106,9 +106,6 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
       )
     })
 
-  def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId
-  def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId
-
   override def toString: String = {
     describe(true)
   }
@@ -127,7 +124,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
     offsetRequest.append("; Version: " + versionId)
     offsetRequest.append("; CorrelationId: " + correlationId)
     offsetRequest.append("; ClientId: " + clientId)
-    offsetRequest.append("; ReplicaId: " + replicaId)
+    offsetRequest.append("; " + Request.describe(replicaId))
     if(details)
       offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetRequest.toString()
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index d013047a5fc..d8599db10eb 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -27,6 +27,13 @@ object Request {
 
   // Broker ids are non-negative int.
   def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
+
+  def describe(replica_id: Int): String  = replica_id match {
+    case OrdinaryConsumerId => "Follower Replica Id"
+    case DebuggingConsumerId => "Tooling Consumer"
+    case _ if isValidBrokerId(replica_id) => "Normal Consumer"
+    case _ => "Invalid"
+  }
 }
 
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Refactor the "replica_id" in requests
> -------------------------------------
>
>                 Key: KAFKA-1548
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1548
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Balint Molnar
>            Priority: Minor
>              Labels: newbie
>             Fix For: 1.1.0
>
>
> Today, many requests such as fetch and offset carry an integer replica_id field. If the request is from a follower replica, it is the broker id of that follower replica; if it is from a regular consumer, it is one of two values: "-1" for an ordinary consumer, or "-2" for a debugging consumer.
> Hence this replica_id field serves two purposes:
> 1) Logging for troubleshooting in request logs, which is helpful only when the request is from a follower replica.
> 2) Deciding whether the request is from a consumer or a replica, so that it can be handled in logically different ways. For this purpose we do not really care about the actual id value.
> We would probably like to make the following improvements:
> 1) Rename "replica_id" to something less confusing?
> 2) Change the request.toString() function based on the replica_id: whether it is a non-negative integer (meaning it is from a broker replica fetcher) or -1/-2 (meaning it is from a regular consumer).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)