You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/30 02:01:25 UTC

git commit: correlationId is not set in FetchRequest in AbstractFetcherThread; patched by Jun Rao; reviewed by Neha Narkhede and Swapnil Ghike; kafka-738

Updated Branches:
  refs/heads/0.8 1fb3e8c03 -> b3a4fe9ce


correlationId is not set in FetchRequest in AbstractFetcherThread; patched by Jun Rao; reviewed by Neha Narkhede and Swapnil Ghike; kafka-738


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3a4fe9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3a4fe9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3a4fe9c

Branch: refs/heads/0.8
Commit: b3a4fe9cedca778a95b7f22054cb8f8ef6cf38c7
Parents: 1fb3e8c
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 29 17:00:49 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 29 17:00:49 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala   |    6 +++++-
 .../scala/kafka/server/AbstractFetcherThread.scala |   11 +++++------
 2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a4fe9c/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ac74931..19c961e 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -201,5 +201,9 @@ class FetchRequestBuilder() {
     this
   }
 
-  def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+  def build() = {
+    val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+    requestMap.clear()
+    fetchRequest
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a4fe9c/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 0b286f0..1ccf578 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -46,6 +46,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
+  val fetchRequestBuilder = new FetchRequestBuilder().
+          clientId(clientId).
+          replicaId(fetcherBrokerId).
+          maxWait(maxWait).
+          minBytes(minBytes)
 
   /* callbacks to be defined in subclass */
 
@@ -65,12 +70,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   override def doWork() {
-    val fetchRequestBuilder = new FetchRequestBuilder().
-            clientId(clientId).
-            replicaId(fetcherBrokerId).
-            maxWait(maxWait).
-            minBytes(minBytes)
-
     partitionMapLock.lock()
     try {
       while (partitionMap.isEmpty)