Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:13 UTC
[28/36] git commit: kafka-1017; High number of open file handles in 0.8 producer; patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao
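Most of the file-handle saving appears to come from the DefaultEventHandler change below: sendPartitionPerTopicCache is now cleared only when topic metadata is refreshed, instead of at the start of every handle() call, so a producer sending unkeyed messages keeps reusing the same partition (and hence the same broker connection) between refreshes. A minimal sketch of that sticky-partition idea; the names below are illustrative and not part of the patch:

import scala.collection.mutable
import scala.util.Random

object StickyPartitionSketch {
  // Cache of the randomly chosen partition per topic, mirroring
  // sendPartitionPerTopicCache in DefaultEventHandler.
  private val sendPartitionPerTopicCache = mutable.Map.empty[String, Int]

  // Reuse the cached partition while it exists, so consecutive batches go to
  // the same broker instead of fanning out connections to many brokers.
  def pickPartition(topic: String, numPartitions: Int): Int =
    sendPartitionPerTopicCache.getOrElseUpdate(topic, Random.nextInt(numPartitions))

  // The patch clears the cache only here, after a metadata refresh, not on
  // every send, so the choice stays sticky between refreshes.
  def onMetadataRefresh(): Unit = sendPartitionPerTopicCache.clear()
}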
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d217f4cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d217f4cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d217f4cc
Branch: refs/heads/trunk
Commit: d217f4cc276eee021a0d756d3390b633e43f7115
Parents: ceb55ca
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Thu Aug 22 09:52:48 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Aug 22 09:52:48 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/producer/Producer.scala | 33 +++++----
.../producer/async/DefaultEventHandler.scala | 77 ++++++++++----------
2 files changed, 56 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d217f4cc/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index bb16a29..f582919 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -33,16 +33,17 @@ class Producer[K,V](val config: ProducerConfig,
private val hasShutdown = new AtomicBoolean(false)
private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
- private val random = new Random
private var sync: Boolean = true
private var producerSendThread: ProducerSendThread[K,V] = null
+ private val lock = new Object()
+
config.producerType match {
case "sync" =>
case "async" =>
sync = false
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
queue,
- eventHandler,
+ eventHandler,
config.queueBufferingMaxMs,
config.batchNumMessages,
config.clientId)
@@ -67,12 +68,14 @@ class Producer[K,V](val config: ProducerConfig,
* @param messages the producer data object that encapsulates the topic, key and message data
*/
def send(messages: KeyedMessage[K,V]*) {
- if (hasShutdown.get)
- throw new ProducerClosedException
- recordStats(messages)
- sync match {
- case true => eventHandler.handle(messages)
- case false => asyncSend(messages)
+ lock synchronized {
+ if (hasShutdown.get)
+ throw new ProducerClosedException
+ recordStats(messages)
+ sync match {
+ case true => eventHandler.handle(messages)
+ case false => asyncSend(messages)
+ }
}
}
@@ -119,12 +122,14 @@ class Producer[K,V](val config: ProducerConfig,
* the zookeeper client connection if one exists
*/
def close() = {
- val canShutdown = hasShutdown.compareAndSet(false, true)
- if(canShutdown) {
- info("Shutting down producer")
- if (producerSendThread != null)
- producerSendThread.shutdown
- eventHandler.close
+ lock synchronized {
+ val canShutdown = hasShutdown.compareAndSet(false, true)
+ if(canShutdown) {
+ info("Shutting down producer")
+ if (producerSendThread != null)
+ producerSendThread.shutdown
+ eventHandler.close
+ }
}
}
}
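The Producer.scala hunks above move the synchronization out of DefaultEventHandler and into send() and close(). A rough sketch of the race this guards against, using toy names (ToyProducer, ToyConnectionPool) that are illustrative rather than the real classes: without a shared lock, a send can pass the shutdown check, then interleave with close() and open a connection against a pool that was just torn down, leaving the socket and its file handle open.

import java.util.concurrent.atomic.AtomicBoolean

// Toy stand-in for the producer's connection pool; not the real ProducerPool.
class ToyConnectionPool {
  private var open = 0
  def connect(): Unit = synchronized { open += 1 }   // pretend: opens a socket
  def closeAll(): Unit = synchronized { open = 0 }   // pretend: closes them all
}

class ToyProducer {
  private val hasShutdown = new AtomicBoolean(false)
  private val lock = new Object()
  private val pool = new ToyConnectionPool

  // As in the patched Producer, send and close share one lock, so a send can
  // never slip in between the shutdown check and the pool teardown.
  def send(msg: String): Unit = lock synchronized {
    if (hasShutdown.get) throw new IllegalStateException("producer closed")
    pool.connect()
  }

  def close(): Unit = lock synchronized {
    if (hasShutdown.compareAndSet(false, true)) pool.closeAll()
  }
}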
http://git-wip-us.apache.org/repos/asf/kafka/blob/d217f4cc/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index f71a242..2e36d3b 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -40,8 +40,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val correlationId = new AtomicInteger(0)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
- private val lock = new Object()
-
private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
private var lastTopicMetadataRefreshTime = 0L
private val topicMetadataToRefresh = Set.empty[String]
@@ -51,47 +49,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
def handle(events: Seq[KeyedMessage[K,V]]) {
- lock synchronized {
- sendPartitionPerTopicCache.clear()
- val serializedData = serialize(events)
- serializedData.foreach {
- keyed =>
- val dataSize = keyed.message.payloadSize
- producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
- producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
- }
- var outstandingProduceRequests = serializedData
- var remainingRetries = config.messageSendMaxRetries + 1
- val correlationIdStart = correlationId.get()
- debug("Handling %d events".format(events.size))
- while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
- topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
- if (topicMetadataRefreshInterval >= 0 &&
- SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
- Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
- topicMetadataToRefresh.clear
- lastTopicMetadataRefreshTime = SystemTime.milliseconds
- }
- outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
- if (outstandingProduceRequests.size > 0) {
- info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
- // back off and update the topic metadata cache before attempting another send operation
- Thread.sleep(config.retryBackoffMs)
- // get topics of the outstanding produce requests and refresh metadata for those
- Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
- remainingRetries -= 1
- producerStats.resendRate.mark()
- }
+ val serializedData = serialize(events)
+ serializedData.foreach {
+ keyed =>
+ val dataSize = keyed.message.payloadSize
+ producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
+ producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
+ }
+ var outstandingProduceRequests = serializedData
+ var remainingRetries = config.messageSendMaxRetries + 1
+ val correlationIdStart = correlationId.get()
+ debug("Handling %d events".format(events.size))
+ while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+ topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
+ if (topicMetadataRefreshInterval >= 0 &&
+ SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
+ Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
+ sendPartitionPerTopicCache.clear()
+ topicMetadataToRefresh.clear
+ lastTopicMetadataRefreshTime = SystemTime.milliseconds
}
- if(outstandingProduceRequests.size > 0) {
- producerStats.failedSendRate.mark()
- val correlationIdEnd = correlationId.get()
- error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
- .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
- correlationIdStart, correlationIdEnd-1))
- throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
+ outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
+ if (outstandingProduceRequests.size > 0) {
+ info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
+ // back off and update the topic metadata cache before attempting another send operation
+ Thread.sleep(config.retryBackoffMs)
+ // get topics of the outstanding produce requests and refresh metadata for those
+ Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
+ sendPartitionPerTopicCache.clear()
+ remainingRetries -= 1
+ producerStats.resendRate.mark()
}
}
+ if(outstandingProduceRequests.size > 0) {
+ producerStats.failedSendRate.mark()
+ val correlationIdEnd = correlationId.get()
+ error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
+ .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
+ correlationIdStart, correlationIdEnd-1))
+ throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
+ }
}
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
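Condensed, the retry path in the patched handle() orders its steps as: dispatch, and on failure back off, refresh metadata for the outstanding topics, and only then clear the sticky-partition cache. A sketch of that ordering; the periodic refresh at the top of the loop is omitted, and updateMetadata, clearPartitionCache and dispatch are stand-ins rather than the real method names:

def sendWithRetries[T](batch: Seq[T],
                       maxRetries: Int,
                       backoffMs: Long,
                       updateMetadata: Seq[T] => Unit,
                       clearPartitionCache: () => Unit,
                       dispatch: Seq[T] => Seq[T]): Unit = {
  var outstanding = batch
  var remaining = maxRetries + 1
  while (remaining > 0 && outstanding.nonEmpty) {
    outstanding = dispatch(outstanding)     // returns the messages that failed
    if (outstanding.nonEmpty) {
      Thread.sleep(backoffMs)               // back off before retrying
      updateMetadata(outstanding)           // refresh metadata first...
      clearPartitionCache()                 // ...then drop the sticky partitions
      remaining -= 1
    }
  }
  if (outstanding.nonEmpty)
    throw new RuntimeException(s"failed to send after $maxRetries retries")
}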