You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/28 01:10:58 UTC

kafka git commit: KAFKA-1866 LogStartOffset gauge throws exceptions after log.delete(); reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk b56f5973c -> 687abc98a


KAFKA-1866 LogStartOffset gauge throws exceptions after log.delete(); reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/687abc98
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/687abc98
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/687abc98

Branch: refs/heads/trunk
Commit: 687abc98a4600bf90ed7a7acb6fb2a5e6eac2055
Parents: b56f597
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Fri Feb 27 16:10:37 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 27 16:10:47 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 11 ++++++-
 core/src/main/scala/kafka/log/Log.scala         | 12 ++++++-
 .../scala/unit/kafka/metrics/MetricsTest.scala  | 33 +++++++++++++++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala |  4 +--
 4 files changed, 52 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/687abc98/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index bfe4f45..c4bf48a 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -60,6 +60,7 @@ class Partition(val topic: String,
   this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
+  val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
 
   newGauge("UnderReplicated",
     new Gauge[Int] {
@@ -67,7 +68,7 @@ class Partition(val topic: String,
         if (isUnderReplicated) 1 else 0
       }
     },
-    Map("topic" -> topic, "partition" -> partitionId.toString)
+    tags
   )
 
   def isUnderReplicated(): Boolean = {
@@ -141,6 +142,7 @@ class Partition(val topic: String,
       leaderReplicaIdOpt = None
       try {
         logManager.deleteLog(TopicAndPartition(topic, partitionId))
+        removePartitionMetrics()
       } catch {
         case e: IOException =>
           fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e)
@@ -436,6 +438,13 @@ class Partition(val topic: String,
     }
   }
 
+  /**
+   * remove deleted log metrics
+   */
+  private def removePartitionMetrics() {
+    removeMetric("UnderReplicated", tags)
+  }
+
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false

http://git-wip-us.apache.org/repos/asf/kafka/blob/687abc98/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 846023b..0c4efa8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -624,6 +624,7 @@ class Log(val dir: File,
    */
   private[log] def delete() {
     lock synchronized {
+      removeLogMetrics()
       logSegments.foreach(_.delete())
       segments.clear()
       Utils.rm(dir)
@@ -769,7 +770,16 @@ class Log(val dir: File,
       newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
     }  
   }
-  
+
+  /**
+   * remove deleted log metrics
+   */
+  private[log] def removeLogMetrics(): Unit = {
+    removeMetric("NumLogSegments", tags)
+    removeMetric("LogStartOffset", tags)
+    removeMetric("LogEndOffset", tags)
+    removeMetric("Size", tags)
+  }
   /**
    * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
    * @param segment The segment to add

http://git-wip-us.apache.org/repos/asf/kafka/blob/687abc98/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 3cf23b3..111e4a2 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -18,15 +18,20 @@
 package kafka.consumer
 
 import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.MetricPredicate
+import org.junit.Test
 import junit.framework.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
-import scala.collection._
-import org.scalatest.junit.JUnit3Suite
 import kafka.message._
 import kafka.serializer._
 import kafka.utils._
+import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
+import scala.collection._
+import scala.collection.JavaConversions._
+import scala.util.matching.Regex
+import org.scalatest.junit.JUnit3Suite
 
 class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
   val zookeeperConnect = TestZKUtils.zookeeperConnect
@@ -34,7 +39,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
   val numParts = 2
   val topic = "topic1"
   val configs =
-    for (props <- TestUtils.createBrokerConfigs(numNodes))
+    for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic=true))
     yield new KafkaConfig(props) {
       override val zkConnect = zookeeperConnect
       override val numPartitions = numParts
@@ -45,6 +50,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
     super.tearDown()
   }
 
+  @Test
   def testMetricsLeak() {
     // create topic topic1 with 1 partition on broker 0
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
@@ -59,6 +65,15 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
     }
   }
 
+  @Test
+  def testMetricsReporterAfterDeletingTopic() {
+    val topic = "test-topic-metric"
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
+    AdminUtils.deleteTopic(zkClient, topic)
+    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
+  }
+
   def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
     val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
     // create a consumer
@@ -69,4 +84,14 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
     zkConsumerConnector1.shutdown()
   }
-}
\ No newline at end of file
+
+  private def checkTopicMetricsExists(topic: String): Boolean = {
+    val topicMetricRegex = new Regex(".*("+topic+")$")
+    val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
+    for(metricGroup <- metricGroups) {
+      if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
+        return true
+    }
+    false
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/687abc98/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 21d0ed2..32b2899 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -801,9 +801,9 @@ object TestUtils extends Logging {
     val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
     TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
-      "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
+      "Admin path /admin/delete_topic/%s path not deleted even after a replica is restarted".format(topic))
     TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
-      "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
+      "Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic, topic))
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     TestUtils.waitUntilTrue(() =>
       servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)),