You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/17 19:18:46 UTC

[4/7] git commit: Disallow clients from setting replicaId in FetchRequest; kafka-699; patched by Jun Rao; reviewed by Neha Narkhede

Disallow clients from setting replicaId in FetchRequest; kafka-699; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/777f6622
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/777f6622
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/777f6622

Branch: refs/heads/trunk
Commit: 777f66220153a64cd33cd5484a64de556f4fa3a8
Parents: de1a4d7
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 15 21:26:45 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 15 21:26:45 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala   |   36 +++++++++++---
 .../main/scala/kafka/javaapi/FetchRequest.scala    |    6 +--
 2 files changed, 30 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/777f6622/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index b4fb874..7968747 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -58,13 +58,13 @@ object FetchRequest {
   }
 }
 
-case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
-                        correlationId: Int = FetchRequest.DefaultCorrelationId,
-                        clientId: String = ConsumerConfig.DefaultClientId,
-                        replicaId: Int = Request.OrdinaryConsumerId,
-                        maxWait: Int = FetchRequest.DefaultMaxWait,
-                        minBytes: Int = FetchRequest.DefaultMinBytes,
-                        requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
+case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
+                                        correlationId: Int = FetchRequest.DefaultCorrelationId,
+                                        clientId: String = ConsumerConfig.DefaultClientId,
+                                        replicaId: Int = Request.OrdinaryConsumerId,
+                                        maxWait: Int = FetchRequest.DefaultMaxWait,
+                                        minBytes: Int = FetchRequest.DefaultMinBytes,
+                                        requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
         extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
 
   /**
@@ -72,6 +72,23 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
    */
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
+  /**
+   * Public constructor for clients: versionId is FetchRequest.CurrentVersion and replicaId is Request.OrdinaryConsumerId.
+   */
+  def this(correlationId: Int,
+           clientId: String,
+           maxWait: Int,
+           minBytes: Int,
+           requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) {
+    this(versionId = FetchRequest.CurrentVersion,
+         correlationId = correlationId,
+         clientId = clientId,
+         replicaId = Request.OrdinaryConsumerId,
+         maxWait = maxWait,
+         minBytes= minBytes,
+         requestInfo = requestInfo)
+  }
+
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
@@ -144,7 +161,10 @@ class FetchRequestBuilder() {
     this
   }
 
-  def replicaId(replicaId: Int): FetchRequestBuilder = {
+  /**
+   * Only for internal use. Clients shouldn't set replicaId.
+   */
+  private[kafka] def replicaId(replicaId: Int): FetchRequestBuilder = {
     this.replicaId = replicaId
     this
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/777f6622/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index 44d148e..b475240 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -18,14 +18,12 @@
 package kafka.javaapi
 
 import scala.collection.JavaConversions
-import kafka.api.PartitionFetchInfo
 import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
-
+import kafka.api.{Request, PartitionFetchInfo}
 
 class FetchRequest(correlationId: Int,
                    clientId: String,
-                   replicaId: Int,
                    maxWait: Int,
                    minBytes: Int,
                    requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
@@ -35,7 +33,7 @@ class FetchRequest(correlationId: Int,
     kafka.api.FetchRequest(
       correlationId = correlationId,
       clientId = clientId,
-      replicaId = replicaId,
+      replicaId = Request.OrdinaryConsumerId,
       maxWait = maxWait,
       minBytes = minBytes,
       requestInfo = scalaMap