You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/12 03:26:37 UTC

[7/11] git commit: KAFKA-683 Fix correlation id in all requests sent to kafka; reviewed by Jun Rao

KAFKA-683 Fix correlation id in all requests sent to kafka; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/03eb903c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/03eb903c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/03eb903c

Branch: refs/heads/trunk
Commit: 03eb903ce223ab55c5acbcf4243ce805aaaf4fad
Parents: c12608c
Author: Neha Narkhede <ne...@apache.org>
Authored: Fri Jan 11 14:06:00 2013 -0800
Committer: Neha Narkhede <ne...@apache.org>
Committed: Fri Jan 11 14:06:00 2013 -0800

----------------------------------------------------------------------
 config/log4j.properties                            |   44 +++++++++++----
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |    5 +-
 .../src/main/scala/kafka/api/ProducerRequest.scala |    1 -
 .../main/scala/kafka/api/StopReplicaRequest.scala  |    4 +-
 .../scala/kafka/api/TopicMetadataRequest.scala     |   19 ++++---
 core/src/main/scala/kafka/client/ClientUtils.scala |   13 +++--
 .../kafka/consumer/ConsumerFetcherThread.scala     |    2 +-
 .../controller/ControllerChannelManager.scala      |    6 +-
 .../scala/kafka/controller/KafkaController.scala   |    4 +-
 .../kafka/controller/PartitionStateMachine.scala   |    5 +-
 .../kafka/controller/ReplicaStateMachine.scala     |    2 +-
 .../scala/kafka/javaapi/TopicMetadataRequest.scala |   11 ++-
 core/src/main/scala/kafka/log/FileMessageSet.scala |    1 +
 core/src/main/scala/kafka/log/Log.scala            |   29 +++++++--
 core/src/main/scala/kafka/log/OffsetIndex.scala    |   12 +++-
 .../main/scala/kafka/network/RequestChannel.scala  |   11 +++-
 .../main/scala/kafka/network/SocketServer.scala    |    2 +-
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    8 +-
 .../kafka/producer/async/DefaultEventHandler.scala |   34 +++++++----
 .../scala/kafka/server/AbstractFetcherThread.scala |    6 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   38 +++++++------
 .../api/RequestResponseSerializationTest.scala     |    6 +-
 .../unit/kafka/integration/TopicMetadataTest.scala |    8 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    |   12 ++--
 .../unit/kafka/server/LeaderElectionTest.scala     |    2 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |    1 +
 26 files changed, 179 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index 3b13181..e58c7cd 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -4,30 +4,52 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=INFO, stdout 
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
 
-#log4j.appender.fileAppender=org.apache.log4j.FileAppender
-#log4j.appender.fileAppender.File=kafka-request.log
-#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 # Turn on all our debugging info
-log4j.logger.kafka.perf=DEBUG
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
 
-log4j.logger.kafka.perf=DEBUG
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG 
+log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=TRACE, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, stateChangeAppender
+log4j.additivity.kafka.controller=false

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 9759949..99af002 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -120,8 +120,9 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
-    this(LeaderAndIsrRequest.CurrentVersion, 0, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker],
+           controllerEpoch: Int, correlationId: Int) = {
+    this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
       partitionStateInfos, liveBrokers, controllerEpoch)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 9edc4dd..ffa96a6 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -23,7 +23,6 @@ import scala.collection.Map
 import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
 
