You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/28 01:10:58 UTC
kafka git commit: KAFKA-1866 LogStartOffset gauge throws exceptions after log.delete(); reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk b56f5973c -> 687abc98a
KAFKA-1866 LogStartOffset gauge throws exceptions after log.delete(); reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/687abc98
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/687abc98
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/687abc98
Branch: refs/heads/trunk
Commit: 687abc98a4600bf90ed7a7acb6fb2a5e6eac2055
Parents: b56f597
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Fri Feb 27 16:10:37 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 27 16:10:47 2015 -0800
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 11 ++++++-
core/src/main/scala/kafka/log/Log.scala | 12 ++++++-
.../scala/unit/kafka/metrics/MetricsTest.scala | 33 +++++++++++++++++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 4 +--
4 files changed, 52 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/687abc98/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index bfe4f45..c4bf48a 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -60,6 +60,7 @@ class Partition(val topic: String,
this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
+ val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
newGauge("UnderReplicated",
new Gauge[Int] {
@@ -67,7 +68,7 @@ class Partition(val topic: String,
if (isUnderReplicated) 1 else 0
}
},
- Map("topic" -> topic, "partition" -> partitionId.toString)
+ tags
)
def isUnderReplicated(): Boolean = {
@@ -141,6 +142,7 @@ class Partition(val topic: String,
leaderReplicaIdOpt = None
try {
logManager.deleteLog(TopicAndPartition(topic, partitionId))
+ removePartitionMetrics()
} catch {
case e: IOException =>
fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e)
@@ -436,6 +438,13 @@ class Partition(val topic: String,
}
}
+ /**
+ * Remove the metrics registered for this partition (the UnderReplicated gauge) once its log has been deleted
+ */
+ private def removePartitionMetrics() {
+ removeMetric("UnderReplicated", tags)
+ }
+
override def equals(that: Any): Boolean = {
if(!(that.isInstanceOf[Partition]))
return false
http://git-wip-us.apache.org/repos/asf/kafka/blob/687abc98/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 846023b..0c4efa8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -624,6 +624,7 @@ class Log(val dir: File,
*/
private[log] def delete() {
lock synchronized {
+ removeLogMetrics()
logSegments.foreach(_.delete())
segments.clear()
Utils.rm(dir)
@@ -769,7 +770,16 @@ class Log(val dir: File,
newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
}
}
-
+
+ /**
+ * Remove the metrics registered for this log (NumLogSegments, LogStartOffset, LogEndOffset, Size); called from delete()
+ */
+ private[log] def removeLogMetrics(): Unit = {
+ removeMetric("NumLogSegments", tags)
+ removeMetric("LogStartOffset", tags)
+ removeMetric("LogEndOffset", tags)
+ removeMetric("Size", tags)
+ }
/**
* Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
* @param segment The segment to add
http://git-wip-us.apache.org/repos/asf/kafka/blob/687abc98/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 3cf23b3..111e4a2 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -18,15 +18,20 @@
package kafka.consumer
import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.MetricPredicate
+import org.junit.Test
import junit.framework.Assert._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
-import scala.collection._
-import org.scalatest.junit.JUnit3Suite
import kafka.message._
import kafka.serializer._
import kafka.utils._
+import kafka.admin.AdminUtils
import kafka.utils.TestUtils._
+import scala.collection._
+import scala.collection.JavaConversions._
+import scala.util.matching.Regex
+import org.scalatest.junit.JUnit3Suite
class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val zookeeperConnect = TestZKUtils.zookeeperConnect
@@ -34,7 +39,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val numParts = 2
val topic = "topic1"
val configs =
- for (props <- TestUtils.createBrokerConfigs(numNodes))
+ for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic=true))
yield new KafkaConfig(props) {
override val zkConnect = zookeeperConnect
override val numPartitions = numParts
@@ -45,6 +50,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
super.tearDown()
}
+ @Test
def testMetricsLeak() {
// create topic topic1 with 1 partition on broker 0
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
@@ -59,6 +65,15 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
}
}
+ @Test
+ def testMetricsReporterAfterDeletingTopic() {
+ val topic = "test-topic-metric"
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
+ AdminUtils.deleteTopic(zkClient, topic)
+ TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
+ }
+
def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
// create a consumer
@@ -69,4 +84,14 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
zkConsumerConnector1.shutdown()
}
-}
\ No newline at end of file
+
+ private def checkTopicMetricsExists(topic: String): Boolean = {
+ val topicMetricRegex = new Regex(".*("+topic+")$")
+ val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
+ for(metricGroup <- metricGroups) {
+ if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
+ return true
+ }
+ false
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/687abc98/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 21d0ed2..32b2899 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -801,9 +801,9 @@ object TestUtils extends Logging {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
- "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
+ "Admin path /admin/delete_topic/%s path not deleted even after a replica is restarted".format(topic))
TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
- "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
+ "Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic, topic))
// ensure that the topic-partition has been deleted from all brokers' replica managers
TestUtils.waitUntilTrue(() =>
servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)),