You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:13 UTC

[28/36] git commit: kafka-1017; High number of open file handles in 0.8 producer; patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao

kafka-1017; High number of open file handles in 0.8 producer; patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d217f4cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d217f4cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d217f4cc

Branch: refs/heads/trunk
Commit: d217f4cc276eee021a0d756d3390b633e43f7115
Parents: ceb55ca
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Thu Aug 22 09:52:48 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Aug 22 09:52:48 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/producer/Producer.scala    | 33 +++++----
 .../producer/async/DefaultEventHandler.scala    | 77 ++++++++++----------
 2 files changed, 56 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d217f4cc/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index bb16a29..f582919 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -33,16 +33,17 @@ class Producer[K,V](val config: ProducerConfig,
   private val hasShutdown = new AtomicBoolean(false)
   private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
 
-  private val random = new Random
   private var sync: Boolean = true
   private var producerSendThread: ProducerSendThread[K,V] = null
+  private val lock = new Object()
+
   config.producerType match {
     case "sync" =>
     case "async" =>
       sync = false
       producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                        queue,
-                                                       eventHandler, 
+                                                       eventHandler,
                                                        config.queueBufferingMaxMs,
                                                        config.batchNumMessages,
                                                        config.clientId)
@@ -67,12 +68,14 @@ class Producer[K,V](val config: ProducerConfig,
    * @param messages the producer data object that encapsulates the topic, key and message data
    */
   def send(messages: KeyedMessage[K,V]*) {
-    if (hasShutdown.get)
-      throw new ProducerClosedException
-    recordStats(messages)
-    sync match {
-      case true => eventHandler.handle(messages)
-      case false => asyncSend(messages)
+    lock synchronized {
+      if (hasShutdown.get)
+        throw new ProducerClosedException
+      recordStats(messages)
+      sync match {
+        case true => eventHandler.handle(messages)
+        case false => asyncSend(messages)
+      }
     }
   }
 
@@ -119,12 +122,14 @@ class Producer[K,V](val config: ProducerConfig,
    * the zookeeper client connection if one exists
    */
   def close() = {
-    val canShutdown = hasShutdown.compareAndSet(false, true)
-    if(canShutdown) {
-      info("Shutting down producer")
-      if (producerSendThread != null)
-        producerSendThread.shutdown
-      eventHandler.close
+    lock synchronized {
+      val canShutdown = hasShutdown.compareAndSet(false, true)
+      if(canShutdown) {
+        info("Shutting down producer")
+        if (producerSendThread != null)
+          producerSendThread.shutdown
+        eventHandler.close
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d217f4cc/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index f71a242..2e36d3b 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -40,8 +40,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   val correlationId = new AtomicInteger(0)
   val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
 
-  private val lock = new Object()
-
   private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
   private var lastTopicMetadataRefreshTime = 0L
   private val topicMetadataToRefresh = Set.empty[String]
@@ -51,47 +49,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
 
   def handle(events: Seq[KeyedMessage[K,V]]) {
-    lock synchronized {
-      sendPartitionPerTopicCache.clear()
-      val serializedData = serialize(events)
-      serializedData.foreach {
-        keyed =>
-          val dataSize = keyed.message.payloadSize
-          producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
-          producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
-      }
-      var outstandingProduceRequests = serializedData
-      var remainingRetries = config.messageSendMaxRetries + 1
-      val correlationIdStart = correlationId.get()
-      debug("Handling %d events".format(events.size))
-      while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
-        topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
-        if (topicMetadataRefreshInterval >= 0 &&
-            SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
-          Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
-          topicMetadataToRefresh.clear
-          lastTopicMetadataRefreshTime = SystemTime.milliseconds
-        }
-        outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
-        if (outstandingProduceRequests.size > 0) {
-          info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
-          // back off and update the topic metadata cache before attempting another send operation
-          Thread.sleep(config.retryBackoffMs)
-          // get topics of the outstanding produce requests and refresh metadata for those
-          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
-          remainingRetries -= 1
-          producerStats.resendRate.mark()
-        }
+    val serializedData = serialize(events)
+    serializedData.foreach {
+      keyed =>
+        val dataSize = keyed.message.payloadSize
+        producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
+        producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
+    }
+    var outstandingProduceRequests = serializedData
+    var remainingRetries = config.messageSendMaxRetries + 1
+    val correlationIdStart = correlationId.get()
+    debug("Handling %d events".format(events.size))
+    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
+      if (topicMetadataRefreshInterval >= 0 &&
+          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
+        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
+        sendPartitionPerTopicCache.clear()
+        topicMetadataToRefresh.clear
+        lastTopicMetadataRefreshTime = SystemTime.milliseconds
       }
-      if(outstandingProduceRequests.size > 0) {
-        producerStats.failedSendRate.mark()
-        val correlationIdEnd = correlationId.get()
-        error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
-          .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
-          correlationIdStart, correlationIdEnd-1))
-        throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
+      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
+      if (outstandingProduceRequests.size > 0) {
+        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
+        // back off and update the topic metadata cache before attempting another send operation
+        Thread.sleep(config.retryBackoffMs)
+        // get topics of the outstanding produce requests and refresh metadata for those
+        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
+        sendPartitionPerTopicCache.clear()
+        remainingRetries -= 1
+        producerStats.resendRate.mark()
       }
     }
+    if(outstandingProduceRequests.size > 0) {
+      producerStats.failedSendRate.mark()
+      val correlationIdEnd = correlationId.get()
+      error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
+        .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
+        correlationIdStart, correlationIdEnd-1))
+      throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
+    }
   }
 
   private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {