You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/08/13 22:09:02 UTC
git commit: kafka-1567;
Metric memory leaking after closing the clients; patched by Jiangjie Qin;
reviewed by Guozhang Wang and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 48cc2dc0e -> aa70a7d02
kafka-1567; Metric memory leaking after closing the clients; patched by Jiangjie Qin; reviewed by Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa70a7d0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa70a7d0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa70a7d0
Branch: refs/heads/trunk
Commit: aa70a7d02552585a83c62784b92b67ed8ae3a304
Parents: 48cc2dc
Author: Jiangjie Qin <be...@gmail.com>
Authored: Wed Aug 13 13:08:57 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Aug 13 13:08:57 2014 -0700
----------------------------------------------------------------------
.../kafka/consumer/ConsumerTopicStats.scala | 4 +
.../consumer/FetchRequestAndResponseStats.scala | 16 ++-
.../consumer/ZookeeperConsumerConnector.scala | 37 +++---
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 120 ++++++++++++++++++-
.../main/scala/kafka/producer/Producer.scala | 13 +-
.../kafka/producer/ProducerRequestStats.scala | 4 +
.../scala/kafka/producer/ProducerStats.scala | 4 +
.../kafka/producer/ProducerTopicStats.scala | 4 +
8 files changed, 175 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index ff5f470..f63e6c5 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -54,4 +54,8 @@ object ConsumerTopicStatsRegistry {
def getConsumerTopicStat(clientId: String) = {
globalStats.getAndMaybePut(clientId)
}
+
+  /** Removes the entry registered under `clientId` from `globalStats`. */
+  def removeConsumerTopicStat(clientId: String): Unit =
+    globalStats.remove(clientId)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 875eeeb..5243f41 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -17,10 +17,11 @@
package kafka.consumer
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import kafka.utils.Pool
import java.util.concurrent.TimeUnit
+
import kafka.common.ClientIdAndBroker
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.utils.Pool
class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
@@ -53,6 +54,17 @@ object FetchRequestAndResponseStatsRegistry {
def getFetchRequestAndResponseStats(clientId: String) = {
globalStats.getAndMaybePut(clientId)
}
+
+  /**
+   * Removes the stats of this client's consumer fetcher threads: every key that starts with
+   * "<clientId>-ConsumerFetcherThread". The prefix is compared literally; a regex built from the raw
+   * clientId mis-matched or threw on metacharacters, and an unanchored search hit e.g. "barfoo".
+   */
+  def removeConsumerFetchRequestAndResponseStats(clientId: String) {
+    val keyPrefix = clientId + "-ConsumerFetcherThread"
+    for (key <- globalStats.keys if key.startsWith(keyPrefix))
+      globalStats.remove(key)
+  }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 65f518d..acfd064 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -17,28 +17,28 @@
package kafka.consumer
+import java.net.InetAddress
+import java.util.UUID
import java.util.concurrent._
import java.util.concurrent.atomic._
-import locks.ReentrantLock
-import collection._
+import java.util.concurrent.locks.ReentrantLock
+
+import com.yammer.metrics.core.Gauge
+import kafka.api._
+import kafka.client.ClientUtils
import kafka.cluster._
-import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import java.net.InetAddress
-import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient}
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import java.util.UUID
-import kafka.serializer._
-import kafka.utils.ZkUtils._
-import kafka.utils.Utils.inLock
import kafka.common._
-import com.yammer.metrics.core.Gauge
import kafka.metrics._
import kafka.network.BlockingChannel
-import kafka.client.ClientUtils
-import kafka.api._
-import scala.Some
-import kafka.common.TopicAndPartition
+import kafka.serializer._
+import kafka.utils.Utils.inLock
+import kafka.utils.ZkUtils._
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+import scala.collection._
/**
@@ -184,7 +184,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown) {
info("ZKConsumerConnector shutting down")
-
+ val startTime = System.nanoTime()
+ KafkaMetricsGroup.removeAllConsumerMetrics(config.clientId)
rebalanceLock synchronized {
if (wildcardTopicWatcher != null)
wildcardTopicWatcher.shutdown()
@@ -208,7 +209,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
case e: Throwable =>
fatal("error during consumer connector shutdown", e)
}
- info("ZKConsumerConnector shut down completed")
+ info("ZKConsumerConnector shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index a20ab90..00df462 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -18,10 +18,15 @@
package kafka.metrics
-import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.utils.Logging
import java.util.concurrent.TimeUnit
+
import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
+import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
+import kafka.utils.Logging
+
+import scala.collection.immutable
trait KafkaMetricsGroup extends Logging {
@@ -51,4 +56,115 @@ trait KafkaMetricsGroup extends Logging {
def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+  /** Unregisters `metricName(name)` from the default metrics registry. */
+  def removeMetric(name: String): Unit = Metrics.defaultRegistry().removeMetric(metricName(name))
+
+}
+
+object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
+  /**
+   * To make sure all metrics are de-registered after a consumer/producer is closed, every
+   * client-scoped metric must be listed below. An entry matches a registered metric when group and
+   * type are equal and the entry's name occurs somewhere after the client id in the metric's name.
+   */
+  private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
+    // kafka.consumer.ZookeeperConsumerConnector
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
+
+    // kafka.consumer.ConsumerFetcherManager
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+
+    // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
+    new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+
+    // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+
+    // kafka.consumer.ConsumerTopicStats
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+
+    // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
+    new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+
+    // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+
+    /**
+     * ProducerRequestStats <-- SyncProducer
+     * The metrics of the SyncProducer used in fetchTopicMetaData() must also be removed when the
+     * consumer is closed.
+     * NOTE(review): producerMetricNameList lists the same entries, so a producer and a consumer that
+     * share a clientId remove each other's request metrics on close.
+     */
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+  )
+
+  private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] (
+    // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
+    new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+
+    // kafka.producer.ProducerSendThread
+    new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+
+    // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+
+    // kafka.producer.ProducerRequestStats <-- SyncProducer
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+  )
+
+  /**
+   * Drops this consumer's entries from the fetch, consumer-topic and producer-request stats
+   * registries, then unregisters every metric matching `consumerMetricNameList` for `clientId`.
+   */
+  def removeAllConsumerMetrics(clientId: String) {
+    FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
+    ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
+    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
+    removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
+  }
+
+  /**
+   * Drops this producer's entries from the producer-request, producer-topic and producer stats
+   * registries, then unregisters every metric matching `producerMetricNameList` for `clientId`.
+   */
+  def removeAllProducerMetrics(clientId: String) {
+    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
+    ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
+    ProducerStatsRegistry.removeProducerStats(clientId)
+    removeAllMetricsInList(KafkaMetricsGroup.producerMetricNameList, clientId)
+  }
+
+  /**
+   * Unregisters every metric in the default registry that matches an entry of `metricNameList`:
+   * group and type must be equal, and the metric's name must contain `clientId` followed, anywhere
+   * later, by the entry's name.
+   *
+   * Both fragments are regex-quoted before compiling. When they were spliced in raw, a client id
+   * such as "my.app" also matched "my-app" or "myXapp", and an id with an unbalanced "(" or "["
+   * made the pattern throw PatternSyntaxException in the middle of shutdown.
+   */
+  private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
+    val quotedClientId = java.util.regex.Pattern.quote(clientId)
+    metricNameList.foreach { metric =>
+      val pattern = (quotedClientId + ".*" + java.util.regex.Pattern.quote(metric.getName) + ".*").r
+      val registeredMetrics =
+        scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
+      for (registeredMetric <- registeredMetrics) {
+        if (registeredMetric.getGroup == metric.getGroup &&
+            registeredMetric.getType == metric.getType) {
+          pattern.findFirstIn(registeredMetric.getName) match {
+            case Some(_) =>
+              val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
+              Metrics.defaultRegistry().removeMetric(registeredMetric)
+              val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
+              trace("Removing metric %s. Metrics registry size reduced from %d to %d".format(
+                registeredMetric, beforeRemovalSize, afterRemovalSize))
+            case _ =>
+          }
+        }
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 4798481..cd634f6 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -16,14 +16,14 @@
*/
package kafka.producer
-import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
-import kafka.utils._
-import java.util.Random
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
-import kafka.serializer.Encoder
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
import kafka.common.QueueFullException
import kafka.metrics._
+import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
+import kafka.serializer.Encoder
+import kafka.utils._
class Producer[K,V](val config: ProducerConfig,
@@ -126,9 +126,12 @@ class Producer[K,V](val config: ProducerConfig,
val canShutdown = hasShutdown.compareAndSet(false, true)
if(canShutdown) {
info("Shutting down producer")
+ val startTime = System.nanoTime()
+ KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
if (producerSendThread != null)
producerSendThread.shutdown
eventHandler.close
+ info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 9694220..1c46d72 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -52,5 +52,9 @@ object ProducerRequestStatsRegistry {
def getProducerRequestStats(clientId: String) = {
globalStats.getAndMaybePut(clientId)
}
+
+  /** Discards whatever `globalStats` holds under `clientId`. */
+  def removeProducerRequestStats(clientId: String): Unit =
+    globalStats.remove(clientId)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index e1610d3..35e3aae 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -36,4 +36,8 @@ object ProducerStatsRegistry {
def getProducerStats(clientId: String) = {
statsRegistry.getAndMaybePut(clientId)
}
+
+  /** Evicts the `statsRegistry` entry keyed by `clientId`. */
+  def removeProducerStats(clientId: String): Unit = statsRegistry.remove(clientId)
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa70a7d0/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index ed209f4..9bb1419 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -55,4 +55,8 @@ object ProducerTopicStatsRegistry {
def getProducerTopicStats(clientId: String) = {
globalStats.getAndMaybePut(clientId)
}
+
+  /** Deletes the `globalStats` entry stored for `clientId`. */
+  def removeProducerTopicStats(clientId: String): Unit =
+    globalStats.remove(clientId)
}