You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:03 UTC
[2/37] git commit: correlationId is not set in FetchRequest in
AbstractFetcherThread; patched by Jun Rao;
reviewed by Neha Narkhede and Swapnil Ghike; kafka-738
correlationId is not set in FetchRequest in AbstractFetcherThread; patched by Jun Rao; reviewed by Neha Narkhede and Swapnil Ghike; kafka-738
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3a4fe9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3a4fe9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3a4fe9c
Branch: refs/heads/trunk
Commit: b3a4fe9cedca778a95b7f22054cb8f8ef6cf38c7
Parents: 1fb3e8c
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 29 17:00:49 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 29 17:00:49 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/api/FetchRequest.scala | 6 +++++-
.../scala/kafka/server/AbstractFetcherThread.scala | 11 +++++------
2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a4fe9c/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ac74931..19c961e 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -201,5 +201,9 @@ class FetchRequestBuilder() {
this
}
- def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+ def build() = {
+ val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+ requestMap.clear()
+ fetchRequest
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a4fe9c/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 0b286f0..1ccf578 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -46,6 +46,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
val fetcherStats = new FetcherStats(metricId)
val fetcherLagStats = new FetcherLagStats(metricId)
+ val fetchRequestBuilder = new FetchRequestBuilder().
+ clientId(clientId).
+ replicaId(fetcherBrokerId).
+ maxWait(maxWait).
+ minBytes(minBytes)
/* callbacks to be defined in subclass */
@@ -65,12 +70,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
override def doWork() {
- val fetchRequestBuilder = new FetchRequestBuilder().
- clientId(clientId).
- replicaId(fetcherBrokerId).
- maxWait(maxWait).
- minBytes(minBytes)
-
partitionMapLock.lock()
try {
while (partitionMap.isEmpty)