Posted to commits@kafka.apache.org by ne...@apache.org on 2013/01/11 23:06:55 UTC
[1/2] git commit: KAFKA-683 Fix correlation id in all requests sent to kafka; reviewed by Jun Rao
KAFKA-683 Fix correlation id in all requests sent to kafka; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/03eb903c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/03eb903c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/03eb903c
Branch: refs/heads/0.8
Commit: 03eb903ce223ab55c5acbcf4243ce805aaaf4fad
Parents: c12608c
Author: Neha Narkhede <ne...@apache.org>
Authored: Fri Jan 11 14:06:00 2013 -0800
Committer: Neha Narkhede <ne...@apache.org>
Committed: Fri Jan 11 14:06:00 2013 -0800
----------------------------------------------------------------------
config/log4j.properties | 44 +++++++++++----
.../main/scala/kafka/api/LeaderAndIsrRequest.scala | 5 +-
.../src/main/scala/kafka/api/ProducerRequest.scala | 1 -
.../main/scala/kafka/api/StopReplicaRequest.scala | 4 +-
.../scala/kafka/api/TopicMetadataRequest.scala | 19 ++++---
core/src/main/scala/kafka/client/ClientUtils.scala | 13 +++--
.../kafka/consumer/ConsumerFetcherThread.scala | 2 +-
.../controller/ControllerChannelManager.scala | 6 +-
.../scala/kafka/controller/KafkaController.scala | 4 +-
.../kafka/controller/PartitionStateMachine.scala | 5 +-
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../scala/kafka/javaapi/TopicMetadataRequest.scala | 11 ++-
core/src/main/scala/kafka/log/FileMessageSet.scala | 1 +
core/src/main/scala/kafka/log/Log.scala | 29 +++++++--
core/src/main/scala/kafka/log/OffsetIndex.scala | 12 +++-
.../main/scala/kafka/network/RequestChannel.scala | 11 +++-
.../main/scala/kafka/network/SocketServer.scala | 2 +-
.../scala/kafka/producer/BrokerPartitionInfo.scala | 8 +-
.../kafka/producer/async/DefaultEventHandler.scala | 34 +++++++----
.../scala/kafka/server/AbstractFetcherThread.scala | 6 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 38 +++++++------
.../api/RequestResponseSerializationTest.scala | 6 +-
.../unit/kafka/integration/TopicMetadataTest.scala | 8 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 12 ++--
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
.../scala/kafka/perf/ProducerPerformance.scala | 1 +
26 files changed, 179 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
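The unifying change in this patch is that every request now carries a client-assigned correlation id instead of a hard-coded 0, so a broker-side log line can be matched back to the exact send attempt on the client or controller. A minimal sketch of the pattern (class and method names below are illustrative, not the actual Kafka types):

    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical sketch: each client keeps a monotonically increasing
    // counter and stamps a fresh id onto every outgoing request; the
    // broker echoes the id back in the response and in its request log.
    class RequestSender {
      private val correlationId = new AtomicInteger(0)

      def send(topics: Seq[String]): Unit = {
        val id = correlationId.getAndIncrement
        println("Sending request with correlation id %d for topics %s"
          .format(id, topics.mkString(",")))
      }
    }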
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index 3b13181..e58c7cd 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -4,30 +4,52 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-#log4j.appender.fileAppender=org.apache.log4j.FileAppender
-#log4j.appender.fileAppender.File=kafka-request.log
-#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
# Turn on all our debugging info
-log4j.logger.kafka.perf=DEBUG
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-log4j.logger.kafka.perf=DEBUG
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=TRACE, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, stateChangeAppender
+log4j.additivity.kafka.controller=false
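The new config routes broker request traces to kafka-request.log and controller state changes to state-change.log via dedicated daily-rolling appenders. A short sketch of how code picks one of these categories up (assuming the standard log4j 1.2 API; the logger name must match the configured key exactly):

    import org.apache.log4j.Logger

    // Loggers resolve by name, so this logger inherits the TRACE level
    // and requestAppender configured above for "kafka.request.logger".
    val requestLogger = Logger.getLogger("kafka.request.logger")
    if (requestLogger.isTraceEnabled)
      requestLogger.trace("Handling ProducerRequest with correlation id 42")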
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 9759949..99af002 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -120,8 +120,9 @@ case class LeaderAndIsrRequest (versionId: Short,
controllerEpoch: Int)
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
- def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
- this(LeaderAndIsrRequest.CurrentVersion, 0, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+ def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker],
+ controllerEpoch: Int, correlationId: Int) = {
+ this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
partitionStateInfos, liveBrokers, controllerEpoch)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 9edc4dd..ffa96a6 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -23,7 +23,6 @@ import scala.collection.Map
import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
-
object ProducerRequest {
val CurrentVersion = 0.shortValue
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index deb195f..9fe849b 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -59,8 +59,8 @@ case class StopReplicaRequest(versionId: Short,
controllerEpoch: Int)
extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
- def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
- this(StopReplicaRequest.CurrentVersion, 0, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
+ def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = {
+ this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
deletePartitions, partitions, controllerEpoch)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index e659532..fe1170f 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -39,29 +39,32 @@ object TopicMetadataRequest extends Logging {
val topics = new ListBuffer[String]()
for(i <- 0 until numTopics)
topics += readShortString(buffer)
- val topicsList = topics.toList
- new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId)
+ new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList)
}
}
case class TopicMetadataRequest(val versionId: Short,
+ val correlationId: Int,
val clientId: String,
- val topics: Seq[String],
- val correlationId: Int)
+ val topics: Seq[String])
extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
-def this(topics: Seq[String]) =
- this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, 0)
+ def this(topics: Seq[String], correlationId: Int) =
+ this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
- buffer.putInt(correlationId) // correlation id not set yet
+ buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(topics.size)
topics.foreach(topic => writeShortString(buffer, topic))
}
def sizeInBytes(): Int = {
- 2 + 4 + shortStringLength(clientId) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
+ 2 + /* version id */
+ 4 + /* correlation id */
+ shortStringLength(clientId) + /* client id */
+ 4 + /* number of topics */
+ topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
}
}
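With correlationId moved to the second position, the constructor arguments now mirror the wire layout: version id, correlation id, client id, topics. A worked size calculation under that layout (assuming, as shortStringLength suggests, that short strings carry a 2-byte length prefix; the client id and topic name are made up):

    //   versionId          2 bytes
    //   correlationId      4 bytes
    //   clientId "test"    2 + 4 = 6 bytes
    //   topic count        4 bytes
    //   topic "foo"        2 + 3 = 5 bytes
    //   total             21 bytes
    val request = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, "test", Seq("foo"))
    assert(request.sizeInBytes == 21)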
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index c61833b..968a91f 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -20,22 +20,23 @@ object ClientUtils extends Logging{
* @param producerConfig The producer's config
* @return topic metadata response
*/
- def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig): TopicMetadataResponse = {
+ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
- val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
+ val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
while(i < brokers.size && !fetchMetaDataSucceeded) {
val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
- info("Fetching metadata for topic %s".format(topics))
+ info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
try {
topicMetadataResponse = producer.send(topicMetadataRequest)
fetchMetaDataSucceeded = true
}
catch {
case e =>
- warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
+ warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
+ .format(correlationId, topics, brokers(i).toString), e)
t = e
} finally {
i = i + 1
@@ -44,6 +45,8 @@ object ClientUtils extends Logging{
}
if(!fetchMetaDataSucceeded){
throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
+ } else {
+ debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
}
return topicMetadataResponse
}
@@ -60,7 +63,7 @@ object ClientUtils extends Logging{
props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
props.put("clientid", clientId)
val producerConfig = new ProducerConfig(props)
- fetchTopicMetadata(topics, brokers, producerConfig)
+ fetchTopicMetadata(topics, brokers, producerConfig, 0)
}
/**
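Callers of fetchTopicMetadata now own the correlation id, so retries of the same logical fetch are distinguishable in broker logs. A hedged sketch of a call site (brokers, producerConfig, and the correlationId counter are assumed to exist in the surrounding producer code):

    // Pass a fresh id per attempt; the id shows up in both the client's
    // "Fetching metadata with correlation id ..." line and the broker log.
    val metadata = ClientUtils.fetchTopicMetadata(Set("foo"), brokers,
      producerConfig, correlationId.getAndIncrement)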
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c902e20..2ce024c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
val consumerFetcherManager: ConsumerFetcherManager)
extends AbstractFetcherThread(name = name,
- clientId = config.clientId,
+ clientId = config.clientId + "-" + name,
sourceBroker = sourceBroker,
socketTimeout = config.socketTimeoutMs,
socketBufferSize = config.socketBufferSize,
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8083cd5..e2ca1d6 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -184,13 +184,13 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
}
}
- def sendRequestsToBrokers(controllerEpoch: Int, liveBrokers: Set[Broker]) {
+ def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int, liveBrokers: Set[Broker]) {
leaderAndIsrRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
- val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch)
+ val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch, correlationId)
debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
sendRequest(broker, leaderAndIsrRequest, null)
}
@@ -203,7 +203,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
debug("The stop replica request (delete = %s) sent to broker %d is %s"
.format(deletePartitions, broker, replicas.mkString(",")))
sendRequest(broker, new StopReplicaRequest(deletePartitions,
- Set.empty[(String, Int)] ++ replicas, controllerEpoch), null)
+ Set.empty[(String, Int)] ++ replicas, controllerEpoch, correlationId), null)
}
}
m.clear()
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4840c0c..7b5d5c2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -34,6 +34,7 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import scala.Some
import kafka.common.TopicAndPartition
+import java.util.concurrent.atomic.AtomicInteger
class ControllerContext(val zkClient: ZkClient,
var controllerChannelManager: ControllerChannelManager = null,
@@ -42,6 +43,7 @@ class ControllerContext(val zkClient: ZkClient,
val brokerShutdownLock: Object = new Object,
var epoch: Int = KafkaController.InitialControllerEpoch - 1,
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1,
+ val correlationId: AtomicInteger = new AtomicInteger(0),
var allTopics: Set[String] = Set.empty,
var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
var allLeaders: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
@@ -186,7 +188,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
}
- brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 0278782..372793b 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -85,9 +85,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
}
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
} catch {
case e => error("Error while moving some partitions to the online state", e)
+ // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
}
}
@@ -104,7 +105,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
partitions.foreach { topicAndPartition =>
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
}
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
}catch {
case e => error("Error while moving some partitions to %s state".format(targetState), e)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 1753947..1d5bac4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -83,7 +83,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
try {
brokerRequestBatch.newBatch()
replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
}catch {
case e => error("Error while moving some replicas to %s state".format(targetState), e)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index dbf04fd..3d92569 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -20,15 +20,18 @@ import kafka.api._
import java.nio.ByteBuffer
import scala.collection.JavaConversions
-class TopicMetadataRequest(val correlationId: Int,
- val versionId: Short,
+class TopicMetadataRequest(val versionId: Short,
+ val correlationId: Int,
val clientId: String,
val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
val underlying: kafka.api.TopicMetadataRequest =
- new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics), correlationId)
+ new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
def this(topics: java.util.List[String]) =
- this(0, kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+ this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+
+ def this(topics: java.util.List[String], correlationId: Int) =
+ this(kafka.api.TopicMetadataRequest.CurrentVersion, correlationId, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
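The javaapi wrapper gains a matching (topics, correlationId) convenience constructor. A possible use, shown here in Scala for consistency with the rest of the patch (the topic names are made up):

    import java.util.Arrays

    // Version and client id default to CurrentVersion and DefaultClientId.
    val request = new kafka.javaapi.TopicMetadataRequest(Arrays.asList("foo", "bar"), 7)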
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 7e7f344..5845bb6 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -44,6 +44,7 @@ class FileMessageSet private[kafka](val file: File,
private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
if (initChannelPositionToEnd) {
+ info("Creating or reloading log segment %s".format(file.getAbsolutePath))
/* set the file position to the last byte in the file */
channel.position(channel.size)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 66c07af..79db610 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -197,6 +197,7 @@ private[kafka] class Log(val dir: File,
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
*/
private def recoverSegment(segment: LogSegment) {
+ info("Recovering log segment %s".format(segment.messageSet.file.getAbsolutePath))
segment.index.truncate()
var validBytes = 0
var lastIndexEntry = 0
@@ -392,6 +393,10 @@ private[kafka] class Log(val dir: File,
*/
def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
lock synchronized {
+ debug("Garbage collecting log..")
+ debug("Segments of log %s : %s ".format(this.name, segments.view.mkString(",")))
+ debug("Index files for log %s: %s".format(this.name, segments.view.map(_.index.file.exists()).mkString(",")))
+ debug("Data files for log %s: %s".format(this.name, segments.view.map(_.messageSet.file.exists()).mkString(",")))
val view = segments.view
val deletable = view.takeWhile(predicate)
for(seg <- deletable)
@@ -426,11 +431,17 @@ private[kafka] class Log(val dir: File,
* Roll the log over if necessary
*/
private def maybeRoll(segment: LogSegment): LogSegment = {
- if ((segment.messageSet.sizeInBytes > maxLogFileSize) ||
- ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) ||
- segment.index.isFull)
+ if(segment.messageSet.sizeInBytes > maxLogFileSize) {
+ info("Rolling %s due to full data log".format(name))
roll()
- else
+ } else if((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) {
+ info("Rolling %s due to time based rolling".format(name))
+ roll()
+ } else if(segment.index.isFull) {
+ info("Rolling %s due to full index maxIndexSize = %d, entries = %d, maxEntries = %d"
+ .format(name, segment.index.maxIndexSize, segment.index.entries(), segment.index.maxEntries))
+ roll()
+ } else
segment
}
@@ -451,10 +462,10 @@ private[kafka] class Log(val dir: File,
val logFile = logFilename(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
for(file <- List(logFile, indexFile); if file.exists) {
- warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
+ warn("Newly rolled segment file " + file.getAbsolutePath + " already exists; deleting it first")
file.delete()
}
- debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
+ info("Rolling log '" + name + "' to " + logFile.getAbsolutePath + " and " + indexFile.getAbsolutePath)
segments.view.lastOption match {
case Some(segment) => segment.index.trimToValidSize()
case None =>
@@ -462,7 +473,7 @@ private[kafka] class Log(val dir: File,
val segmentsView = segments.view
if(segmentsView.size > 0 && segmentsView.last.start == newOffset)
- throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset))
+ throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
val segment = new LogSegment(dir,
startOffset = newOffset,
@@ -555,6 +566,10 @@ private[kafka] class Log(val dir: File,
} else {
total += 1
}
+ if(segment.messageSet.file.exists())
+ error("Data log file %s still exists".format(segment.messageSet.file.getAbsolutePath))
+ if(segment.index.file.exists())
+ error("Index file %s still exists".format(segment.index.file.getAbsolutePath))
}
total
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 23adca1..43b3575 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -83,16 +83,19 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
Utils.swallow(raf.close())
}
}
-
- /* the maximum number of entries this index can hold */
- def maxEntries = mmap.limit / 8
-
+
/* the number of entries in the index */
private var size = new AtomicInteger(mmap.position / 8)
/* the last offset in the index */
var lastOffset = readLastOffset()
+ info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
+ .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
+
+ /* the maximum number of entries this index can hold */
+ def maxEntries = mmap.limit / 8
+
/**
* The last offset written to the index
*/
@@ -262,6 +265,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* Delete this index file
*/
def delete(): Boolean = {
+ info("Deleting index " + this.file.getAbsolutePath)
this.file.delete()
}
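The relocated maxEntries definition divides the mapped buffer's limit by 8 because each index entry is two 4-byte ints: a relative offset and a file position. A quick worked example (the 10 MB figure is an assumed default, not taken from this diff):

    // 8 bytes per entry: 4-byte relative offset + 4-byte file position.
    val maxIndexSize = 10 * 1024 * 1024
    val maxEntries = maxIndexSize / 8   // = 1310720 entries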
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1f7124d..0e5b7cb 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -46,7 +46,12 @@ object RequestChannel extends Logging {
val requestId = buffer.getShort()
val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
buffer.rewind()
- trace("Received request: %s".format(requestObj))
+ buffer.getShort
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = ApiUtils.readShortString(buffer)
+ buffer.rewind()
+ trace("Received request v%d with correlation id %d from client %s: %s".format(versionId, correlationId, clientId, requestObj))
def updateRequestMetrics() {
val endTimeMs = SystemTime.milliseconds
@@ -75,8 +80,8 @@ object RequestChannel extends Logging {
m.responseSendTimeHist.update(responseSendTime)
m.totalTimeHist.update(totalTime)
}
- trace("Completed request: %s totalTime:%d queueTime:%d localTime:%d remoteTime:%d sendTime:%d"
- .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
+ trace("Completed request v%d with correlation id %d and client %s: %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d"
+ .format(versionId, correlationId, clientId, requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
}
}
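The logging change re-reads the request header before deserialization: skip the 2-byte api key, then read version, correlation id, and client id, and rewind so the buffer is untouched for the real parse. A self-contained sketch of that header walk (assuming the 0.8 wire format of int16 api key, int16 version, int32 correlation id, and a 2-byte length-prefixed client id):

    import java.nio.ByteBuffer

    def readHeader(buffer: ByteBuffer): (Short, Short, Int, String) = {
      val apiKey = buffer.getShort        // request type
      val versionId = buffer.getShort     // per-request version
      val correlationId = buffer.getInt   // client-assigned id
      val clientIdLength = buffer.getShort
      val clientIdBytes = new Array[Byte](clientIdLength)
      buffer.get(clientIdBytes)
      buffer.rewind()                     // leave the buffer ready for the real parse
      (apiKey, versionId, correlationId, new String(clientIdBytes, "UTF-8"))
    }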
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 2102fbf..e5dccd3 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -319,7 +319,7 @@ private[kafka] class Processor(val id: Int,
} else if(receive.complete) {
val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds)
requestChannel.sendRequest(req)
- trace("Recieved request, sending for processing by handler: " + req)
+ trace("Received request, sending for processing by handler: " + req)
key.attach(null)
} else {
// more reading to be done
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index d58a063..b209a97 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -37,7 +37,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
* @return a sequence of (brokerId, numPartitions). Returns a zero-length
* sequence if no brokers are available.
*/
- def getBrokerPartitionInfo(topic: String): Seq[PartitionAndLeader] = {
+ def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
debug("Getting broker partition info for topic %s".format(topic))
// check if the cache has metadata for this topic
val topicMetadata = topicPartitionInfo.get(topic)
@@ -46,7 +46,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
case Some(m) => m
case None =>
// refresh the topic metadata cache
- updateInfo(Set(topic))
+ updateInfo(Set(topic), correlationId)
val topicMetadata = topicPartitionInfo.get(topic)
topicMetadata match {
case Some(m) => m
@@ -70,9 +70,9 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
* It updates the cache by issuing a get topic metadata request to a random broker.
* @param topics the topics for which the metadata is to be fetched
*/
- def updateInfo(topics: Set[String]) {
+ def updateInfo(topics: Set[String], correlationId: Int) {
var topicsMetadata: Seq[TopicMetadata] = Nil
- val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig)
+ val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
topicsMetadata = topicMetadataResponse.topicsMetadata
// throw partition specific exception
topicsMetadata.foreach(tmd =>{
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 24a9dc9..58f582f 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -37,7 +37,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val isSync = ("sync" == config.producerType)
val partitionCounter = new AtomicInteger(0)
- val correlationCounter = new AtomicInteger(0)
+ val correlationId = new AtomicInteger(0)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
private val lock = new Object()
@@ -60,11 +60,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries + 1
+ val correlationIdStart = correlationId.get()
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
if (topicMetadataRefreshInterval >= 0 &&
SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
- Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet))
+ Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
topicMetadataToRefresh.clear
lastTopicMetadataRefreshTime = SystemTime.milliseconds
}
@@ -73,14 +74,15 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.producerRetryBackoffMs)
// get topics of the outstanding produce requests and refresh metadata for those
- Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
+ Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
remainingRetries -= 1
producerStats.resendRate.mark()
}
}
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
- error("Failed to send the following requests: " + outstandingProduceRequests)
+ val correlationIdEnd = correlationId.get()
+ error("Failed to send the following requests with correlation ids in [%d,%d]: %s".format(correlationIdStart, correlationIdEnd-1, outstandingProduceRequests))
throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
}
}
@@ -178,8 +180,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
- debug("Getting the number of broker partitions registered for topic: " + m.topic)
- val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic)
+ val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
debug("Broker partitions registered for topic: %s are %s"
.format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
val totalNumPartitions = topicPartitionsList.length
@@ -229,13 +230,17 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
- val producerRequest = new ProducerRequest(correlationCounter.getAndIncrement(), config.clientId, config.requiredAcks,
+ val currentCorrelationId = correlationId.getAndIncrement
+ val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requiredAcks,
config.requestTimeoutMs, messagesPerTopic)
+ var failedTopicPartitions = Seq.empty[TopicAndPartition]
try {
val syncProducer = producerPool.getProducer(brokerId)
+ debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
+ .format(currentCorrelationId, messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
val response = syncProducer.send(producerRequest)
- debug("Producer sent messages for topics %s to broker %d on %s:%d"
- .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
+ debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
+ .format(currentCorrelationId, messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
if (response.status.size != producerRequest.data.size)
throw new KafkaException("Incomplete response (%s) for producer request (%s)"
.format(response, producerRequest))
@@ -244,11 +249,16 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
}
- response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
- .map(partitionStatus => partitionStatus._1)
+ failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+ .map(partitionStatus => partitionStatus._1)
+ if(failedTopicPartitions.size > 0)
+ error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
+ .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
+ failedTopicPartitions
} catch {
case t: Throwable =>
- warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), t)
+ warn("Failed to send producer request with correlation id %d to broker %d with data %s"
+ .format(currentCorrelationId, brokerId, messagesPerTopic), t)
messagesPerTopic.keys.toSeq
}
} else {
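The failure path now reports the inclusive range of correlation ids consumed across all retries, rather than just dumping the outstanding requests. An illustrative rendering of that bookkeeping (println stands in for the handler's logger):

    // Snapshot the counter before the retry loop ...
    val correlationIdStart = correlationId.get()
    // ... each send and metadata refresh calls correlationId.getAndIncrement ...
    val correlationIdEnd = correlationId.get()
    println("Failed sends used correlation ids in [%d,%d]"
      .format(correlationIdStart, correlationIdEnd - 1))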
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 96f0df6..bdb1d03 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -66,7 +66,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
override def doWork() {
- val fetchRequestuilder = new FetchRequestBuilder().
+ val fetchRequestBuilder = new FetchRequestBuilder().
clientId(clientId).
replicaId(fetcherBrokerId).
maxWait(maxWait).
@@ -78,14 +78,14 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
partitionMapCond.await()
partitionMap.foreach {
case((topicAndPartition, offset)) =>
- fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+ fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
offset, fetchSize)
}
} finally {
partitionMapLock.unlock()
}
- val fetchRequest = fetchRequestuilder.build()
+ val fetchRequest = fetchRequestBuilder.build()
val partitionsWithError = new mutable.HashSet[TopicAndPartition]
var response: FetchResponse = null
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5089a75..e2dfb3e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -70,7 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val apiRequest = request.requestObj.asInstanceOf[ProducerRequest]
val producerResponseStatus = apiRequest.data.map {
case (topicAndPartition, data) =>
- (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+ (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L))
}
val errorResponse = ProducerResponse(apiRequest.correlationId, producerResponseStatus)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
@@ -125,8 +125,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
- trace("Handling leader and ISR request " + leaderAndIsrRequest)
+ requestLogger.trace("Handling LeaderAndIsrRequest v%d with correlation id %d from client %s: %s"
+ .format(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, leaderAndIsrRequest.clientId, leaderAndIsrRequest.toString))
try {
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
@@ -142,8 +142,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleStopReplicaRequest(request: RequestChannel.Request) {
val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
- trace("Handling stop replica request " + stopReplicaRequest)
+ requestLogger.trace("Handling StopReplicaRequest v%d with correlation id %d from client %s: %s"
+ .format(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, stopReplicaRequest.clientId, stopReplicaRequest.toString))
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
@@ -175,10 +175,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
val sTime = SystemTime.milliseconds
if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling producer request " + request.toString)
- trace("Handling producer request " + request.toString)
+ requestLogger.trace("Handling ProducerRequest v%d with correlation id %d from client %s: %s"
+ .format(produceRequest.versionId, produceRequest.correlationId, produceRequest.clientId, produceRequest.toString))
- val localProduceResults = appendToLocalLog(produceRequest.data)
+ val localProduceResults = appendToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
val numPartitionsInError = localProduceResults.count(_.error.isDefined)
@@ -236,7 +236,8 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* Helper method for handling a parsed producer request
*/
- private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = {
+ private def appendToLocalLog(producerRequest: ProducerRequest): Iterable[ProduceResult] = {
+ val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
trace("Append [%s] to local log ".format(partitionAndData.toString))
partitionAndData.map {case (topicAndPartition, messages) =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
@@ -259,7 +260,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case e =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark()
- error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic, topicAndPartition.partition), e)
+ error("Error processing ProducerRequest with correlation id %d from client %s on %s:%d"
+ .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition.topic, topicAndPartition.partition), e)
new ProduceResult(topicAndPartition, e)
}
}
@@ -271,8 +273,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling fetch request " + fetchRequest.toString)
- trace("Handling fetch request " + fetchRequest.toString)
+ requestLogger.trace("Handling FetchRequest v%d with correlation id %d from client %s: %s"
+ .format(fetchRequest.versionId, fetchRequest.correlationId, fetchRequest.clientId, fetchRequest.toString))
if(fetchRequest.isFromFollower) {
maybeUpdatePartitionHw(fetchRequest)
@@ -293,7 +295,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if(fetchRequest.maxWait <= 0 ||
bytesReadable >= fetchRequest.minBytes ||
fetchRequest.numPartitions <= 0) {
- debug("Returning fetch response %s for fetch request with correlation id %d".format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId))
+ debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
+ .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
val response = new FetchResponse(fetchRequest.correlationId, dataRead)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
@@ -380,8 +383,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetRequest(request: RequestChannel.Request) {
val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling offset request " + offsetRequest.toString)
- trace("Handling offset request " + offsetRequest.toString)
+ requestLogger.trace("Handling OffsetRequest v%d with correlation id %d from client %s: %s"
+ .format(offsetRequest.versionId, offsetRequest.correlationId, offsetRequest.clientId, offsetRequest.toString))
val responseMap = offsetRequest.requestInfo.map(elem => {
val (topicAndPartition, partitionOffsetRequestInfo) = elem
@@ -420,8 +423,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling topic metadata request " + metadataRequest.toString())
- trace("Handling topic metadata request " + metadataRequest.toString())
+ requestLogger.trace("Handling TopicMetadataRequest v%d with correlation id %d from client %s: %s"
+ .format(metadataRequest.versionId, metadataRequest.correlationId, metadataRequest.clientId, metadataRequest.toString))
val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
val config = replicaManager.config
@@ -463,6 +466,7 @@ class KafkaApis(val requestChannel: RequestChannel,
topicsMetadata += topicAndMetadata
}
})
+ trace("Sending topic metadata for correlation id %d to client %s".format(metadataRequest.correlationId, metadataRequest.clientId))
topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 509b020..26f31ec 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -89,7 +89,7 @@ object SerializationTestUtils{
val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
- new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1)
+ new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1, 0)
}
def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@@ -99,7 +99,7 @@ object SerializationTestUtils{
}
def createTestStopReplicaRequest() : StopReplicaRequest = {
- new StopReplicaRequest(controllerEpoch = 1, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
+ new StopReplicaRequest(controllerEpoch = 1, correlationId = 0, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
}
def createTestStopReplicaResponse() : StopReplicaResponse = {
@@ -138,7 +138,7 @@ object SerializationTestUtils{
}
def createTestTopicMetadataRequest: TopicMetadataRequest = {
- new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2), 1)
+ new TopicMetadataRequest(1, 1, "client 1", Seq(topic1, topic2))
}
def createTestTopicMetadataResponse: TopicMetadataResponse = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index f9bbfa9..6db63ba 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -51,7 +51,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
CreateTopicCommand.createTopic(zkClient, topic, 1)
// create a topic metadata request
- val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+ val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
topicMetadataRequest.writeTo(serializedMetadataRequest)
@@ -70,7 +70,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
0 -> configs.head.brokerId
)
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
- val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+ val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
@@ -89,7 +89,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
0 -> configs.head.brokerId
)
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
- val topicMetadataRequest = new TopicMetadataRequest(List())
+ val topicMetadataRequest = new TopicMetadataRequest(List(), 0)
val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
@@ -103,7 +103,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
// auto create topic
val topic = "test"
- val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+ val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 4767618..3e46dd7 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -191,7 +191,7 @@ class AsyncProducerTest extends JUnit3Suite {
producerPool = producerPool,
topicPartitionInfos = topicPartitionInfos)
- val topic1Broker1Data =
+ val topic1Broker1Data =
ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
@@ -409,12 +409,12 @@ class AsyncProducerTest extends JUnit3Suite {
// produce request for topic1 and partitions 0 and 1. Let the first request fail
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
- val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 0)
- val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 1)
+ val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 11)
+ val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 17)
val response1 = ProducerResponse(0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
(TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
- val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 2)
+ val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 21)
val response2 = ProducerResponse(0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
@@ -424,9 +424,7 @@ class AsyncProducerTest extends JUnit3Suite {
EasyMock.replay(mockSyncProducer)
val producerPool = EasyMock.createMock(classOf[ProducerPool])
- EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
- EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
- EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
+ EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(4)
EasyMock.expect(producerPool.close())
EasyMock.replay(producerPool)
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index fcdd26e..129bc56 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -131,7 +131,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
leaderAndIsr.put((topic, partitionId),
new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
- val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch)
+ val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch, 0)
controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
http://git-wip-us.apache.org/repos/asf/kafka/blob/03eb903c/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index a9a5f07..0367af2 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -197,6 +197,7 @@ object ProducerPerformance extends Logging {
props.put("batch.size", config.batchSize.toString)
props.put("queue.enqueueTimeout.ms", "-1")
}
+ props.put("clientid", "ProducerPerformance")
props.put("producer.request.required.acks", config.producerRequestRequiredAcks.toString)
props.put("producer.request.timeout.ms", config.producerRequestTimeoutMs.toString)
props.put("producer.num.retries", config.producerNumRetries.toString)