You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/08/13 22:09:02 UTC

git commit: kafka-1567; Metric memory leaking after closing the clients; patched by Jiangjie Qin; reviewed by Guozhang Wang and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 48cc2dc0e -> aa70a7d02


kafka-1567; Metric memory leaking after closing the clients; patched by Jiangjie Qin; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa70a7d0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa70a7d0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa70a7d0

Branch: refs/heads/trunk
Commit: aa70a7d02552585a83c62784b92b67ed8ae3a304
Parents: 48cc2dc
Author: Jiangjie Qin <be...@gmail.com>
Authored: Wed Aug 13 13:08:57 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Aug 13 13:08:57 2014 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerTopicStats.scala     |   4 +
 .../consumer/FetchRequestAndResponseStats.scala |  16 ++-
 .../consumer/ZookeeperConsumerConnector.scala   |  37 +++---
 .../scala/kafka/metrics/KafkaMetricsGroup.scala | 120 ++++++++++++++++++-
 .../main/scala/kafka/producer/Producer.scala    |  13 +-
 .../kafka/producer/ProducerRequestStats.scala   |   4 +
 .../scala/kafka/producer/ProducerStats.scala    |   4 +
 .../kafka/producer/ProducerTopicStats.scala     |   4 +
 8 files changed, 175 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index ff5f470..f63e6c5 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -54,4 +54,8 @@ object ConsumerTopicStatsRegistry {
   def getConsumerTopicStat(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  /** Removes the entry stored under `clientId` from `globalStats`. */
+  def removeConsumerTopicStat(clientId: String): Unit =
+    globalStats.remove(clientId)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 875eeeb..5243f41 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -17,10 +17,11 @@
 
 package kafka.consumer
 
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import kafka.utils.Pool
 import java.util.concurrent.TimeUnit
+
 import kafka.common.ClientIdAndBroker
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.utils.Pool
 
 class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
   val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
@@ -53,6 +54,17 @@ object FetchRequestAndResponseStatsRegistry {
   def getFetchRequestAndResponseStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  /** Removes every `globalStats` entry whose key contains `clientId` + "-ConsumerFetcherThread". */
+  def removeConsumerFetchRequestAndResponseStats(clientId: String) {
+    // clientId is arbitrary text: quote it so regex metacharacters such as '.' only match themselves.
+    val pattern = (java.util.regex.Pattern.quote(clientId) + "-ConsumerFetcherThread.*").r
+    val keys = globalStats.keys
+    for (key <- keys) {
+      if (pattern.findFirstIn(key).isDefined)
+        globalStats.remove(key)
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 65f518d..acfd064 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -17,28 +17,28 @@
 
 package kafka.consumer
 
+import java.net.InetAddress
+import java.util.UUID
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import locks.ReentrantLock
-import collection._
+import java.util.concurrent.locks.ReentrantLock
+
+import com.yammer.metrics.core.Gauge
+import kafka.api._
+import kafka.client.ClientUtils
 import kafka.cluster._
-import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import java.net.InetAddress
-import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient}
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import java.util.UUID
-import kafka.serializer._
-import kafka.utils.ZkUtils._
-import kafka.utils.Utils.inLock
 import kafka.common._
-import com.yammer.metrics.core.Gauge
 import kafka.metrics._
 import kafka.network.BlockingChannel
-import kafka.client.ClientUtils
-import kafka.api._
-import scala.Some
-import kafka.common.TopicAndPartition
+import kafka.serializer._
+import kafka.utils.Utils.inLock
+import kafka.utils.ZkUtils._
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+import scala.collection._
 
 
 /**
@@ -184,7 +184,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val canShutdown = isShuttingDown.compareAndSet(false, true)
     if (canShutdown) {
       info("ZKConsumerConnector shutting down")
-
+      val startTime = System.nanoTime()
+      KafkaMetricsGroup.removeAllConsumerMetrics(config.clientId)
       rebalanceLock synchronized {
         if (wildcardTopicWatcher != null)
           wildcardTopicWatcher.shutdown()
@@ -208,7 +209,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           case e: Throwable =>
             fatal("error during consumer connector shutdown", e)
         }
-        info("ZKConsumerConnector shut down completed")
+        info("ZKConsumerConnector shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index a20ab90..00df462 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -18,10 +18,15 @@
 package kafka.metrics
 
 
-import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.utils.Logging
 import java.util.concurrent.TimeUnit
+
 import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
+import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
+import kafka.utils.Logging
+
+import scala.collection.immutable
 
 
 trait KafkaMetricsGroup extends Logging {
@@ -51,4 +56,115 @@ trait KafkaMetricsGroup extends Logging {
   def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
     Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
 
+  /** Unregisters the metric named `metricName(name)` from the default metrics registry. */
+  def removeMetric(name: String) = Metrics.defaultRegistry().removeMetric(metricName(name))
+
+}
+
+object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
+  /**
+   * To make sure all the metrics are de-registered after a consumer/producer is closed, their metric
+   * names must be added to the metric name lists below.
+   */
+  private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
+    // kafka.consumer.ZookeeperConsumerConnector
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
+
+    // kafka.consumer.ConsumerFetcherManager
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+
+    // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
+    new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+
+    // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+
+    // kafka.consumer.ConsumerTopicStats
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+
+    // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
+    new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+
+    // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+
+    /**
+     * ProducerRequestStats <-- SyncProducer
+     * The metrics of the SyncProducer used in fetchTopicMetaData() need to be removed when the consumer is closed.
+     */
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+  )
+
+  private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] (
+    // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
+    new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+
+    // kafka.producer.ProducerSendThread
+    new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+
+    // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+
+    // kafka.producer.ProducerRequestStats <-- SyncProducer (the same four entries are also in consumerMetricNameList)
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+  )
+
+  def removeAllConsumerMetrics(clientId: String) { // drops the stats-registry entries, then the listed metrics, for clientId
+    FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
+    ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
+    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId) // SyncProducer stats; see consumerMetricNameList
+    removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
+  }
+
+  def removeAllProducerMetrics(clientId: String) { // drops the stats-registry entries, then the listed metrics, for clientId
+    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
+    ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
+    ProducerStatsRegistry.removeProducerStats(clientId)
+    removeAllMetricsInList(KafkaMetricsGroup.producerMetricNameList, clientId) // de-register from the metrics registry
+  }
+
+  /**
+   * De-registers each metric whose group and type equal those of an entry in `metricNameList` and whose
+   * name contains `clientId` followed later by that entry's name; both parts are regex-quoted literals.
+   */
+  private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
+    import java.util.regex.Pattern.quote
+    val registry = Metrics.defaultRegistry()
+    for (metric <- metricNameList) {
+      val pattern = (quote(clientId) + ".*" + quote(metric.getName) + ".*").r
+      val registeredMetrics = scala.collection.JavaConversions.asScalaSet(registry.allMetrics().keySet())
+      for (registeredMetric <- registeredMetrics
+           if registeredMetric.getGroup == metric.getGroup && registeredMetric.getType == metric.getType
+           if pattern.findFirstIn(registeredMetric.getName).isDefined) {
+        val beforeRemovalSize = registry.allMetrics().keySet().size
+        registry.removeMetric(registeredMetric)
+        val afterRemovalSize = registry.allMetrics().keySet().size
+        trace("Removing metric %s. Metrics registry size reduced from %d to %d".format(
+            registeredMetric, beforeRemovalSize, afterRemovalSize))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 4798481..cd634f6 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -16,14 +16,14 @@
  */
 package kafka.producer
 
-import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
-import kafka.utils._
-import java.util.Random
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
-import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
 import kafka.common.QueueFullException
 import kafka.metrics._
+import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
+import kafka.serializer.Encoder
+import kafka.utils._
 
 
 class Producer[K,V](val config: ProducerConfig,
@@ -126,9 +126,12 @@ class Producer[K,V](val config: ProducerConfig,
       val canShutdown = hasShutdown.compareAndSet(false, true)
       if(canShutdown) {
         info("Shutting down producer")
+        val startTime = System.nanoTime()
+        KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
         if (producerSendThread != null)
           producerSendThread.shutdown
         eventHandler.close
+        info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 9694220..1c46d72 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -52,5 +52,9 @@ object ProducerRequestStatsRegistry {
   def getProducerRequestStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  /** Removes the entry stored under `clientId` from `globalStats`. */
+  def removeProducerRequestStats(clientId: String): Unit =
+    globalStats.remove(clientId)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index e1610d3..35e3aae 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -36,4 +36,8 @@ object ProducerStatsRegistry {
   def getProducerStats(clientId: String) = {
     statsRegistry.getAndMaybePut(clientId)
   }
+
+  /** Removes the entry stored under `clientId` from `statsRegistry`. */
+  def removeProducerStats(clientId: String): Unit =
+    statsRegistry.remove(clientId)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index ed209f4..9bb1419 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -55,4 +55,8 @@ object ProducerTopicStatsRegistry {
   def getProducerTopicStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  /** Removes the entry stored under `clientId` from `globalStats`. */
+  def removeProducerTopicStats(clientId: String): Unit =
+    globalStats.remove(clientId)
 }