-
 object ProducerRequest {
   val CurrentVersion = 0.shortValue
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index deb195f..9fe849b 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -59,8 +59,8 @@ case class StopReplicaRequest(versionId: Short,
                               controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
 
-  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
-    this(StopReplicaRequest.CurrentVersion, 0, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
+  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = {
+    this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
          deletePartitions, partitions, controllerEpoch)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index e659532..fe1170f 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -39,29 +39,32 @@ object TopicMetadataRequest extends Logging {
     val topics = new ListBuffer[String]()
     for(i <- 0 until numTopics)
       topics += readShortString(buffer)
-    val topicsList = topics.toList
-    new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId)
+    new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList)
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
+                                val correlationId: Int,
                                 val clientId: String,
-                                val topics: Seq[String],
-                                val correlationId: Int)
+                                val topics: Seq[String])
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
-def this(topics: Seq[String]) =
-  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, 0)
+  def this(topics: Seq[String], correlationId: Int) =
+    this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
-    buffer.putInt(correlationId) // correlation id not set yet
+    buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
   }
 
   def sizeInBytes(): Int = {
-    2 + 4 + shortStringLength(clientId) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
+    2 +  /* version id */
+    4 + /* correlation id */
+    shortStringLength(clientId)  + /* client id */
+    4 + /* number of topics */
+    topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index c61833b..968a91f 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -20,22 +20,23 @@ object ClientUtils extends Logging{
    * @param producerConfig The producer's config
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
-    val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
+    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {
       val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
-      info("Fetching metadata for topic %s".format(topics))
+      info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
       try {
         topicMetadataResponse = producer.send(topicMetadataRequest)
         fetchMetaDataSucceeded = true
       }
       catch {
         case e =>
-          warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
+          warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
+            .format(correlationId, topics, brokers(i).toString), e)
           t = e
       } finally {
         i = i + 1
@@ -44,6 +45,8 @@ object ClientUtils extends Logging{
     }
     if(!fetchMetaDataSucceeded){
       throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
+    } else {
+      debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
     }
     return topicMetadataResponse
   }
@@ -60,7 +63,7 @@ object ClientUtils extends Logging{
     props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("clientid", clientId)
     val producerConfig = new ProducerConfig(props)
-    fetchTopicMetadata(topics, brokers, producerConfig)
+    fetchTopicMetadata(topics, brokers, producerConfig, 0)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c902e20..2ce024c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
-                                      clientId = config.clientId,
+                                      clientId = config.clientId + "-" + name,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
                                       socketBufferSize = config.socketBufferSize, 

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8083cd5..e2ca1d6 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -184,13 +184,13 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
     }
   }
 
-  def sendRequestsToBrokers(controllerEpoch: Int, liveBrokers: Set[Broker]) {
+  def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int, liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch, correlationId)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
@@ -203,7 +203,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
               debug("The stop replica request (delete = %s) sent to broker %d is %s"
                 .format(deletePartitions, broker, replicas.mkString(",")))
               sendRequest(broker, new StopReplicaRequest(deletePartitions,
-                Set.empty[(String, Int)] ++ replicas, controllerEpoch), null)
+                Set.empty[(String, Int)] ++ replicas, controllerEpoch, correlationId), null)
             }
         }
         m.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4840c0c..7b5d5c2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -34,6 +34,7 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import scala.Some
 import kafka.common.TopicAndPartition
+import java.util.concurrent.atomic.AtomicInteger
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
@@ -42,6 +43,7 @@ class ControllerContext(val zkClient: ZkClient,
                         val brokerShutdownLock: Object = new Object,
                         var epoch: Int = KafkaController.InitialControllerEpoch - 1,
                         var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1,
+                        val correlationId: AtomicInteger = new AtomicInteger(0),
                         var allTopics: Set[String] = Set.empty,
                         var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
                         var allLeaders: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
@@ -186,7 +188,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             }
           }
       }
-      brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
 
       val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
       debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 0278782..372793b 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -85,9 +85,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
           handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
     } catch {
       case e => error("Error while moving some partitions to the online state", e)
+      // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
     }
   }
 
@@ -104,7 +105,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 1753947..1d5bac4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -83,7 +83,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index dbf04fd..3d92569 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -20,15 +20,18 @@ import kafka.api._
 import java.nio.ByteBuffer
 import scala.collection.JavaConversions
 
-class TopicMetadataRequest(val correlationId: Int,
-                           val versionId: Short,
+class TopicMetadataRequest(val versionId: Short,
+                           val correlationId: Int,
                            val clientId: String,
                            val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
   val underlying: kafka.api.TopicMetadataRequest =
-    new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics), correlationId)
+    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
 
   def this(topics: java.util.List[String]) =
