Posted to commits@kafka.apache.org by ne...@apache.org on 2012/08/29 20:07:59 UTC

svn commit: r1378666 - in /incubator/kafka/branches/0.8: config/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/message/ core/src/main/scala/kafka/metrics/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/server/ core/src/ma...

Author: nehanarkhede
Date: Wed Aug 29 18:07:58 2012
New Revision: 1378666

URL: http://svn.apache.org/viewvc?rev=1378666&view=rev
Log:
KAFKA-489 Add metrics collection and graphs to the system test framework; patched by Neha Narkhede; reviewed by Jun Rao and John Fung

Added:
    incubator/kafka/branches/0.8/system_test/metrics.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/config/
    incubator/kafka/branches/0.8/system_test/utils/metrics.py
    incubator/kafka/branches/0.8/system_test/utils/pyh.py
Removed:
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/
Modified:
    incubator/kafka/branches/0.8/config/log4j.properties
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
    incubator/kafka/branches/0.8/system_test/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
    incubator/kafka/branches/0.8/system_test/system_test_env.py
    incubator/kafka/branches/0.8/system_test/system_test_runner.py
    incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/setup_utils.py
    incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
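
For orientation, here is an illustrative end-to-end summary (not part of the commit; host names, ports and paths would come from cluster_config.json) of how a test case now drives metrics collection, using only the entry points this patch introduces:

    import metrics

    def run_metrics_lifecycle(systemTestEnv, testcaseEnv):
        # 1. as each entity (zookeeper, broker, producer, consumer) is started,
        #    begin polling its JMX beans into per-entity CSV files
        for entity in systemTestEnv.clusterEntityConfigDictList:
            metrics.start_metrics_collection(entity['hostname'], entity['jmx_port'],
                                             entity['role'], entity['entity_id'],
                                             systemTestEnv, testcaseEnv)

        # ... run the actual test case workload here ...

        # 2. after the run, plot one SVG per mbean attribute listed in metrics.json
        metrics.draw_all_graphs(systemTestEnv.METRICS_PATHNAME,
                                testcaseEnv,
                                systemTestEnv.clusterEntityConfigDictList)

        # 3. assemble one dashboard page per role from those graphs
        metrics.build_all_dashboards(systemTestEnv.METRICS_PATHNAME,
                                     testcaseEnv.testCaseDashboardsDir,
                                     systemTestEnv.clusterEntityConfigDictList)

        # 4. stop the remote JmxTool collector processes
        for entity in systemTestEnv.clusterEntityConfigDictList:
            metrics.stop_metrics_collection(entity['hostname'], entity['jmx_port'])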

Modified: incubator/kafka/branches/0.8/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/log4j.properties?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/log4j.properties (original)
+++ incubator/kafka/branches/0.8/config/log4j.properties Wed Aug 29 18:07:58 2012
@@ -25,7 +25,8 @@ log4j.appender.stdout.layout.ConversionP
 
 
 # Turn on all our debugging info
-#log4j.logger.kafka=INFO
+log4j.logger.kafka.perf=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
 
 log4j.logger.kafka.perf=DEBUG

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Wed Aug 29 18:07:58 2012
@@ -134,8 +134,8 @@ trait SimpleConsumerStatsMBean {
 }
 
 @threadsafe
-class SimpleConsumerStats extends SimpleConsumerStatsMBean {
-  private val fetchRequestStats = new SnapshotStats
+class SimpleConsumerStats(monitoringDurationNs: Long) extends SimpleConsumerStatsMBean {
+  private val fetchRequestStats = new SnapshotStats(monitoringDurationNs)
 
   def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs)
 
@@ -154,7 +154,7 @@ class SimpleConsumerStats extends Simple
 
 object SimpleConsumerStats extends Logging {
   private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats"
-  private val stats = new SimpleConsumerStats
+  private val stats = new SimpleConsumerStats(1 * 1000L * 1000L * 1000L)
   Utils.registerMBean(stats, simpleConsumerstatsMBeanName)
 
   def recordFetchRequest(requestMs: Long) = stats.recordFetchRequest(requestMs)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Wed Aug 29 18:07:58 2012
@@ -252,8 +252,8 @@ trait LogFlushStatsMBean {
 }
 
 @threadsafe
-class LogFlushStats extends LogFlushStatsMBean {
-  private val flushRequestStats = new SnapshotStats
+class LogFlushStats(monitorDurationNs: Long) extends LogFlushStatsMBean {
+  private val flushRequestStats = new SnapshotStats(monitorDurationNs)
 
   def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
 
@@ -270,7 +270,7 @@ class LogFlushStats extends LogFlushStat
 
 object LogFlushStats extends Logging {
   private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
-  private val stats = new LogFlushStats
+  private val stats = new LogFlushStats(1L * 1000 * 1000 * 1000)
   Utils.registerMBean(stats, LogFlushStatsMBeanName)
 
   def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala Wed Aug 29 18:07:58 2012
@@ -33,7 +33,7 @@ trait KafkaMetricsGroup extends Logging 
    *
    * @return The sub-group identifier.
    */
-  def metricsGroupIdent: String
+  def metricsGroupIdent: String = ""
 
   /**
    * Creates a new MetricName object for gauges, meters, etc. created for this

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Aug 29 18:07:58 2012
@@ -192,8 +192,8 @@ trait SyncProducerStatsMBean {
 }
 
 @threadsafe
-class SyncProducerStats extends SyncProducerStatsMBean {
-  private val produceRequestStats = new SnapshotStats
+class SyncProducerStats(monitoringDurationNs: Long) extends SyncProducerStatsMBean {
+  private val produceRequestStats = new SnapshotStats(monitoringDurationNs)
 
   def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs)
 
@@ -208,8 +208,10 @@ class SyncProducerStats extends SyncProd
 
 object SyncProducerStats extends Logging {
   private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
-  private val stats = new SyncProducerStats
+  private val stats = new SyncProducerStats(1L * 1000 * 1000 * 1000)
   swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
 
-  def recordProduceRequest(requestMs: Long) = stats.recordProduceRequest(requestMs)
+  def recordProduceRequest(requestMs: Long) = {
+    stats.recordProduceRequest(requestMs)
+  }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Aug 29 18:07:58 2012
@@ -459,9 +459,8 @@ class KafkaApis(val requestChannel: Requ
    */
   class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
 
-    this.logIdent = "[etchRequestPurgatory-%d], ".format(brokerId)
+    this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
 
-    override def metricsGroupIdent = metricsGroup
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@@ -598,8 +597,6 @@ class KafkaApis(val requestChannel: Requ
 
     this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId)
 
-    override def metricsGroupIdent = metricsGroup
-
     protected def checkSatisfied(followerFetchRequestKey: RequestKey,
                                  delayedProduce: DelayedProduce) =
       delayedProduce.isSatisfied(followerFetchRequestKey)
@@ -617,7 +614,6 @@ class KafkaApis(val requestChannel: Requ
 
   private class DelayedRequestMetrics {
     private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
-      override def metricsGroupIdent = metricsGroup
       val caughtUpFollowerFetchRequestMeter =
         newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
       val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel)
@@ -645,7 +641,6 @@ class KafkaApis(val requestChannel: Requ
                                              keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
       private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
 
-      override def metricsGroupIdent = metricsGroup
       val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
         Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond",
           "requests", TimeUnit.SECONDS))
@@ -705,7 +700,6 @@ class KafkaApis(val requestChannel: Requ
       aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
     }
 
-
     private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
       val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
         else aggregateNonFollowerFetchRequestMetrics

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Wed Aug 29 18:07:58 2012
@@ -72,8 +72,6 @@ abstract class RequestPurgatory[T <: Del
   private val satisfactionRateBeanName = "SatisfactionRate"
   private val expirationRateBeanName = "ExpirationRate"
 
-  override def metricsGroupIdent = ""
-
   val satisfactionRateMeter = newMeter(
       satisfactionRateBeanName,
       "requests",

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala Wed Aug 29 18:07:58 2012
@@ -26,37 +26,43 @@ import joptsimple.OptionParser
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.math._
+import kafka.utils.Logging
 
-
-object JmxTool {
+object JmxTool extends Logging {
 
   def main(args: Array[String]) {
     // Parse command line
     val parser = new OptionParser
-    val objectNameOpt = 
+    val objectNameOpt =
       parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
-                                    "can be given multiple times to specify more than one query. If no objects are specified " +   
-                                    "all objects will be queried.")
-      .withRequiredArg
-      .describedAs("name")
-      .ofType(classOf[String])
+        "can be given multiple times to specify more than one query. If no objects are specified " +
+        "all objects will be queried.")
+        .withRequiredArg
+        .describedAs("name")
+        .ofType(classOf[String])
+    val attributesOpt =
+      parser.accepts("attributes", "The whitelist of attributes to query. This is a comma-separated list. If no " +
+        "attributes are specified all objects will be queried.")
+        .withOptionalArg()
+        .describedAs("name")
+        .ofType(classOf[String])
     val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats.")
       .withRequiredArg
       .describedAs("ms")
       .ofType(classOf[java.lang.Integer])
-      .defaultsTo(5000)
+      .defaultsTo(2000)
     val helpOpt = parser.accepts("help", "Print usage information.")
-    val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + 
-                                                      "See java.text.SimpleDateFormat for options.")
+    val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
+      "See java.text.SimpleDateFormat for options.")
       .withOptionalArg()
       .describedAs("format")
       .ofType(classOf[String])
     val jmxServiceUrlOpt =
       parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
-      .withRequiredArg
-      .describedAs("service-url")
-      .ofType(classOf[String])
-      .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
+        .withRequiredArg
+        .describedAs("service-url")
+        .ofType(classOf[String])
+        .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
 
     val options = parser.parse(args : _*)
 
@@ -67,44 +73,62 @@ object JmxTool {
 
     val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
     val interval = options.valueOf(reportingIntervalOpt).intValue
+    val attributesWhitelistExists = options.has(attributesOpt)
+    val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None
     val dateFormatExists = options.has(dateFormatOpt)
     val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
     val jmxc = JMXConnectorFactory.connect(url, null)
     val mbsc = jmxc.getMBeanServerConnection()
 
-    val queries: Iterable[ObjectName] = 
+    val queries: Iterable[ObjectName] =
       if(options.has(objectNameOpt))
         options.valuesOf(objectNameOpt).map(new ObjectName(_))
       else
         List(null)
+
     val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
-    val attributes: Iterable[(ObjectName, Array[String])] = 
+    val allAttributes: Iterable[(ObjectName, Array[String])] =
       names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
 
+
+    val numExpectedAttributes: Map[ObjectName, Int] =
+      attributesWhitelistExists match {
+        case true => queries.map((_, attributesWhitelist.get.size)).toMap
+        case false => names.map((name: ObjectName) =>
+          (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName).size)).toMap
+      }
+
     // print csv header
-    val keys = List("time") ++ queryAttributes(mbsc, names).keys.toArray.sorted
-    println(keys.map("\"" + _ + "\"").mkString(", "))
+    val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
+    if(keys.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+      println(keys.map("\"" + _ + "\"").mkString(","))
 
     while(true) {
       val start = System.currentTimeMillis
-      val attributes = queryAttributes(mbsc, names)
+      val attributes = queryAttributes(mbsc, names, attributesWhitelist)
       attributes("time") = dateFormat match {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      println(keys.map(attributes(_)).mkString(", "))
+      if(attributes.keySet.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+        println(keys.map(attributes(_)).mkString(","))
       val sleep = max(0, interval - (System.currentTimeMillis - start))
       Thread.sleep(sleep)
     }
   }
 
-  def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName]) = {
+  def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
     var attributes = new mutable.HashMap[String, Any]()
-	for(name <- names) {
-	  val mbean = mbsc.getMBeanInfo(name)
+    for(name <- names) {
+      val mbean = mbsc.getMBeanInfo(name)
       for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))) {
         val attr = attrObj.asInstanceOf[Attribute]
-        attributes(name + ":" + attr.getName) = attr.getValue
+        attributesWhitelist match {
+          case Some(allowedAttributes) =>
+            if(allowedAttributes.contains(attr.getName))
+              attributes(name + ":" + attr.getName) = attr.getValue
+          case None => attributes(name + ":" + attr.getName) = attr.getValue
+        }
       }
     }
     attributes
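
For reference, a rough sketch (not part of the commit; the kafka home, host and port are placeholders) of launching the updated JmxTool with the new attribute whitelist, using the options shown in the diff above and the "<bean_name>.csv" naming that metrics.py expects:

    import subprocess

    jmx_url = "service:jmx:rmi:///jndi/rmi://localhost:9991/jmxrmi"   # placeholder host/port
    bean    = "kafka:type=kafka.SocketServerStats"
    cmd = " ".join([
        "/path/to/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool",  # placeholder kafka_home
        "--jmx-url " + jmx_url,
        "--object-name '" + bean + "'",
        "--attributes BytesReadPerSecond,BytesWrittenPerSecond",      # whitelist added by this patch
        "--reporting-interval 1000",
        "> metrics/" + bean + ".csv",
    ])
    # run asynchronously so the test can continue while samples stream into the CSV
    subprocess.Popen(cmd, shell=True)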

Modified: incubator/kafka/branches/0.8/system_test/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/cluster_config.json?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/cluster_config.json (original)
+++ incubator/kafka/branches/0.8/system_test/cluster_config.json Wed Aug 29 18:07:58 2012
@@ -4,7 +4,7 @@
             "entity_id": "0",
             "hostname": "localhost",
             "role": "zookeeper",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9990"
         },
@@ -12,7 +12,7 @@
             "entity_id": "1",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9991"
         },
@@ -20,7 +20,7 @@
             "entity_id": "2",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9992"
         },
@@ -28,7 +28,7 @@
             "entity_id": "3",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9993"
         },
@@ -36,7 +36,7 @@
             "entity_id": "4",
             "hostname": "localhost",
             "role": "producer_performance",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9994"
         },
@@ -44,7 +44,7 @@
             "entity_id": "5",
             "hostname": "localhost",
             "role": "console_consumer",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
             "java_home": "/export/apps/jdk/JDK-1_6_0_27",
             "jmx_port": "9995"
         }

Added: incubator/kafka/branches/0.8/system_test/metrics.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/metrics.json?rev=1378666&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/metrics.json (added)
+++ incubator/kafka/branches/0.8/system_test/metrics.json Wed Aug 29 18:07:58 2012
@@ -0,0 +1,174 @@
+{
+    "dashboards": [
+        {
+            "role": "broker",
+            "graphs": [
+               { 
+                  "graph_name": "SocketServerThroughput",
+                  "y_label": "bytes-read-per-second,bytes-written-per-second",
+                  "bean_name": "kafka:type=kafka.SocketServerStats",
+                  "attributes": "BytesReadPerSecond,BytesWrittenPerSecond"
+               },
+               { 
+                  "graph_name": "FetchRequestPurgatoryNumDelayedRequests",
+                  "y_label": "num-delayed-requests",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
+                  "attributes": "Value"
+               },
+               { 
+                  "graph_name": "MeanFetchRequestPurgatorySatisfactionRate",
+                  "y_label": "mean-request-satisfaction-rate",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=SatisfactionRate",
+                  "attributes": "MeanRate"
+               },
+               { 
+                  "graph_name": "FetchRequestPurgatoryTimeToSatisfy",
+                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=TimeToSatisfyInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               { 
+                  "graph_name": "FetchRequestPurgatoryExpirationRate",
+                  "y_label": "expiration-rate",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=ExpirationRate",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "ProducerRequestPurgatoryNumDelayedRequests",
+                  "y_label": "num-delayed-requests",
+                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
+                  "attributes": "Value"
+               },
+               {
+                  "graph_name": "MeanProducerRequestPurgatorySatisfactionRate",
+                  "y_label": "mean-request-satisfaction-rate",
+                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=SatisfactionRate",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "ProducerRequestPurgatoryExpirationRate",
+                  "y_label": "expiration-rate",
+                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=ExpirationRate",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-CaughtUpFollowerFetchRequestsPerSecond",
+                  "y_label": "mean-caught-up-follower-fetch-requests-per-second",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=CaughtUpFollowerRequestsPerSecond-all",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-ExpiredRequestRate",
+                  "y_label": "mean-expired-request-rate",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=ExpiredRequestsPerSecond-all",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-FollowerCatchUpLatency",
+                  "y_label": "mean-follower-catchup-time-ns,95th-percentile-follower-catchup-time-ns,99th-percentile-follower-catchup-time-ns,999th-percentile-follower-catchup-time-ns",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=FollowerCatchUpTimeInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-SatisfactionTimeInNs",
+                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfactionTimeInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-SatisfiedRequestsPerSecond",
+                  "y_label": "mean-satisfaction-requests-per-second",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfiedRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-Throughput-all",
+                  "y_label": "mean-purgatory-throughput-all",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=Throughput-all",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-Follower-ExpiredRequestRate",
+                  "y_label": "mean-expired-request-rate",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-ExpiredRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-Follower-SatisfactionTimeInNs",
+                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfactionTimeInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               {
+                  "graph_name": "DelayedProducerRequests-Follower-SatisfiedRequestsPerSecond",
+                  "y_label": "mean-satisfaction-requests-per-second",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfiedRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-Follower-Throughput-all",
+                  "y_label": "mean-purgatory-throughput-all",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-Throughput-all",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-NonFollower-ExpiredRequestRate",
+                  "y_label": "mean-expired-request-rate",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-ExpiredRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-NonFollower-SatisfactionTimeInNs",
+                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfactionTimeInNs",
+                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-NonFollower-SatisfiedRequestsPerSecond",
+                  "y_label": "mean-satisfaction-requests-per-second",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfiedRequestsPerSecond",
+                  "attributes": "MeanRate"
+               },
+               {
+                  "graph_name": "DelayedFetchRequests-NonFollower-Throughput-all",
+                  "y_label": "mean-purgatory-throughput-all",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-Throughput-all",
+                  "attributes": "MeanRate"
+               }
+             ]
+       },
+        {
+            "role": "producer_performance",
+            "graphs": [
+               {
+                  "graph_name": "ProducerStats",
+                  "y_label": "avg-producer-latency-ms,max-producer-latency-ms,produce-request-throughput",
+                  "bean_name": "kafka:type=kafka.KafkaProducerStats",
+                  "attributes": "AvgProduceRequestMs,MaxProduceRequestMs,ProduceRequestsPerSecond"
+               }
+             ]
+       },
+       {
+            "role": "console_consumer",
+            "graphs": [
+               {
+                  "graph_name": "SimpleConsumerRequestStats",
+                  "y_label": "simple-consumer-throughput,simple-consumer-throughput-bytes,simple-consumer-latency-ms",
+                  "bean_name": "kafka:type=kafka.SimpleConsumerStats",
+                  "attributes": "FetchRequestsPerSecond,ConsumerThroughput,AvgFetchRequestMs"
+               }
+             ]
+       },
+        {
+            "role": "zookeeper",
+            "graphs": [
+               {
+                  "graph_name": "ZookeeperServerStats",
+                  "y_label": "zookeeper-latency-ms",
+                  "bean_name": "org.apache.ZooKeeperService:name0=StandaloneServer_port-1",
+                  "attributes": "AvgRequestLatency"
+               }
+             ]
+       }
+    ]
+}
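
Each "graphs" entry above yields one plot per listed attribute. The illustrative snippet below (not part of the commit) shows how a single graph definition expands into y-axis labels and the "<bean_name>:<attribute>" CSV columns that metrics.py (added further down) looks up when plotting:

    # illustrative only: expand one metrics.json graph entry into plot inputs
    graph = {
        "graph_name": "SocketServerThroughput",
        "y_label": "bytes-read-per-second,bytes-written-per-second",
        "bean_name": "kafka:type=kafka.SocketServerStats",
        "attributes": "BytesReadPerSecond,BytesWrittenPerSecond",
    }

    labels  = graph["y_label"].split(",")
    columns = [graph["bean_name"] + ":" + attr for attr in graph["attributes"].split(",")]

    for label, column in zip(labels, columns):
        # one SVG per attribute: its y-axis label and the CSV column holding the samples
        print(label + "  <-  " + column)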

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties Wed Aug 29 18:07:58 2012
@@ -118,3 +118,5 @@ zk.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000
+
+monitoring.period.secs=1

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -11,6 +27,7 @@ import signal
 import subprocess
 import sys
 import time
+import traceback
 
 from   system_test_env    import SystemTestEnv
 sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
@@ -20,6 +37,7 @@ from   testcase_env       import Testcas
 
 # product specific: Kafka
 import kafka_system_test_utils
+import metrics
 
 class ReplicaBasicTest(SetupUtils):
 
@@ -85,7 +103,8 @@ class ReplicaBasicTest(SetupUtils):
                 #### => update testcaseEnv
                 self.testcaseEnv.testCaseBaseDir = testCasePathName
                 self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
-    
+                self.testcaseEnv.testCaseDashboardsDir = self.testcaseEnv.testCaseBaseDir + "/dashboards"
+
                 # get testcase description
                 testcaseDescription = ""
                 for k,v in testcaseNonEntityDataDict.items():
@@ -214,8 +233,9 @@ class ReplicaBasicTest(SetupUtils):
                 # starting producer 
                 self.log_message("starting producer")
                 kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
-                self.anonLogger.info("sleeping for 5s")
-                time.sleep(5)
+                self.anonLogger.info("sleeping for 10s")
+                time.sleep(10)
+                kafka_system_test_utils.stop_producer()
     
                 # starting previously terminated broker 
                 if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
@@ -231,26 +251,45 @@ class ReplicaBasicTest(SetupUtils):
                 # starting consumer
                 self.log_message("starting consumer")
                 kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
-    
+                time.sleep(10)
+                kafka_system_test_utils.stop_consumer()
+                
                 # this testcase is completed - so stopping all entities
                 self.log_message("stopping all entities")
                 for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
                     kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-    
+                    
                 # validate the data matched
                 self.log_message("validating data matched")
                 result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
-    
+                
                 # =============================================
                 # collect logs from remote hosts
                 # =============================================
                 kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
     
+                # ==========================
+                # draw graphs
+                # ==========================
+                metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME, 
+                                        self.testcaseEnv, 
+                                        self.systemTestEnv.clusterEntityConfigDictList)
+                
+                # build dashboard, one for each role
+                metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME,
+                                             self.testcaseEnv.testCaseDashboardsDir,
+                                             self.systemTestEnv.clusterEntityConfigDictList)
+                
+                # stop metrics processes
+                for entity in self.systemTestEnv.clusterEntityConfigDictList:
+                    metrics.stop_metrics_collection(entity['hostname'], entity['jmx_port'])
+                                        
             except Exception as e:
-                self.log_message("Exception caught : ")
-                print e
+                self.log_message("Exception while running test {0}".format(e))
+                traceback.print_exc()
                 self.log_message("stopping all entities")
                 for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
                     kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
 
+
  

Modified: incubator/kafka/branches/0.8/system_test/system_test_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/system_test_env.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/system_test_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/system_test_env.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -23,6 +39,8 @@ class SystemTestEnv():
     SYSTEM_TEST_MODULE_EXT    = ".py"
     CLUSTER_CONFIG_FILENAME   = "cluster_config.json"
     CLUSTER_CONFIG_PATHNAME   = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/" + CLUSTER_CONFIG_FILENAME)
+    METRICS_FILENAME          = "metrics.json"
+    METRICS_PATHNAME          = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/" + METRICS_FILENAME)
 
     clusterEntityConfigDictList  = []
     systemTestResultsList        = []

Modified: incubator/kafka/branches/0.8/system_test/system_test_runner.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/system_test_runner.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/system_test_runner.py (original)
+++ incubator/kafka/branches/0.8/system_test/system_test_runner.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/evn python
 
 # ===================================

Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -16,6 +32,7 @@ import time
 import traceback
 
 import system_test_utils
+import metrics
 
 from datetime  import datetime
 from time      import mktime
@@ -79,19 +96,22 @@ def generate_testcase_log_dirs(systemTes
 
     if not os.path.exists(testcasePathName + "/config") : os.makedirs(testcasePathName + "/config")
     if not os.path.exists(testcasePathName + "/logs")   : os.makedirs(testcasePathName + "/logs")
+    if not os.path.exists(testcasePathName + "/dashboards")   : os.makedirs(testcasePathName + "/dashboards")
 
-    dashboardsPathName = testcaseEnv.testCaseLogsDir + "/dashboards"
+    dashboardsPathName = testcasePathName + "/dashboards"
     if not os.path.exists(dashboardsPathName) : os.makedirs(dashboardsPathName)
 
     for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
         entityId = clusterEntityConfigDict["entity_id"]
         role     = clusterEntityConfigDict["role"]
 
-        logger.debug("entity_id : " + entityId, extra=d)
-        logger.debug("role      : " + role,     extra=d)
-
         metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
         if not os.path.exists(metricsPathName) : os.makedirs(metricsPathName)
+        
+        # create the role directory under dashboards
+        dashboardsRoleDir = dashboardsPathName + "/" + role
+        if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir)
+        
 
 
 def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
@@ -192,40 +212,6 @@ def copy_file_with_dict_values(srcFile, 
         outfile.write(line)
     outfile.close()
 
-
-def start_metrics_collection(jmxHost, jmxPort, mBeanObjectName, mBeanAttributes, entityId, clusterEntityConfigDictList, testcaseEnv):
-    logger.info("starting metrics collection on jmx port: " + jmxPort, extra=d)
-    jmxUrl    = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
-    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
-    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
-    metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", entityId, "metrics")
-
-    startMetricsCmdList = ["ssh " + jmxHost,
-                           "'JAVA_HOME=" + javaHome,
-                           "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
-                           "--jmx-url " + jmxUrl,
-                           "--object-name " + mBeanObjectName + " &> ",
-                           metricsPathName + "/metrics.csv & echo pid:$! > ",
-                           metricsPathName + "/entity_pid'"]
-   
-    startMetricsCommand = " ".join(startMetricsCmdList) 
-    logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
-    system_test_utils.async_sys_call(startMetricsCommand)
-
-    pidCmdStr = "ssh " + jmxHost + " 'cat " + metricsPathName + "/entity_pid'"
-    logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
-    subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
-
-    # keep track of the remote entity pid in a dictionary
-    for line in subproc.stdout.readlines():
-        if line.startswith("pid"):
-            line = line.rstrip('\n')
-            logger.debug("found pid line: [" + line + "]", extra=d)
-            tokens  = line.split(':')
-            thisPid = tokens[1]
-            testcaseEnv.entityParentPidDict[thisPid] = thisPid
-
-
 def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
     logger.info("calling generate_properties_files", extra=d)
 
@@ -402,6 +388,7 @@ def start_entity_in_background(systemTes
     if role == "zookeeper":
         cmdList = ["ssh " + hostname,
                   "'JAVA_HOME=" + javaHome,
+                  "JMX_PORT=" + jmxPort,
                   kafkaHome + "/bin/zookeeper-server-start.sh ",
                   configPathName + "/" + configFile + " &> ",
                   logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -418,7 +405,7 @@ def start_entity_in_background(systemTes
     elif role == "broker":
         cmdList = ["ssh " + hostname,
                   "'JAVA_HOME=" + javaHome,
-                  "JMX_PORT=" + jmxPort,
+                 "JMX_PORT=" + jmxPort,
                   kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
                   configPathName + "/" + configFile + " &> ",
                   logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -446,10 +433,8 @@ def start_entity_in_background(systemTes
             tokens = line.split(':')
             testcaseEnv.entityParentPidDict[entityId] = tokens[1]
 
-    # if it is a broker, start metric collection
-    if role == "broker":
-        start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \
-            "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv)
+    time.sleep(1)
+    metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 
 def start_console_consumer(systemTestEnv, testcaseEnv):
@@ -460,6 +445,8 @@ def start_console_consumer(systemTestEnv
     for consumerConfig in consumerConfigList:
         host              = consumerConfig["hostname"]
         entityId          = consumerConfig["entity_id"]
+        jmxPort           = consumerConfig["jmx_port"] 
+        role              = consumerConfig["role"] 
         kafkaHome         = system_test_utils.get_data_by_lookup_keyval( \
                                 clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
         javaHome          = system_test_utils.get_data_by_lookup_keyval( \
@@ -476,13 +463,15 @@ def start_console_consumer(systemTestEnv
         commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"])
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
+                   "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
                    commandArgs + " &> " + consumerLogPathName + "'"]
 
         cmdStr = " ".join(cmdList)
         logger.debug("executing command: [" + cmdStr + "]", extra=d)
-        system_test_utils.sys_call(cmdStr)
-
+        system_test_utils.async_sys_call(cmdStr)
+        time.sleep(2)
+        metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 
 def start_producer_performance(systemTestEnv, testcaseEnv):
@@ -494,6 +483,8 @@ def start_producer_performance(systemTes
     for producerConfig in producerConfigList:
         host              = producerConfig["hostname"]
         entityId          = producerConfig["entity_id"]
+        jmxPort           = producerConfig["jmx_port"] 
+        role              = producerConfig["role"] 
         kafkaHome         = system_test_utils.get_data_by_lookup_keyval( \
                                 clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
         javaHome          = system_test_utils.get_data_by_lookup_keyval( \
@@ -510,12 +501,15 @@ def start_producer_performance(systemTes
         commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"])
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
+                   "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.perf.ProducerPerformance",
                    commandArgs + " &> " + producerLogPathName + "'"]
 
         cmdStr = " ".join(cmdList)
         logger.debug("executing command: [" + cmdStr + "]", extra=d)
-        system_test_utils.sys_call(cmdStr) 
+        system_test_utils.async_sys_call(cmdStr)
+        time.sleep(1)
+        metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 
 def stop_remote_entity(systemTestEnv, entityId, parentPid):
@@ -525,8 +519,8 @@ def stop_remote_entity(systemTestEnv, en
     pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
 
     logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
-    system_test_utils.sigterm_remote_process(hostname, pidStack)
-    time.sleep(1)
+#    system_test_utils.sigterm_remote_process(hostname, pidStack)
+#    time.sleep(1)
     system_test_utils.sigkill_remote_process(hostname, pidStack)
 
 
@@ -632,10 +626,9 @@ def validate_leader_election_successful(
             validationStatusDict["Validate leader election successful"] = "PASSED"
             return True
         except Exception, e:
-            logger.error("leader info not completed:", extra=d)
+            logger.error("leader info not completed: {0}".format(e), extra=d)
+            traceback.print_exc()
             validationStatusDict["Validate leader election successful"] = "FAILED"
-            print leaderDict
-            print e
             return False
     else:
         validationStatusDict["Validate leader election successful"] = "FAILED"
@@ -677,5 +670,14 @@ def cleanup_data_at_remote_hosts(systemT
         logger.debug("executing command [" + cmdStr + "]", extra=d)
         system_test_utils.sys_call(cmdStr)
 
+def get_entity_log_directory(testCaseBaseDir, entity_id, role):
+    return testCaseBaseDir + "/logs/" + role + "-" + entity_id
+
+def get_entities_for_role(clusterConfig, role):
+    return filter(lambda entity: entity['role'] == role, clusterConfig)
     
+def stop_consumer():
+    system_test_utils.sys_call("ps -ef | grep ConsoleConsumer | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")
 
+def stop_producer():
+    system_test_utils.sys_call("ps -ef | grep ProducerPerformance | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")

Added: incubator/kafka/branches/0.8/system_test/utils/metrics.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/metrics.py?rev=1378666&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/metrics.py (added)
+++ incubator/kafka/branches/0.8/system_test/utils/metrics.py Wed Aug 29 18:07:58 2012
@@ -0,0 +1,254 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# ===================================
+# file: metrics.py
+# ===================================
+
+import inspect
+import json
+import logging
+import os
+import signal
+import subprocess
+import sys
+import traceback
+
+import csv
+import time 
+import matplotlib.pyplot as plt
+from collections import namedtuple
+import numpy
+
+from pyh import *
+import kafka_system_test_utils
+import system_test_utils
+
+logger     = logging.getLogger("namedLogger")
+thisClassName = '(metrics)'
+d = {'name_of_class': thisClassName}
+
+def read_metrics_definition(metricsFile):
+    metricsFileData = open(metricsFile, "r").read()
+    metricsJsonData = json.loads(metricsFileData)
+    allDashboards = metricsJsonData['dashboards']
+    allGraphs = []
+    for dashboard in allDashboards:
+        dashboardName = dashboard['name']
+        graphs = dashboard['graphs']
+        for graph in graphs:
+            bean = graph['bean_name']
+            allGraphs.append(graph)
+            attributes = graph['attributes']
+            #print "Filtering on attributes " + attributes     
+    return allGraphs
+            
+def get_dashboard_definition(metricsFile, role):
+    metricsFileData = open(metricsFile, "r").read()
+    metricsJsonData = json.loads(metricsFileData)
+    allDashboards = metricsJsonData['dashboards']
+    dashboardsForRole = []
+    for dashboard in allDashboards:
+        if dashboard['role'] == role:
+            dashboardsForRole.append(dashboard) 
+    return dashboardsForRole
+
+def ensure_valid_headers(headers, attributes):
+    if headers[0] != "time":
+        raise Exception("First column should be time")
+    for header in headers:
+        logger.debug(header, extra=d)
+    # there should be exactly one column with a name that matches attributes
+    try:
+        attributeColumnIndex = headers.index(attributes)
+        return attributeColumnIndex
+    except ValueError as ve:
+        raise Exception("There should be exactly one column that matches attribute: {0} in".format(attributes) +  
+                        " headers: {0}".format(",".join(headers)))
+        
+def plot_graphs(inputCsvFiles, labels, title, xLabel, yLabel, attribute, outputGraphFile):
+    # create empty plot
+    fig=plt.figure()
+    fig.subplots_adjust(bottom=0.2)
+    ax=fig.add_subplot(111)
+    labelx = -0.3  # axes coords
+    ax.set_xlabel(xLabel)
+    ax.set_ylabel(yLabel)
+    ax.grid()
+    #ax.yaxis.set_label_coords(labelx, 0.5)
+    Coordinates = namedtuple("Coordinates", 'x y')
+    plots = []
+    coordinates = []
+    # read data for all files, organize by label in a dict
+    for fileAndLabel in zip(inputCsvFiles, labels):
+        inputCsvFile = fileAndLabel[0]
+        label = fileAndLabel[1]
+        csv_reader = list(csv.reader(open(inputCsvFile, "rb")))
+        x,y = [],[]
+        xticks_labels = []
+        try:
+            # read first line as the headers
+            headers = csv_reader.pop(0)
+            attributeColumnIndex = ensure_valid_headers(headers, attribute)
+            logger.debug("Column index for attribute {0} is {1}".format(attribute, attributeColumnIndex), extra=d)
+            start_time = int(csv_reader[0][0])
+            for line in csv_reader:
+                yVal = float(line[attributeColumnIndex])                
+                xVal = (int(line[0])-start_time)/1000
+                y.append(yVal)
+                epoch=int(line[0])/1000
+                x.append(xVal)
+                xticks_labels.append(time.strftime("%H:%M:%S", time.localtime(epoch)))
+                coordinates.append(Coordinates(xVal, yVal))
+            p1 = ax.plot(x,y)
+            plots.append(p1)
+        except Exception as e:
+            logger.error("ERROR while plotting data for {0}: {1}".format(inputCsvFile, e), extra=d)
+    # find xmin, xmax, ymin, ymax from all csv files
+    xmin = min(map(lambda coord: coord.x, coordinates))
+    xmax = max(map(lambda coord: coord.x, coordinates))
+    ymin = min(map(lambda coord: coord.y, coordinates))
+    ymax = max(map(lambda coord: coord.y, coordinates))
+    # set x and y axes limits
+    plt.xlim(xmin, xmax)
+    plt.ylim(ymin, ymax)
+    # set ticks accordingly
+    xticks = numpy.arange(xmin, xmax, 0.2*xmax)
+#    yticks = numpy.arange(ymin, ymax)
+    plt.xticks(xticks,xticks_labels,rotation=17)
+#    plt.yticks(yticks)
+    plt.legend(plots,labels, loc=2)
+    plt.title(title)
+    plt.savefig(outputGraphFile)
+
+def draw_all_graphs(metricsDescriptionFile, testcaseEnv, clusterConfig):
+    # go through each role and plot graphs for the role's metrics
+    roles = set(map(lambda config: config['role'], clusterConfig))
+    for role in roles:
+        dashboards = get_dashboard_definition(metricsDescriptionFile, role)
+        entities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
+        for dashboard in dashboards:
+            graphs = dashboard['graphs']
+            # draw each graph for all entities
+            draw_graph_for_role(graphs, entities, role, testcaseEnv)
+        
+def draw_graph_for_role(graphs, entities, role, testcaseEnv):
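+    # for each graph definition, collect one csv file per entity of this role and plot
+    # all entities as separate series on the same figure, one figure per mbean attribute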
+    for graph in graphs:
+        graphName = graph['graph_name'] 
+        yLabel = graph['y_label']
+        inputCsvFiles = []
+        graphLegendLabels = []
+        for entity in entities:
+            entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entity['entity_id'], "metrics")
+            entityMetricCsvFile = entityMetricsDir + "/" + graph['bean_name'] + ".csv"
+            inputCsvFiles.append(entityMetricCsvFile)
+            graphLegendLabels.append(role + "-" + entity['entity_id'])
+#            print "Plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
+        try:
+            # plot one graph per mbean attribute
+            labels = graph['y_label'].split(',')
+            fullyQualifiedAttributeNames = map(lambda attribute: graph['bean_name'] + ':' + attribute, 
+                                           graph['attributes'].split(','))
+            attributes = graph['attributes'].split(',')
+            for labelAndAttribute in zip(labels, fullyQualifiedAttributeNames, attributes):            
+                outputGraphFile = testcaseEnv.testCaseDashboardsDir + "/" + role + "/" + labelAndAttribute[1] + ".svg"            
+                plot_graphs(inputCsvFiles, graphLegendLabels, graph['graph_name'] + '-' + labelAndAttribute[2], 
+                            "time", labelAndAttribute[0], labelAndAttribute[1], outputGraphFile)
+#            print "Finished plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
+        except Exception as e:
+            logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
+
+def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
+    metricsHtmlFile = testcaseDashboardsDir + "/metrics.html"
+    centralDashboard = PyH('Kafka Metrics Dashboard')
+    centralDashboard << h1('Kafka Metrics Dashboard', cl='center')
+    roles = set(map(lambda config: config['role'], clusterConfig))
+    for role in roles:
+        entities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
+        dashboardPagePath = build_dashboard_for_role(metricsDefinitionFile, role, 
+                                                     entities, testcaseDashboardsDir)
+        centralDashboard << a(role, href = dashboardPagePath)
+        centralDashboard << br()
+            
+    centralDashboard.printOut(metricsHtmlFile)
+
+def build_dashboard_for_role(metricsDefinitionFile, role, entities, testcaseDashboardsDir):
+    # build all dashboards for the input entities based on their role, which can be one of
+    # kafka, zookeeper, producer or consumer
+    dashboards = get_dashboard_definition(metricsDefinitionFile, role)
+    entityDashboard = PyH('Kafka Metrics Dashboard for ' + role)
+    entityDashboard << h1('Kafka Metrics Dashboard for ' + role, cl='center')
+    entityDashboardHtml = testcaseDashboardsDir + "/" + role + "-dashboards.html"
+    for dashboard in dashboards:
+        # place the graph svg files in this dashboard
+        allGraphs = dashboard['graphs']
+        for graph in allGraphs:
+            attributes = map(lambda attribute: graph['bean_name'] + ':' + attribute, 
+                                           graph['attributes'].split(','))
+            for attribute in attributes:                
+                graphFileLocation = testcaseDashboardsDir + "/" + role + "/" + attribute + ".svg"
+                entityDashboard << embed(src = graphFileLocation, type = "image/svg+xml")
+    entityDashboard.printOut(entityDashboardHtml)
+    return entityDashboardHtml
+
+def start_metrics_collection(jmxHost, jmxPort, role, entityId, systemTestEnv, testcaseEnv):
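+    # launch one JmxTool process per mbean of interest over ssh, redirect its csv output
+    # into the entity's metrics directory and record the remote pid for later cleanup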
+    logger.info("starting metrics collection on jmx port : " + jmxPort, extra=d)
+    jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
+    clusterConfig = systemTestEnv.clusterEntityConfigDictList
+    metricsDefinitionFile = systemTestEnv.METRICS_PATHNAME
+    entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
+    dashboardsForRole = get_dashboard_definition(metricsDefinitionFile, role)
+    mbeansForRole = get_mbeans_for_role(dashboardsForRole)
+    
+    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "kafka_home")
+    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "java_home")
+    
+    for mbean in mbeansForRole:
+        outputCsvFile = entityMetricsDir + "/" + mbean + ".csv"
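+        # JMX_PORT is deliberately cleared below, presumably so that the JmxTool JVM does
+        # not try to register its own JMX endpoint on the port used by the monitored process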
+        startMetricsCmdList = ["ssh " + jmxHost,
+                               "'JAVA_HOME=" + javaHome,
+                               "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
+                               "--jmx-url " + jmxUrl,
+                               "--object-name " + mbean + " 1> ",
+                                outputCsvFile + " & echo pid:$! > ",
+                                entityMetricsDir + "/entity_pid'"]
+
+        startMetricsCommand = " ".join(startMetricsCmdList) 
+        logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
+        system_test_utils.async_sys_call(startMetricsCommand)
+            
+        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid'"
+        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+        # keep track of the remote entity pid in a dictionary
+        for line in subproc.stdout.readlines():
+            if line.startswith("pid"):
+                line = line.rstrip('\n')
+                logger.debug("found pid line: [" + line + "]", extra=d)
+                tokens  = line.split(':')
+                thisPid = tokens[1]
+                testcaseEnv.entityParentPidDict[thisPid] = thisPid
+
+def stop_metrics_collection(jmxHost, jmxPort):
+    logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)
+    system_test_utils.sys_call("ps -ef | grep JmxTool | grep -v grep | grep " + jmxPort + " | awk '{print $2}' | xargs kill -9")
+
+def get_mbeans_for_role(dashboardsForRole):
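+    # flatten the per-dashboard graph lists and return the distinct mbean names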
+    graphs = reduce(lambda x,y: x+y, map(lambda dashboard: dashboard['graphs'], dashboardsForRole))
+    return set(map(lambda metric: metric['bean_name'], graphs))

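For orientation, the json lookups in read_metrics_definition and get_dashboard_definition above imply a metrics description file shaped roughly like the sketch below; the field names come from the parsing code, while the role, bean and attribute values are made up for illustration only:

    example_metrics_json = {
        "dashboards": [
            {
                "role": "broker",                            # matched by get_dashboard_definition
                "name": "Broker Dashboard",
                "graphs": [
                    {
                        "graph_name": "MessagesIn",          # becomes part of the plot title
                        "bean_name": "kafka.SomeBrokerBean", # hypothetical mbean, also names the csv file
                        "y_label": "messages/sec",           # one label per attribute, comma-separated
                        "attributes": "MessagesPerSec"       # comma-separated mbean attribute names
                    }
                ]
            }
        ]
    }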
Added: incubator/kafka/branches/0.8/system_test/utils/pyh.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/pyh.py?rev=1378666&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/pyh.py (added)
+++ incubator/kafka/branches/0.8/system_test/utils/pyh.py Wed Aug 29 18:07:58 2012
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# @file: pyh.py
+# @purpose: a HTML tag generator
+# @author: Emmanuel Turlay <tu...@cern.ch>
+
+__doc__ = """The pyh.py module is the core of the PyH package. PyH lets you
+generate HTML tags from within your python code.
+See http://code.google.com/p/pyh/ for documentation.
+"""
+__author__ = "Emmanuel Turlay <tu...@cern.ch>"
+__version__ = '$Revision: 63 $'
+__date__ = '$Date: 2010-05-21 03:09:03 +0200 (Fri, 21 May 2010) $'
+
+from sys import _getframe, stdout, modules, version
+nOpen={}
+
+nl = '\n'
+doctype = '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">\n'
+charset = '<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />\n'
+
+tags = ['html', 'body', 'head', 'link', 'meta', 'div', 'p', 'form', 'legend', 
+        'input', 'select', 'span', 'b', 'i', 'option', 'img', 'script',
+        'table', 'tr', 'td', 'th', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
+        'fieldset', 'a', 'title', 'body', 'head', 'title', 'script', 'br', 'table',
+        'ul', 'li', 'ol', 'embed']
+
+selfClose = ['input', 'img', 'link', 'br']
+
+class Tag(list):
+    tagname = ''
+    
+    def __init__(self, *arg, **kw):
+        self.attributes = kw
+        if self.tagname : 
+            name = self.tagname
+            self.isSeq = False
+        else: 
+            name = 'sequence'
+            self.isSeq = True
+        self.id = kw.get('id', name)
+        #self.extend(arg)
+        for a in arg: self.addObj(a)
+
+    def __iadd__(self, obj):
+        if isinstance(obj, Tag) and obj.isSeq:
+            for o in obj: self.addObj(o)
+        else: self.addObj(obj)
+        return self
+    
+    def addObj(self, obj):
+        if not isinstance(obj, Tag): obj = str(obj)
+        id=self.setID(obj)
+        setattr(self, id, obj)
+        self.append(obj)
+
+    def setID(self, obj):
+        if isinstance(obj, Tag):
+            id = obj.id
+            n = len([t for t in self if isinstance(t, Tag) and t.id.startswith(id)])
+        else:
+            id = 'content'
+            n = len([t for t in self if not isinstance(t, Tag)])
+        if n: id = '%s_%03i' % (id, n)
+        if isinstance(obj, Tag): obj.id = id
+        return id
+
+    def __add__(self, obj):
+        if self.tagname: return Tag(self, obj)
+        self.addObj(obj)
+        return self
+
+    def __lshift__(self, obj):
+        self += obj
+        if isinstance(obj, Tag): return obj
+
+    def render(self):
+        result = ''
+        if self.tagname:
+            result = '<%s%s%s>' % (self.tagname, self.renderAtt(), self.selfClose()*' /')
+        if not self.selfClose():
+            for c in self:
+                if isinstance(c, Tag):
+                    result += c.render()
+                else: result += c
+            if self.tagname: 
+                result += '</%s>' % self.tagname
+        result += '\n'
+        return result
+
+    def renderAtt(self):
+        result = ''
+        for n, v in self.attributes.iteritems():
+            if n != 'txt' and n != 'open':
+                if n == 'cl': n = 'class'
+                result += ' %s="%s"' % (n, v)
+        return result
+
+    def selfClose(self):
+        return self.tagname in selfClose        
+    
+def TagFactory(name):
+    class f(Tag):
+        tagname = name
+    f.__name__ = name
+    return f
+
+thisModule = modules[__name__]
+
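+# install one dynamically generated Tag subclass per tag name as a module-level
+# attribute, so callers can simply write h1(...), a(...), br(), embed(...), etc.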
+for t in tags: setattr(thisModule, t, TagFactory(t)) 
+
+def ValidW3C():
+    out = a(img(src='http://www.w3.org/Icons/valid-xhtml10', alt='Valid XHTML 1.0 Strict'), href='http://validator.w3.org/check?uri=referer')
+    return out
+
+class PyH(Tag):
+    tagname = 'html'
+    
+    def __init__(self, name='MyPyHPage'):
+        self += head()
+        self += body()
+        self.attributes = dict(xmlns='http://www.w3.org/1999/xhtml', lang='en')
+        self.head += title(name)
+
+    def __iadd__(self, obj):
+        if isinstance(obj, head) or isinstance(obj, body): self.addObj(obj)
+        elif isinstance(obj, meta) or isinstance(obj, link): self.head += obj
+        else:
+            self.body += obj
+            id=self.setID(obj)
+            setattr(self, id, obj)
+        return self
+
+    def addJS(self, *arg):
+        for f in arg: self.head += script(type='text/javascript', src=f)
+
+    def addCSS(self, *arg):
+        for f in arg: self.head += link(rel='stylesheet', type='text/css', href=f)
+    
+    def printOut(self,file=''):
+        if file: f = open(file, 'w')
+        else: f = stdout
+        f.write(doctype)
+        f.write(self.render())
+        f.flush()
+        if file: f.close()
+    

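As a quick reference for how build_all_dashboards and build_dashboard_for_role in metrics.py drive this module, a minimal usage sketch follows; the link target, svg path and output file name are illustrative only:

    from pyh import *

    page = PyH('Kafka Metrics Dashboard')                  # an <html> page with <head> and <body>
    page << h1('Kafka Metrics Dashboard', cl='center')     # 'cl' is rendered as the HTML class attribute
    page << a('broker', href='broker-dashboards.html')     # hypothetical link target
    page << br()
    page << embed(src='broker/SomeBean:SomeAttr.svg', type='image/svg+xml')  # hypothetical svg file
    page.printOut('metrics.html')                          # writes the doctype plus the rendered tree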
Modified: incubator/kafka/branches/0.8/system_test/utils/setup_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/setup_utils.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/setup_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/setup_utils.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 import logging

Modified: incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -314,7 +330,6 @@ def setup_remote_hosts(systemTestEnv):
 
     return True
 
-
 def copy_source_to_remote_hosts(hostname, sourceDir, destDir):
 
     cmdStr = "rsync -avz --delete-before " + sourceDir + "/ " + hostname + ":" + destDir

Modified: incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/testcase_env.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/testcase_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/testcase_env.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #!/usr/bin/env python
 
 # ===================================
@@ -42,10 +58,10 @@ class TestcaseEnv():
         self.systemTestBaseDir = systemTestEnv.SYSTEM_TEST_BASE_DIR
 
         # to be initialized in the Test Module
-        self.testSuiteBaseDir  = ""
-        self.testCaseBaseDir   = ""
-        self.testCaseLogsDir   = ""
-
+        self.testSuiteBaseDir      = ""
+        self.testCaseBaseDir       = ""
+        self.testCaseLogsDir       = ""
+        self.testCaseDashboardsDir = ""
         # ================================
         # dictionary to keep track of
         # user-defined environment variables