You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/16 06:26:54 UTC
git commit: Disallow clients from setting replicaId in FetchRequest;
kafka-699; patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 de1a4d727 -> 777f66220
Disallow clients from setting replicaId in FetchRequest; kafka-699; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/777f6622
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/777f6622
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/777f6622
Branch: refs/heads/0.8
Commit: 777f66220153a64cd33cd5484a64de556f4fa3a8
Parents: de1a4d7
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 15 21:26:45 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 15 21:26:45 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/api/FetchRequest.scala | 36 +++++++++++---
.../main/scala/kafka/javaapi/FetchRequest.scala | 6 +--
2 files changed, 30 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/777f6622/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index b4fb874..7968747 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -58,13 +58,13 @@ object FetchRequest {
}
}
-case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
- correlationId: Int = FetchRequest.DefaultCorrelationId,
- clientId: String = ConsumerConfig.DefaultClientId,
- replicaId: Int = Request.OrdinaryConsumerId,
- maxWait: Int = FetchRequest.DefaultMaxWait,
- minBytes: Int = FetchRequest.DefaultMinBytes,
- requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
+case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
+ correlationId: Int = FetchRequest.DefaultCorrelationId,
+ clientId: String = ConsumerConfig.DefaultClientId,
+ replicaId: Int = Request.OrdinaryConsumerId,
+ maxWait: Int = FetchRequest.DefaultMaxWait,
+ minBytes: Int = FetchRequest.DefaultMinBytes,
+ requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
/**
@@ -72,6 +72,23 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
*/
lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+ /**
+ * Public constructor for clients; replicaId is always set to Request.OrdinaryConsumerId.
+ */
+ def this(correlationId: Int,
+ clientId: String,
+ maxWait: Int,
+ minBytes: Int,
+ requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) {
+ this(versionId = FetchRequest.CurrentVersion,
+ correlationId = correlationId,
+ clientId = clientId,
+ replicaId = Request.OrdinaryConsumerId,
+ maxWait = maxWait,
+ minBytes= minBytes,
+ requestInfo = requestInfo)
+ }
+
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
@@ -144,7 +161,10 @@ class FetchRequestBuilder() {
this
}
- def replicaId(replicaId: Int): FetchRequestBuilder = {
+ /**
+ * Only for internal use. Clients shouldn't set replicaId.
+ */
+ private[kafka] def replicaId(replicaId: Int): FetchRequestBuilder = {
this.replicaId = replicaId
this
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/777f6622/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index 44d148e..b475240 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -18,14 +18,12 @@
package kafka.javaapi
import scala.collection.JavaConversions
-import kafka.api.PartitionFetchInfo
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
-
+import kafka.api.{Request, PartitionFetchInfo}
class FetchRequest(correlationId: Int,
clientId: String,
- replicaId: Int,
maxWait: Int,
minBytes: Int,
requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
@@ -35,7 +33,7 @@ class FetchRequest(correlationId: Int,
kafka.api.FetchRequest(
correlationId = correlationId,
clientId = clientId,
- replicaId = replicaId,
+ replicaId = Request.OrdinaryConsumerId,
maxWait = maxWait,
minBytes = minBytes,
requestInfo = scalaMap