-    this(0, kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+    this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+
+  def this(topics: java.util.List[String], correlationId: Int) =
+    this(kafka.api.TopicMetadataRequest.CurrentVersion, correlationId, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
 
   def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 7e7f344..5845bb6 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -44,6 +44,7 @@ class FileMessageSet private[kafka](val file: File,
   private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
 
   if (initChannelPositionToEnd) {
+    info("Creating or reloading log segment %s".format(file.getAbsolutePath))
     /* set the file position to the last byte in the file */
     channel.position(channel.size)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 66c07af..79db610 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -197,6 +197,7 @@ private[kafka] class Log(val dir: File,
    * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
    */
   private def recoverSegment(segment: LogSegment) {
+    info("Recovering log segment %s".format(segment.messageSet.file.getAbsolutePath))
     segment.index.truncate()
     var validBytes = 0
     var lastIndexEntry = 0
@@ -392,6 +393,10 @@ private[kafka] class Log(val dir: File,
    */
   def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
     lock synchronized {
+      debug("Garbage collecting log..")
+      debug("Segments of log %s : %s ".format(this.name, segments.view.mkString(",")))
+      debug("Index files for log %s: %s".format(this.name, segments.view.map(_.index.file.exists()).mkString(",")))
+      debug("Data files for log %s: %s".format(this.name, segments.view.map(_.messageSet.file.exists()).mkString(",")))
       val view = segments.view
       val deletable = view.takeWhile(predicate)
       for(seg <- deletable)
@@ -426,11 +431,17 @@ private[kafka] class Log(val dir: File,
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment): LogSegment = {
-    if ((segment.messageSet.sizeInBytes > maxLogFileSize) ||
-       ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) ||
-       segment.index.isFull)
+    if(segment.messageSet.sizeInBytes > maxLogFileSize) {
+      info("Rolling %s due to full data log".format(name))
       roll()
-    else
+    } else if((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) {
+      info("Rolling %s due to time based rolling".format(name))
+      roll()
+    } else if(segment.index.isFull) {
+      info("Rolling %s due to full index maxIndexSize = %d, entries = %d, maxEntries = %d"
+        .format(name, segment.index.maxIndexSize, segment.index.entries(), segment.index.maxEntries))
+      roll()
+    } else
       segment
   }
 
@@ -451,10 +462,10 @@ private[kafka] class Log(val dir: File,
     val logFile = logFilename(dir, newOffset)
     val indexFile = indexFilename(dir, newOffset)
     for(file <- List(logFile, indexFile); if file.exists) {
-      warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
+      warn("Newly rolled segment file " + file.getAbsolutePath + " already exists; deleting it first")
       file.delete()
     }
-    debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
+    info("Rolling log '" + name + "' to " + logFile.getAbsolutePath + " and " + indexFile.getAbsolutePath)
     segments.view.lastOption match {
       case Some(segment) => segment.index.trimToValidSize()
       case None => 
@@ -462,7 +473,7 @@ private[kafka] class Log(val dir: File,
 
     val segmentsView = segments.view
     if(segmentsView.size > 0 && segmentsView.last.start == newOffset)
-      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset))
+      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
 
     val segment = new LogSegment(dir, 
                                  startOffset = newOffset,
@@ -555,6 +566,10 @@ private[kafka] class Log(val dir: File,
       } else {
         total += 1
       }
+      if(segment.messageSet.file.exists())
+        error("Data log file %s still exists".format(segment.messageSet.file.getAbsolutePath))
+      if(segment.index.file.exists())
+        error("Index file %s still exists".format(segment.index.file.getAbsolutePath))
     }
     total
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 23adca1..43b3575 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -83,16 +83,19 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
         Utils.swallow(raf.close())
       }
     }
-  
-  /* the maximum number of entries this index can hold */
-  def maxEntries = mmap.limit / 8
-  
+
   /* the number of entries in the index */
   private var size = new AtomicInteger(mmap.position / 8)
   
   /* the last offset in the index */
   var lastOffset = readLastOffset()
   
+  info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
+    .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
+
+  /* the maximum number of entries this index can hold */
+  def maxEntries = mmap.limit / 8
+
   /**
    * The last offset written to the index
    */
@@ -262,6 +265,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * Delete this index file
    */
   def delete(): Boolean = {
+    info("Deleting index " + this.file.getAbsolutePath)
     this.file.delete()
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1f7124d..0e5b7cb 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -46,7 +46,12 @@ object RequestChannel extends Logging {
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer.rewind()
-    trace("Received request: %s".format(requestObj))
+    buffer.getShort
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = ApiUtils.readShortString(buffer)
+    buffer.rewind()
+    trace("Received request v%d with correlation id %d from client %s: %s".format(versionId, correlationId, clientId, requestObj))
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
@@ -75,8 +80,8 @@ object RequestChannel extends Logging {
              m.responseSendTimeHist.update(responseSendTime)
              m.totalTimeHist.update(totalTime)
       }
-      trace("Completed request: %s totalTime:%d queueTime:%d localTime:%d remoteTime:%d sendTime:%d"
-        .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
+      trace("Completed request v%d with correlation id %d and client %s: %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d"
+        .format(versionId, correlationId, clientId, requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 2102fbf..e5dccd3 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -319,7 +319,7 @@ private[kafka] class Processor(val id: Int,
     } else if(receive.complete) {
       val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds)
       requestChannel.sendRequest(req)
-      trace("Recieved request, sending for processing by handler: " + req)
+      trace("Received request, sending for processing by handler: " + req)
       key.attach(null)
     } else {
       // more reading to be done

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index d58a063..b209a97 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -37,7 +37,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
    * @return a sequence of (brokerId, numPartitions). Returns a zero-length
    * sequence if no brokers are available.
    */
-  def getBrokerPartitionInfo(topic: String): Seq[PartitionAndLeader] = {
+  def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
     debug("Getting broker partition info for topic %s".format(topic))
     // check if the cache has metadata for this topic
     val topicMetadata = topicPartitionInfo.get(topic)
@@ -46,7 +46,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
         case Some(m) => m
         case None =>
           // refresh the topic metadata cache
-          updateInfo(Set(topic))
+          updateInfo(Set(topic), correlationId)
           val topicMetadata = topicPartitionInfo.get(topic)
           topicMetadata match {
             case Some(m) => m
@@ -70,9 +70,9 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
    * It updates the cache by issuing a get topic metadata request to a random broker.
    * @param topics the topics for which the metadata is to be fetched
    */
-  def updateInfo(topics: Set[String]) {
+  def updateInfo(topics: Set[String], correlationId: Int) {
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig)
+    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 24a9dc9..58f582f 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -37,7 +37,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   val isSync = ("sync" == config.producerType)
 
   val partitionCounter = new AtomicInteger(0)
-  val correlationCounter = new AtomicInteger(0)
+  val correlationId = new AtomicInteger(0)
   val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
 
   private val lock = new Object()
@@ -60,11 +60,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries + 1
+      val correlationIdStart = correlationId.get()
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
         topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
         if (topicMetadataRefreshInterval >= 0 &&
             SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
-          Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet))
+          Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
           topicMetadataToRefresh.clear
           lastTopicMetadataRefreshTime = SystemTime.milliseconds
         }
@@ -73,14 +74,15 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.producerRetryBackoffMs)
           // get topics of the outstanding produce requests and refresh metadata for those
-          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
+          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
           remainingRetries -= 1
           producerStats.resendRate.mark()
         }
       }
       if(outstandingProduceRequests.size > 0) {
         producerStats.failedSendRate.mark()
-        error("Failed to send the following requests: " + outstandingProduceRequests)
+        val correlationIdEnd = correlationId.get()
+        error("Failed to send the following requests with correlation ids in [%d,%d]: %s".format(correlationIdStart, correlationIdEnd-1, outstandingProduceRequests))
         throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
       }
     }
@@ -178,8 +180,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   }
 
   private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
-    debug("Getting the number of broker partitions registered for topic: " + m.topic)
-    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic)
+    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
     debug("Broker partitions registered for topic: %s are %s"
       .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
@@ -229,13 +230,17 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val producerRequest = new ProducerRequest(correlationCounter.getAndIncrement(), config.clientId, config.requiredAcks,
+      val currentCorrelationId = correlationId.getAndIncrement
+      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requiredAcks,
         config.requestTimeoutMs, messagesPerTopic)
+      var failedTopicPartitions = Seq.empty[TopicAndPartition]
       try {
         val syncProducer = producerPool.getProducer(brokerId)
+        debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
+          .format(currentCorrelationId, messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
         val response = syncProducer.send(producerRequest)
-        debug("Producer sent messages for topics %s to broker %d on %s:%d"
-          .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
+        debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
+          .format(currentCorrelationId, messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
         if (response.status.size != producerRequest.data.size)
           throw new KafkaException("Incomplete response (%s) for producer request (%s)"
             .format(response, producerRequest))
@@ -244,11 +249,16 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
             trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
         }
-        response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
-          .map(partitionStatus => partitionStatus._1)
+        failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+                                    .map(partitionStatus => partitionStatus._1)
+        if(failedTopicPartitions.size > 0)
+          error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
+            .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
+        failedTopicPartitions
       } catch {
         case t: Throwable =>
-          warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), t)
+          warn("Failed to send producer request with correlation id %d to broker %d with data %s"
+            .format(currentCorrelationId, brokerId, messagesPerTopic), t)
           messagesPerTopic.keys.toSeq
       }
     } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 96f0df6..bdb1d03 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -66,7 +66,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   override def doWork() {
-    val fetchRequestuilder = new FetchRequestBuilder().
+    val fetchRequestBuilder = new FetchRequestBuilder().
             clientId(clientId).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
@@ -78,14 +78,14 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
         partitionMapCond.await()
       partitionMap.foreach {
         case((topicAndPartition, offset)) =>
-          fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
     } finally {
       partitionMapLock.unlock()
     }
 
-    val fetchRequest = fetchRequestuilder.build()
+    val fetchRequest = fetchRequestBuilder.build()
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5089a75..e2dfb3e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -70,7 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val apiRequest = request.requestObj.asInstanceOf[ProducerRequest]
             val producerResponseStatus = apiRequest.data.map {
               case (topicAndPartition, data) =>
-                (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+                (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L))
             }
             val errorResponse = ProducerResponse(apiRequest.correlationId, producerResponseStatus)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
@@ -125,8 +125,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
     val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
-    trace("Handling leader and ISR request " + leaderAndIsrRequest)
+      requestLogger.trace("Handling LeaderAndIsrRequest v%d with correlation id %d from client %s: %s"
+            .format(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, leaderAndIsrRequest.clientId, leaderAndIsrRequest.toString))
     try {
       val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
@@ -142,8 +142,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleStopReplicaRequest(request: RequestChannel.Request) {
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
-    trace("Handling stop replica request " + stopReplicaRequest)
+      requestLogger.trace("Handling StopReplicaRequest v%d with correlation id %d from client %s: %s"
+            .format(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, stopReplicaRequest.clientId, stopReplicaRequest.toString))
 
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
@@ -175,10 +175,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling producer request " + request.toString)
-    trace("Handling producer request " + request.toString)
+      requestLogger.trace("Handling ProducerRequest v%d with correlation id %d from client %s: %s"
+            .format(produceRequest.versionId, produceRequest.correlationId, produceRequest.clientId, produceRequest.toString))
 
-    val localProduceResults = appendToLocalLog(produceRequest.data)
+    val localProduceResults = appendToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
@@ -236,7 +236,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Helper method for handling a parsed producer request
    */
-  private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = {
+  private def appendToLocalLog(producerRequest: ProducerRequest): Iterable[ProduceResult] = {
+    val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
     trace("Append [%s] to local log ".format(partitionAndData.toString))
     partitionAndData.map {case (topicAndPartition, messages) =>
       BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
@@ -259,7 +260,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case e =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark()
-          error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic, topicAndPartition.partition), e)
+          error("Error processing ProducerRequest with correlation id %d from client %s on %s:%d"
+            .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition.topic, topicAndPartition.partition), e)
           new ProduceResult(topicAndPartition, e)
        }
     }
@@ -271,8 +273,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling fetch request " + fetchRequest.toString)
-    trace("Handling fetch request " + fetchRequest.toString)
+      requestLogger.trace("Handling FetchRequest v%d with correlation id %d from client %s: %s"
+            .format(fetchRequest.versionId, fetchRequest.correlationId, fetchRequest.clientId, fetchRequest.toString))
 
     if(fetchRequest.isFromFollower) {
       maybeUpdatePartitionHw(fetchRequest)
@@ -293,7 +295,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     if(fetchRequest.maxWait <= 0 ||
        bytesReadable >= fetchRequest.minBytes ||
        fetchRequest.numPartitions <= 0) {
-      debug("Returning fetch response %s for fetch request with correlation id %d".format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId))
+      debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
+        .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
       val response = new FetchResponse(fetchRequest.correlationId, dataRead)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
@@ -380,8 +383,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling offset request " + offsetRequest.toString)
-    trace("Handling offset request " + offsetRequest.toString)
+      requestLogger.trace("Handling OffsetRequest v%d with correlation id %d from client %s: %s"
+            .format(offsetRequest.versionId, offsetRequest.correlationId, offsetRequest.clientId, offsetRequest.toString))
 
     val responseMap = offsetRequest.requestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
@@ -420,8 +423,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling topic metadata request " + metadataRequest.toString())
-    trace("Handling topic metadata request " + metadataRequest.toString())
+      requestLogger.trace("Handling TopicMetadataRequest v%d with correlation id %d from client %s: %s"
+            .format(metadataRequest.versionId, metadataRequest.correlationId, metadataRequest.clientId, metadataRequest.toString))
 
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
@@ -463,6 +466,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             topicsMetadata += topicAndMetadata
         }
       })
+    trace("Sending topic metadata for correlation id %d to client %s".format(metadataRequest.correlationId, metadataRequest.clientId))
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
     val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 509b020..26f31ec 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -89,7 +89,7 @@ object SerializationTestUtils{
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1)
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1, 0)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@@ -99,7 +99,7 @@ object SerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(controllerEpoch = 1, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(controllerEpoch = 1, correlationId = 0, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
@@ -138,7 +138,7 @@ object SerializationTestUtils{
   }
 
   def createTestTopicMetadataRequest: TopicMetadataRequest = {
-    new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2), 1)
+    new TopicMetadataRequest(1, 1, "client 1", Seq(topic1, topic2))
   }
 
   def createTestTopicMetadataResponse: TopicMetadataResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index f9bbfa9..6db63ba 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -51,7 +51,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     CreateTopicCommand.createTopic(zkClient, topic, 1)
 
     // create a topic metadata request
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
 
     val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
     topicMetadataRequest.writeTo(serializedMetadataRequest)
@@ -70,7 +70,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
       0 -> configs.head.brokerId
     )
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
     val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
@@ -89,7 +89,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
       0 -> configs.head.brokerId
     )
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadataRequest = new TopicMetadataRequest(List())
+    val topicMetadataRequest = new TopicMetadataRequest(List(), 0)
     val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
@@ -103,7 +103,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     // auto create topic
     val topic = "test"
 
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
     val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 4767618..3e46dd7 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -191,7 +191,7 @@ class AsyncProducerTest extends JUnit3Suite {
                                                       producerPool = producerPool,
                                                       topicPartitionInfos = topicPartitionInfos)
 
-    val topic1Broker1Data = 
+    val topic1Broker1Data =
       ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
                                              new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
     val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
@@ -409,12 +409,12 @@ class AsyncProducerTest extends JUnit3Suite {
     // produce request for topic1 and partitions 0 and 1.  Let the first request fail
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
-    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 0)
-    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 1)
+    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 11)
+    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 17)
     val response1 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
           (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
-    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 2)
+    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 21)
     val response2 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
@@ -424,9 +424,7 @@ class AsyncProducerTest extends JUnit3Suite {
     EasyMock.replay(mockSyncProducer)
 
     val producerPool = EasyMock.createMock(classOf[ProducerPool])
-    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
-    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
-    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
+    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(4)
     EasyMock.expect(producerPool.close())
     EasyMock.replay(producerPool)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index fcdd26e..129bc56 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -131,7 +131,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
     val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch)
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch, 0)
 
     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)

http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index a9a5f07..0367af2 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -197,6 +197,7 @@ object ProducerPerformance extends Logging {
       props.put("batch.size", config.batchSize.toString)
       props.put("queue.enqueueTimeout.ms", "-1")
     }
+    props.put("clientid", "ProducerPerformance")
     props.put("producer.request.required.acks", config.producerRequestRequiredAcks.toString)
     props.put("producer.request.timeout.ms", config.producerRequestTimeoutMs.toString)
     props.put("producer.num.retries", config.producerNumRetries.toString)