You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/08/29 20:07:59 UTC
svn commit: r1378666 - in /incubator/kafka/branches/0.8: config/
core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/message/
core/src/main/scala/kafka/metrics/ core/src/main/scala/kafka/producer/
core/src/main/scala/kafka/server/ core/src/ma...
Author: nehanarkhede
Date: Wed Aug 29 18:07:58 2012
New Revision: 1378666
URL: http://svn.apache.org/viewvc?rev=1378666&view=rev
Log:
KAFKA-489 Add metrics collection and graphs to the system test framework; patched by Neha Narkhede; reviewed by Jun Rao and John Fung
Added:
incubator/kafka/branches/0.8/system_test/metrics.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/config/
incubator/kafka/branches/0.8/system_test/utils/metrics.py
incubator/kafka/branches/0.8/system_test/utils/pyh.py
Removed:
incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/
Modified:
incubator/kafka/branches/0.8/config/log4j.properties
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
incubator/kafka/branches/0.8/system_test/cluster_config.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties
incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
incubator/kafka/branches/0.8/system_test/system_test_env.py
incubator/kafka/branches/0.8/system_test/system_test_runner.py
incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
incubator/kafka/branches/0.8/system_test/utils/setup_utils.py
incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
Modified: incubator/kafka/branches/0.8/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/log4j.properties?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/log4j.properties (original)
+++ incubator/kafka/branches/0.8/config/log4j.properties Wed Aug 29 18:07:58 2012
@@ -25,7 +25,8 @@ log4j.appender.stdout.layout.ConversionP
# Turn on all our debugging info
-#log4j.logger.kafka=INFO
+log4j.logger.kafka.perf=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka.perf=DEBUG
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Wed Aug 29 18:07:58 2012
@@ -134,8 +134,8 @@ trait SimpleConsumerStatsMBean {
}
@threadsafe
-class SimpleConsumerStats extends SimpleConsumerStatsMBean {
- private val fetchRequestStats = new SnapshotStats
+class SimpleConsumerStats(monitoringDurationNs: Long) extends SimpleConsumerStatsMBean {
+ private val fetchRequestStats = new SnapshotStats(monitoringDurationNs)
def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs)
@@ -154,7 +154,7 @@ class SimpleConsumerStats extends Simple
object SimpleConsumerStats extends Logging {
private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats"
- private val stats = new SimpleConsumerStats
+ private val stats = new SimpleConsumerStats(1 * 1000L * 1000L * 1000L)
Utils.registerMBean(stats, simpleConsumerstatsMBeanName)
def recordFetchRequest(requestMs: Long) = stats.recordFetchRequest(requestMs)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Wed Aug 29 18:07:58 2012
@@ -252,8 +252,8 @@ trait LogFlushStatsMBean {
}
@threadsafe
-class LogFlushStats extends LogFlushStatsMBean {
- private val flushRequestStats = new SnapshotStats
+class LogFlushStats(monitorDurationNs: Long) extends LogFlushStatsMBean {
+ private val flushRequestStats = new SnapshotStats(monitorDurationNs)
def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
@@ -270,7 +270,7 @@ class LogFlushStats extends LogFlushStat
object LogFlushStats extends Logging {
private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
- private val stats = new LogFlushStats
+ private val stats = new LogFlushStats(1L * 1000 * 1000 * 1000)
Utils.registerMBean(stats, LogFlushStatsMBeanName)
def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala Wed Aug 29 18:07:58 2012
@@ -33,7 +33,7 @@ trait KafkaMetricsGroup extends Logging
*
* @return The sub-group identifier.
*/
- def metricsGroupIdent: String
+ def metricsGroupIdent: String = ""
/**
* Creates a new MetricName object for gauges, meters, etc. created for this
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Aug 29 18:07:58 2012
@@ -192,8 +192,8 @@ trait SyncProducerStatsMBean {
}
@threadsafe
-class SyncProducerStats extends SyncProducerStatsMBean {
- private val produceRequestStats = new SnapshotStats
+class SyncProducerStats(monitoringDurationNs: Long) extends SyncProducerStatsMBean {
+ private val produceRequestStats = new SnapshotStats(monitoringDurationNs)
def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs)
@@ -208,8 +208,10 @@ class SyncProducerStats extends SyncProd
object SyncProducerStats extends Logging {
private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
- private val stats = new SyncProducerStats
+ private val stats = new SyncProducerStats(1L * 1000 * 1000 * 1000)
swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
- def recordProduceRequest(requestMs: Long) = stats.recordProduceRequest(requestMs)
+ def recordProduceRequest(requestMs: Long) = {
+ stats.recordProduceRequest(requestMs)
+ }
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Aug 29 18:07:58 2012
@@ -459,9 +459,8 @@ class KafkaApis(val requestChannel: Requ
*/
class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
- this.logIdent = "[etchRequestPurgatory-%d], ".format(brokerId)
+ this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
- override def metricsGroupIdent = metricsGroup
/**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@@ -598,8 +597,6 @@ class KafkaApis(val requestChannel: Requ
this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId)
- override def metricsGroupIdent = metricsGroup
-
protected def checkSatisfied(followerFetchRequestKey: RequestKey,
delayedProduce: DelayedProduce) =
delayedProduce.isSatisfied(followerFetchRequestKey)
@@ -617,7 +614,6 @@ class KafkaApis(val requestChannel: Requ
private class DelayedRequestMetrics {
private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
- override def metricsGroupIdent = metricsGroup
val caughtUpFollowerFetchRequestMeter =
newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel)
@@ -645,7 +641,6 @@ class KafkaApis(val requestChannel: Requ
keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
- override def metricsGroupIdent = metricsGroup
val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond",
"requests", TimeUnit.SECONDS))
@@ -705,7 +700,6 @@ class KafkaApis(val requestChannel: Requ
aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
}
-
private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
else aggregateNonFollowerFetchRequestMetrics
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Wed Aug 29 18:07:58 2012
@@ -72,8 +72,6 @@ abstract class RequestPurgatory[T <: Del
private val satisfactionRateBeanName = "SatisfactionRate"
private val expirationRateBeanName = "ExpirationRate"
- override def metricsGroupIdent = ""
-
val satisfactionRateMeter = newMeter(
satisfactionRateBeanName,
"requests",
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala Wed Aug 29 18:07:58 2012
@@ -26,37 +26,43 @@ import joptsimple.OptionParser
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.math._
+import kafka.utils.Logging
-
-object JmxTool {
+object JmxTool extends Logging {
def main(args: Array[String]) {
// Parse command line
val parser = new OptionParser
- val objectNameOpt =
+ val objectNameOpt =
parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
- "can be given multiple times to specify more than one query. If no objects are specified " +
- "all objects will be queried.")
- .withRequiredArg
- .describedAs("name")
- .ofType(classOf[String])
+ "can be given multiple times to specify more than one query. If no objects are specified " +
+ "all objects will be queried.")
+ .withRequiredArg
+ .describedAs("name")
+ .ofType(classOf[String])
+ val attributesOpt =
+ parser.accepts("attributes", "The whitelist of attributes to query. This is a comma-separated list. If no " +
+ "attributes are specified all objects will be queried.")
+ .withOptionalArg()
+ .describedAs("name")
+ .ofType(classOf[String])
val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
- .defaultsTo(5000)
+ .defaultsTo(2000)
val helpOpt = parser.accepts("help", "Print usage information.")
- val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
- "See java.text.SimpleDateFormat for options.")
+ val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
+ "See java.text.SimpleDateFormat for options.")
.withOptionalArg()
.describedAs("format")
.ofType(classOf[String])
val jmxServiceUrlOpt =
parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
- .withRequiredArg
- .describedAs("service-url")
- .ofType(classOf[String])
- .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
+ .withRequiredArg
+ .describedAs("service-url")
+ .ofType(classOf[String])
+ .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
val options = parser.parse(args : _*)
@@ -67,44 +73,62 @@ object JmxTool {
val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
val interval = options.valueOf(reportingIntervalOpt).intValue
+ val attributesWhitelistExists = options.has(attributesOpt)
+ val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None
val dateFormatExists = options.has(dateFormatOpt)
val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
val jmxc = JMXConnectorFactory.connect(url, null)
val mbsc = jmxc.getMBeanServerConnection()
- val queries: Iterable[ObjectName] =
+ val queries: Iterable[ObjectName] =
if(options.has(objectNameOpt))
options.valuesOf(objectNameOpt).map(new ObjectName(_))
else
List(null)
+
val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
- val attributes: Iterable[(ObjectName, Array[String])] =
+ val allAttributes: Iterable[(ObjectName, Array[String])] =
names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
+
+ val numExpectedAttributes: Map[ObjectName, Int] =
+ attributesWhitelistExists match {
+ case true => queries.map((_, attributesWhitelist.get.size)).toMap
+ case false => names.map((name: ObjectName) =>
+ (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName).size)).toMap
+ }
+
// print csv header
- val keys = List("time") ++ queryAttributes(mbsc, names).keys.toArray.sorted
- println(keys.map("\"" + _ + "\"").mkString(", "))
+ val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
+ if(keys.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+ println(keys.map("\"" + _ + "\"").mkString(","))
while(true) {
val start = System.currentTimeMillis
- val attributes = queryAttributes(mbsc, names)
+ val attributes = queryAttributes(mbsc, names, attributesWhitelist)
attributes("time") = dateFormat match {
case Some(dFormat) => dFormat.format(new Date)
case None => System.currentTimeMillis().toString
}
- println(keys.map(attributes(_)).mkString(", "))
+ if(attributes.keySet.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+ println(keys.map(attributes(_)).mkString(","))
val sleep = max(0, interval - (System.currentTimeMillis - start))
Thread.sleep(sleep)
}
}
- def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName]) = {
+ def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
var attributes = new mutable.HashMap[String, Any]()
- for(name <- names) {
- val mbean = mbsc.getMBeanInfo(name)
+ for(name <- names) {
+ val mbean = mbsc.getMBeanInfo(name)
for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))) {
val attr = attrObj.asInstanceOf[Attribute]
- attributes(name + ":" + attr.getName) = attr.getValue
+ attributesWhitelist match {
+ case Some(allowedAttributes) =>
+ if(allowedAttributes.contains(attr.getName))
+ attributes(name + ":" + attr.getName) = attr.getValue
+ case None => attributes(name + ":" + attr.getName) = attr.getValue
+ }
}
}
attributes
Modified: incubator/kafka/branches/0.8/system_test/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/cluster_config.json?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/cluster_config.json (original)
+++ incubator/kafka/branches/0.8/system_test/cluster_config.json Wed Aug 29 18:07:58 2012
@@ -4,7 +4,7 @@
"entity_id": "0",
"hostname": "localhost",
"role": "zookeeper",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+ "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9990"
},
@@ -12,7 +12,7 @@
"entity_id": "1",
"hostname": "localhost",
"role": "broker",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+ "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9991"
},
@@ -20,7 +20,7 @@
"entity_id": "2",
"hostname": "localhost",
"role": "broker",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+ "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9992"
},
@@ -28,7 +28,7 @@
"entity_id": "3",
"hostname": "localhost",
"role": "broker",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+ "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9993"
},
@@ -36,7 +36,7 @@
"entity_id": "4",
"hostname": "localhost",
"role": "producer_performance",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+ "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9994"
},
@@ -44,7 +44,7 @@
"entity_id": "5",
"hostname": "localhost",
"role": "console_consumer",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+ "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
"java_home": "/export/apps/jdk/JDK-1_6_0_27",
"jmx_port": "9995"
}
Added: incubator/kafka/branches/0.8/system_test/metrics.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/metrics.json?rev=1378666&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/metrics.json (added)
+++ incubator/kafka/branches/0.8/system_test/metrics.json Wed Aug 29 18:07:58 2012
@@ -0,0 +1,174 @@
+{
+ "dashboards": [
+ {
+ "role": "broker",
+ "graphs": [
+ {
+ "graph_name": "SocketServerThroughput",
+ "y_label": "bytes-read-per-second,bytes-written-per-second",
+ "bean_name": "kafka:type=kafka.SocketServerStats",
+ "attributes": "BytesReadPerSecond,BytesWrittenPerSecond"
+ },
+ {
+ "graph_name": "FetchRequestPurgatoryNumDelayedRequests",
+ "y_label": "num-delayed-requests",
+ "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
+ "attributes": "Value"
+ },
+ {
+ "graph_name": "MeanFetchRequestPurgatorySatisfactionRate",
+ "y_label": "mean-request-satisfaction-rate",
+ "bean_name": "kafka.server:type=FetchRequestPurgatory,name=SatisfactionRate",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "FetchRequestPurgatoryTimeToSatisfy",
+ "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+ "bean_name": "kafka.server:type=FetchRequestPurgatory,name=TimeToSatisfyInNs",
+ "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ },
+ {
+ "graph_name": "FetchRequestPurgatoryExpirationRate",
+ "y_label": "expiration-rate",
+ "bean_name": "kafka.server:type=FetchRequestPurgatory,name=ExpirationRate",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "ProducerRequestPurgatoryNumDelayedRequests",
+ "y_label": "num-delayed-requests",
+ "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
+ "attributes": "Value"
+ },
+ {
+ "graph_name": "MeanProducerRequestPurgatorySatisfactionRate",
+ "y_label": "mean-request-satisfaction-rate",
+ "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=SatisfactionRate",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "ProducerRequestPurgatoryExpirationRate",
+ "y_label": "expiration-rate",
+ "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=ExpirationRate",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedProducerRequests-CaughtUpFollowerFetchRequestsPerSecond",
+ "y_label": "mean-caught-up-follower-fetch-requests-per-second",
+ "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=CaughtUpFollowerFetchRequestsPerSecond-all",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedProducerRequests-ExpiredRequestRate",
+ "y_label": "mean-expired-request-rate",
+ "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=ExpiredRequestsPerSecond-all",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedProducerRequests-FollowerCatchUpLatency",
+ "y_label": "mean-follower-catchup-time-ns,95th-percentile-follower-catchup-time-ns,99th-percentile-follower-catchup-time-ns,999th-percentile-follower-catchup-time-ns",
+ "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=FollowerCatchUpTimeInNs",
+ "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ },
+ {
+ "graph_name": "DelayedProducerRequests-SatisfactionTimeInNs",
+ "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+ "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfactionTimeInNs",
+ "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ },
+ {
+ "graph_name": "DelayedProducerRequests-SatisfiedRequestsPerSecond",
+ "y_label": "mean-satisfaction-requests-per-second",
+ "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfiedRequestsPerSecond",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedProducerRequests-Throughput-all",
+ "y_label": "mean-purgatory-throughput-all",
+ "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=Throughput-all",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedFetchRequests-Follower-ExpiredRequestRate",
+ "y_label": "mean-expired-request-rate",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-ExpiredRequestsPerSecond",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedFetchRequests-Follower-SatisfactionTimeInNs",
+ "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfactionTimeInNs",
+ "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ },
+ {
+ "graph_name": "DelayedFetchRequests-Follower-SatisfiedRequestsPerSecond",
+ "y_label": "mean-satisfaction-requests-per-second",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfiedRequestsPerSecond",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedFetchRequests-Follower-Throughput-all",
+ "y_label": "mean-purgatory-throughput-all",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-Throughput-all",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedFetchRequests-NonFollower-ExpiredRequestRate",
+ "y_label": "mean-expired-request-rate",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-ExpiredRequestsPerSecond",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedFetchRequests-NonFollower-SatisfactionTimeInNs",
+ "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfactionTimeInNs",
+ "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ },
+ {
+ "graph_name": "DelayedFetchRequests-NonFollower-SatisfiedRequestsPerSecond",
+ "y_label": "mean-satisfaction-requests-per-second",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfiedRequestsPerSecond",
+ "attributes": "MeanRate"
+ },
+ {
+ "graph_name": "DelayedFetchRequests-NonFollower-Throughput-all",
+ "y_label": "mean-purgatory-throughput-all",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-Throughput-all",
+ "attributes": "MeanRate"
+ }
+ ]
+ },
+ {
+ "role": "producer_performance",
+ "graphs": [
+ {
+ "graph_name": "ProducerStats",
+ "y_label": "avg-producer-latency-ms,max-producer-latency-ms,produce-request-throughput",
+ "bean_name": "kafka:type=kafka.KafkaProducerStats",
+ "attributes": "AvgProduceRequestMs,MaxProduceRequestMs,ProduceRequestsPerSecond"
+ }
+ ]
+ },
+ {
+ "role": "console_consumer",
+ "graphs": [
+ {
+ "graph_name": "SimpleConsumerRequestStats",
+ "y_label": "simple-consumer-throughput,simple-consumer-throughput-bytes,simple-consumer-latency-ms",
+ "bean_name": "kafka:type=kafka.SimpleConsumerStats",
+ "attributes": "FetchRequestsPerSecond,ConsumerThroughput,AvgFetchRequestMs"
+ }
+ ]
+ },
+ {
+ "role": "zookeeper",
+ "graphs": [
+ {
+ "graph_name": "ZookeeperServerStats",
+ "y_label": "zookeeper-latency-ms",
+ "bean_name": "org.apache.ZooKeeperService:name0=StandaloneServer_port-1",
+ "attributes": "AvgRequestLatency"
+ }
+ ]
+ }
+ ]
+}
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties Wed Aug 29 18:07:58 2012
@@ -118,3 +118,5 @@ zk.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
+
+monitoring.period.secs=1
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
#!/usr/bin/env python
# ===================================
@@ -11,6 +27,7 @@ import signal
import subprocess
import sys
import time
+import traceback
from system_test_env import SystemTestEnv
sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
@@ -20,6 +37,7 @@ from testcase_env import Testcas
# product specific: Kafka
import kafka_system_test_utils
+import metrics
class ReplicaBasicTest(SetupUtils):
@@ -85,7 +103,8 @@ class ReplicaBasicTest(SetupUtils):
#### => update testcaseEnv
self.testcaseEnv.testCaseBaseDir = testCasePathName
self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
-
+ self.testcaseEnv.testCaseDashboardsDir = self.testcaseEnv.testCaseBaseDir + "/dashboards"
+
# get testcase description
testcaseDescription = ""
for k,v in testcaseNonEntityDataDict.items():
@@ -214,8 +233,9 @@ class ReplicaBasicTest(SetupUtils):
# starting producer
self.log_message("starting producer")
kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
- self.anonLogger.info("sleeping for 5s")
- time.sleep(5)
+ self.anonLogger.info("sleeping for 10s")
+ time.sleep(10)
+ kafka_system_test_utils.stop_producer()
# starting previously terminated broker
if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
@@ -231,26 +251,45 @@ class ReplicaBasicTest(SetupUtils):
# starting consumer
self.log_message("starting consumer")
kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
-
+ time.sleep(10)
+ kafka_system_test_utils.stop_consumer()
+
# this testcase is completed - so stopping all entities
self.log_message("stopping all entities")
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-
+
# validate the data matched
self.log_message("validating data matched")
result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
-
+
# =============================================
# collect logs from remote hosts
# =============================================
kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+ # ==========================
+ # draw graphs
+ # ==========================
+ metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME,
+ self.testcaseEnv,
+ self.systemTestEnv.clusterEntityConfigDictList)
+
+ # build dashboard, one for each role
+ metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME,
+ self.testcaseEnv.testCaseDashboardsDir,
+ self.systemTestEnv.clusterEntityConfigDictList)
+
+ # stop metrics processes
+ for entity in self.systemTestEnv.clusterEntityConfigDictList:
+ metrics.stop_metrics_collection(entity['hostname'], entity['jmx_port'])
+
except Exception as e:
- self.log_message("Exception caught : ")
- print e
+ self.log_message("Exception while running test {0}".format(e))
+ traceback.print_exc()
self.log_message("stopping all entities")
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
Modified: incubator/kafka/branches/0.8/system_test/system_test_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/system_test_env.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/system_test_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/system_test_env.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
#!/usr/bin/env python
# ===================================
@@ -23,6 +39,8 @@ class SystemTestEnv():
SYSTEM_TEST_MODULE_EXT = ".py"
CLUSTER_CONFIG_FILENAME = "cluster_config.json"
CLUSTER_CONFIG_PATHNAME = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/" + CLUSTER_CONFIG_FILENAME)
+ METRICS_FILENAME = "metrics.json"
+ METRICS_PATHNAME = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/" + METRICS_FILENAME)
clusterEntityConfigDictList = []
systemTestResultsList = []
Modified: incubator/kafka/branches/0.8/system_test/system_test_runner.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/system_test_runner.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/system_test_runner.py (original)
+++ incubator/kafka/branches/0.8/system_test/system_test_runner.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
#!/usr/bin/evn python
# ===================================
Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
#!/usr/bin/env python
# ===================================
@@ -16,6 +32,7 @@ import time
import traceback
import system_test_utils
+import metrics
from datetime import datetime
from time import mktime
@@ -79,19 +96,22 @@ def generate_testcase_log_dirs(systemTes
if not os.path.exists(testcasePathName + "/config") : os.makedirs(testcasePathName + "/config")
if not os.path.exists(testcasePathName + "/logs") : os.makedirs(testcasePathName + "/logs")
+ if not os.path.exists(testcasePathName + "/dashboards") : os.makedirs(testcasePathName + "/dashboards")
- dashboardsPathName = testcaseEnv.testCaseLogsDir + "/dashboards"
+ dashboardsPathName = testcasePathName + "/dashboards"
if not os.path.exists(dashboardsPathName) : os.makedirs(dashboardsPathName)
for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
entityId = clusterEntityConfigDict["entity_id"]
role = clusterEntityConfigDict["role"]
- logger.debug("entity_id : " + entityId, extra=d)
- logger.debug("role : " + role, extra=d)
-
metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
if not os.path.exists(metricsPathName) : os.makedirs(metricsPathName)
+
+ # create the role directory under dashboards
+ dashboardsRoleDir = dashboardsPathName + "/" + role
+ if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir)
+
def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
@@ -192,40 +212,6 @@ def copy_file_with_dict_values(srcFile,
outfile.write(line)
outfile.close()
-
-def start_metrics_collection(jmxHost, jmxPort, mBeanObjectName, mBeanAttributes, entityId, clusterEntityConfigDictList, testcaseEnv):
- logger.info("starting metrics collection on jmx port: " + jmxPort, extra=d)
- jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
- kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
- javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
- metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", entityId, "metrics")
-
- startMetricsCmdList = ["ssh " + jmxHost,
- "'JAVA_HOME=" + javaHome,
- "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
- "--jmx-url " + jmxUrl,
- "--object-name " + mBeanObjectName + " &> ",
- metricsPathName + "/metrics.csv & echo pid:$! > ",
- metricsPathName + "/entity_pid'"]
-
- startMetricsCommand = " ".join(startMetricsCmdList)
- logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
- system_test_utils.async_sys_call(startMetricsCommand)
-
- pidCmdStr = "ssh " + jmxHost + " 'cat " + metricsPathName + "/entity_pid'"
- logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
- subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
-
- # keep track of the remote entity pid in a dictionary
- for line in subproc.stdout.readlines():
- if line.startswith("pid"):
- line = line.rstrip('\n')
- logger.debug("found pid line: [" + line + "]", extra=d)
- tokens = line.split(':')
- thisPid = tokens[1]
- testcaseEnv.entityParentPidDict[thisPid] = thisPid
-
-
def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
logger.info("calling generate_properties_files", extra=d)
@@ -402,6 +388,7 @@ def start_entity_in_background(systemTes
if role == "zookeeper":
cmdList = ["ssh " + hostname,
"'JAVA_HOME=" + javaHome,
+ "JMX_PORT=" + jmxPort,
kafkaHome + "/bin/zookeeper-server-start.sh ",
configPathName + "/" + configFile + " &> ",
logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -418,7 +405,7 @@ def start_entity_in_background(systemTes
elif role == "broker":
cmdList = ["ssh " + hostname,
"'JAVA_HOME=" + javaHome,
- "JMX_PORT=" + jmxPort,
+ "JMX_PORT=" + jmxPort,
kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
configPathName + "/" + configFile + " &> ",
logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -446,10 +433,8 @@ def start_entity_in_background(systemTes
tokens = line.split(':')
testcaseEnv.entityParentPidDict[entityId] = tokens[1]
- # if it is a broker, start metric collection
- if role == "broker":
- start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \
- "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv)
+ time.sleep(1)
+ metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
def start_console_consumer(systemTestEnv, testcaseEnv):
@@ -460,6 +445,8 @@ def start_console_consumer(systemTestEnv
for consumerConfig in consumerConfigList:
host = consumerConfig["hostname"]
entityId = consumerConfig["entity_id"]
+ jmxPort = consumerConfig["jmx_port"]
+ role = consumerConfig["role"]
kafkaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
@@ -476,13 +463,15 @@ def start_console_consumer(systemTestEnv
commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"])
cmdList = ["ssh " + host,
"'JAVA_HOME=" + javaHome,
+ "JMX_PORT=" + jmxPort,
kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
commandArgs + " &> " + consumerLogPathName + "'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
- system_test_utils.sys_call(cmdStr)
-
+ system_test_utils.async_sys_call(cmdStr)
+ time.sleep(2)
+ metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
def start_producer_performance(systemTestEnv, testcaseEnv):
@@ -494,6 +483,8 @@ def start_producer_performance(systemTes
for producerConfig in producerConfigList:
host = producerConfig["hostname"]
entityId = producerConfig["entity_id"]
+ jmxPort = producerConfig["jmx_port"]
+ role = producerConfig["role"]
kafkaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
@@ -510,12 +501,15 @@ def start_producer_performance(systemTes
commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"])
cmdList = ["ssh " + host,
"'JAVA_HOME=" + javaHome,
+ "JMX_PORT=" + jmxPort,
kafkaRunClassBin + " kafka.perf.ProducerPerformance",
commandArgs + " &> " + producerLogPathName + "'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
- system_test_utils.sys_call(cmdStr)
+ system_test_utils.async_sys_call(cmdStr)
+ time.sleep(1)
+ metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
def stop_remote_entity(systemTestEnv, entityId, parentPid):
@@ -525,8 +519,8 @@ def stop_remote_entity(systemTestEnv, en
pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid)
logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
- system_test_utils.sigterm_remote_process(hostname, pidStack)
- time.sleep(1)
+# system_test_utils.sigterm_remote_process(hostname, pidStack)
+# time.sleep(1)
system_test_utils.sigkill_remote_process(hostname, pidStack)
@@ -632,10 +626,9 @@ def validate_leader_election_successful(
validationStatusDict["Validate leader election successful"] = "PASSED"
return True
except Exception, e:
- logger.error("leader info not completed:", extra=d)
+ logger.error("leader info not completed: {0}".format(e), extra=d)
+ traceback.print_exc()
validationStatusDict["Validate leader election successful"] = "FAILED"
- print leaderDict
- print e
return False
else:
validationStatusDict["Validate leader election successful"] = "FAILED"
@@ -677,5 +670,14 @@ def cleanup_data_at_remote_hosts(systemT
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
+def get_entity_log_directory(testCaseBaseDir, entity_id, role):
+    """Return the log directory of one entity.
+
+    The path has the form <testCaseBaseDir>/logs/<role>-<entity_id>.
+    """
+    entityDirName = "-".join([role, entity_id])
+    return "/".join([testCaseBaseDir, "logs", entityDirName])
+
+def get_entities_for_role(clusterConfig, role):
+    """Return the list of cluster entities whose 'role' equals `role`.
+
+    clusterConfig -- iterable of entity dictionaries, each with a 'role' key
+    role          -- role name to select
+
+    A real list is returned (not a lazy filter object) because callers
+    iterate over the result more than once.
+    """
+    return [entity for entity in clusterConfig if entity['role'] == role]
+def stop_consumer():
+    """Run a shell pipeline that sends signal 15 to processes listed by `ps -ef` as ConsoleConsumer."""
+    findAndKillCmd = "ps -ef | grep ConsoleConsumer | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15"
+    system_test_utils.sys_call(findAndKillCmd)
+def stop_producer():
+    """Run a shell pipeline that sends signal 15 to processes listed by `ps -ef` as ProducerPerformance."""
+    findAndKillCmd = "ps -ef | grep ProducerPerformance | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15"
+    system_test_utils.sys_call(findAndKillCmd)
Added: incubator/kafka/branches/0.8/system_test/utils/metrics.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/metrics.py?rev=1378666&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/metrics.py (added)
+++ incubator/kafka/branches/0.8/system_test/utils/metrics.py Wed Aug 29 18:07:58 2012
@@ -0,0 +1,254 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# ===================================
+# file: metrics.py
+# ===================================
+
+import inspect
+import json
+import logging
+import os
+import signal
+import subprocess
+import sys
+import traceback
+
+import csv
+import time
+import matplotlib.pyplot as plt
+from collections import namedtuple
+import numpy
+
+from pyh import *
+import kafka_system_test_utils
+import system_test_utils
+
+logger = logging.getLogger("namedLogger")
+thisClassName = '(metrics)'
+d = {'name_of_class': thisClassName}
+
+def read_metrics_definition(metricsFile):
+    """Return every graph definition found in the metrics description file.
+
+    metricsFile -- path of a JSON file with a top-level 'dashboards' list;
+                   each dashboard carries a 'graphs' list
+
+    Returns a flat list with the graph dictionaries of all dashboards, in
+    file order.  The file is closed as soon as it has been parsed.  The
+    per-graph keys ('bean_name', 'attributes') are no longer touched here,
+    so they are not implicitly validated by this function.
+    """
+    with open(metricsFile, "r") as metricsFileHandle:
+        metricsJsonData = json.load(metricsFileHandle)
+    allGraphs = []
+    for dashboard in metricsJsonData['dashboards']:
+        allGraphs.extend(dashboard['graphs'])
+    return allGraphs
+
+def get_dashboard_definition(metricsFile, role):
+    """Return the dashboards of the metrics description file that belong to `role`.
+
+    metricsFile -- path of a JSON file with a top-level 'dashboards' list
+    role        -- value compared against each dashboard's 'role' key
+
+    Returns a (possibly empty) list of dashboard dictionaries, in file order.
+    The file is closed as soon as it has been parsed.
+    """
+    with open(metricsFile, "r") as metricsFileHandle:
+        metricsJsonData = json.load(metricsFileHandle)
+    return [dashboard for dashboard in metricsJsonData['dashboards']
+            if dashboard['role'] == role]
+
+def ensure_valid_headers(headers, attributes):
+    """Validate the header row of a metrics CSV file and locate one column.
+
+    headers    -- list of column names read from the first CSV line
+    attributes -- name of the single column to look up
+
+    Returns the index of the first column named `attributes`.
+    Raises Exception when the header row is empty, when its first column is
+    not "time", or when no column is named `attributes`.
+    """
+    # an empty header row is reported like any other malformed header
+    # instead of escaping as an IndexError
+    if not headers or headers[0] != "time":
+        raise Exception("First column should be time")
+    for header in headers:
+        logger.debug(header, extra=d)
+    # list.index returns the first match; duplicate column names are not rejected
+    try:
+        return headers.index(attributes)
+    except ValueError:
+        raise Exception("There should be exactly one column that matches attribute: {0} in".format(attributes) +
+                        " headers: {0}".format(",".join(headers)))
+
+def plot_graphs(inputCsvFiles, labels, title, xLabel, yLabel, attribute, outputGraphFile):
+    """Plot one attribute from several metrics CSV files as lines on one graph.
+
+    Each CSV file must start with a header row whose first column is "time"
+    and which contains a column named `attribute`.  The time column is
+    treated as epoch milliseconds; the x axis shows whole seconds elapsed
+    since the first sample of the respective file.  Files that cannot be
+    read or parsed are logged and left out of the graph and of the legend.
+
+    inputCsvFiles   -- CSV files to read; one line is drawn per file
+    labels          -- legend labels, parallel to inputCsvFiles
+    title           -- graph title
+    xLabel, yLabel  -- axis labels
+    attribute       -- header name of the column to plot
+    outputGraphFile -- path the figure is saved to
+
+    Nothing is saved (an error is logged) when no file yields any data.
+    """
+    fig = plt.figure()
+    try:
+        fig.subplots_adjust(bottom=0.2)
+        ax = fig.add_subplot(111)
+        ax.set_xlabel(xLabel)
+        ax.set_ylabel(yLabel)
+        ax.grid()
+        plots = []
+        plottedLabels = []
+        allX, allY = [], []
+        startEpochs = []
+        for inputCsvFile, label in zip(inputCsvFiles, labels):
+            x, y = [], []
+            try:
+                with open(inputCsvFile, "rb") as csvFile:
+                    rows = list(csv.reader(csvFile))
+                # the first line holds the column headers
+                headers = rows.pop(0)
+                attributeColumnIndex = ensure_valid_headers(headers, attribute)
+                logger.debug("Column index for attribute {0} is {1}".format(attribute, attributeColumnIndex), extra=d)
+                start_time = int(rows[0][0])
+                for row in rows:
+                    y.append(float(row[attributeColumnIndex]))
+                    # floor division keeps the whole-second x values on any Python version
+                    x.append((int(row[0]) - start_time) // 1000)
+            except Exception as e:
+                logger.error("ERROR while plotting data for {0}: {1}".format(inputCsvFile, e), extra=d)
+                continue
+            # keep legend handles and labels in step: only files that were plotted get an entry
+            plots.append(ax.plot(x, y)[0])
+            plottedLabels.append(label)
+            allX.extend(x)
+            allY.extend(y)
+            startEpochs.append(start_time // 1000)
+        if not plots:
+            logger.error("ERROR while plotting graph {0}: no data could be read".format(outputGraphFile), extra=d)
+            return
+        # axis limits span the data of all plotted files
+        xmin, xmax = min(allX), max(allX)
+        ymin, ymax = min(allY), max(allY)
+        plt.xlim(xmin, xmax)
+        plt.ylim(ymin, ymax)
+        # evenly spaced ticks; a single tick when all samples share one x value
+        if xmax > xmin:
+            xticks = numpy.linspace(xmin, xmax, 6)
+        else:
+            xticks = [xmin]
+        # one label per tick: wall-clock time of the tick, counted from the earliest file start
+        baseEpoch = min(startEpochs)
+        xticks_labels = [time.strftime("%H:%M:%S", time.localtime(baseEpoch + tick)) for tick in xticks]
+        plt.xticks(xticks, xticks_labels, rotation=17)
+        plt.legend(plots, plottedLabels, loc=2)
+        plt.title(title)
+        plt.savefig(outputGraphFile)
+    finally:
+        # release the figure; pyplot otherwise keeps every figure alive
+        plt.close(fig)
+
+def draw_all_graphs(metricsDescriptionFile, testcaseEnv, clusterConfig):
+    """Draw the graphs of every dashboard defined for every role present in the cluster.
+
+    metricsDescriptionFile -- path of the metrics description JSON file
+    testcaseEnv            -- test case environment, passed through to draw_graph_for_role
+    clusterConfig          -- list of entity dictionaries, each with a 'role' key
+    """
+    distinctRoles = set(config['role'] for config in clusterConfig)
+    for role in distinctRoles:
+        roleDashboards = get_dashboard_definition(metricsDescriptionFile, role)
+        roleEntities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
+        for dashboard in roleDashboards:
+            # each graph of the dashboard is drawn across all entities of the role
+            draw_graph_for_role(dashboard['graphs'], roleEntities, role, testcaseEnv)
+
+def draw_graph_for_role(graphs, entities, role, testcaseEnv):
+    """Draw one SVG graph per mbean attribute, combining all entities of a role.
+
+    graphs      -- graph definitions with 'graph_name', 'bean_name', 'y_label'
+                   and 'attributes' keys ('y_label' and 'attributes' are
+                   comma-separated and paired up positionally)
+    entities    -- entity dictionaries of the role, each with an 'entity_id' key
+    role        -- role name; used in legend labels and in the output directory
+    testcaseEnv -- provides testCaseDashboardsDir, the root of the output files
+
+    Output goes to <testCaseDashboardsDir>/<role>/<bean_name>:<attribute>.svg.
+    A failure is logged and only skips the graph or attribute it belongs to.
+    """
+    for graph in graphs:
+        graphName = graph['graph_name']
+        beanName = graph['bean_name']
+        inputCsvFiles = []
+        graphLegendLabels = []
+        for entity in entities:
+            entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(
+                testcaseEnv, role, entity['entity_id'], "metrics")
+            inputCsvFiles.append(entityMetricsDir + "/" + beanName + ".csv")
+            graphLegendLabels.append(role + "-" + entity['entity_id'])
+        try:
+            yLabels = graph['y_label'].split(',')
+            attributes = graph['attributes'].split(',')
+        except Exception as e:
+            # no output file name exists yet, so the graph name identifies the failure
+            logger.error("ERROR while plotting graph {0}: {1}".format(graphName, e), extra=d)
+            continue
+        # plot one graph per mbean attribute
+        for yLabel, attribute in zip(yLabels, attributes):
+            fullyQualifiedAttributeName = beanName + ':' + attribute
+            outputGraphFile = testcaseEnv.testCaseDashboardsDir + "/" + role + "/" + fullyQualifiedAttributeName + ".svg"
+            try:
+                plot_graphs(inputCsvFiles, graphLegendLabels, graphName + '-' + attribute,
+                            "time", yLabel, fullyQualifiedAttributeName, outputGraphFile)
+            except Exception as e:
+                logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
+
+def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
+    """Write <testcaseDashboardsDir>/metrics.html, linking to one dashboard page per role.
+
+    metricsDefinitionFile -- path of the metrics description JSON file
+    testcaseDashboardsDir -- directory receiving the generated HTML pages
+    clusterConfig         -- list of entity dictionaries, each with a 'role' key
+    """
+    metricsHtmlFile = testcaseDashboardsDir + "/metrics.html"
+    pageTitle = 'Kafka Metrics Dashboard'
+    centralDashboard = PyH(pageTitle)
+    centralDashboard << h1(pageTitle, cl='center')
+    distinctRoles = set(config['role'] for config in clusterConfig)
+    for role in distinctRoles:
+        roleEntities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
+        # the per-role page is written first; its path becomes the link target
+        rolePagePath = build_dashboard_for_role(metricsDefinitionFile, role,
+                                                roleEntities, testcaseDashboardsDir)
+        centralDashboard << a(role, href = rolePagePath)
+        centralDashboard << br()
+
+    centralDashboard.printOut(metricsHtmlFile)
+
+def build_dashboard_for_role(metricsDefinitionFile, role, entities, testcaseDashboardsDir):
+    """Write the HTML dashboard page of one role and return its path.
+
+    The page embeds <testcaseDashboardsDir>/<role>/<bean_name>:<attribute>.svg
+    for every attribute of every graph defined for the role.  `entities` is
+    accepted but not used by this function.
+
+    Returns <testcaseDashboardsDir>/<role>-dashboards.html.
+    """
+    roleDashboards = get_dashboard_definition(metricsDefinitionFile, role)
+    heading = 'Kafka Metrics Dashboard for ' + role
+    entityDashboard = PyH(heading)
+    entityDashboard << h1(heading, cl='center')
+    entityDashboardHtml = testcaseDashboardsDir + "/" + role + "-dashboards.html"
+    for dashboard in roleDashboards:
+        for graph in dashboard['graphs']:
+            qualifiedAttributes = [graph['bean_name'] + ':' + attributeName
+                                   for attributeName in graph['attributes'].split(',')]
+            for qualifiedAttribute in qualifiedAttributes:
+                # one embedded svg file per mbean attribute
+                svgPath = testcaseDashboardsDir + "/" + role + "/" + qualifiedAttribute + ".svg"
+                entityDashboard << embed(src = svgPath, type = "image/svg+xml")
+    entityDashboard.printOut(entityDashboardHtml)
+    return entityDashboardHtml
+
+def start_metrics_collection(jmxHost, jmxPort, role, entityId, systemTestEnv, testcaseEnv):
+    """Start one remote JmxTool process per mbean defined for `role`.
+
+    jmxHost, jmxPort -- JMX endpoint of the entity to monitor (both strings)
+    role, entityId   -- identify the entity; select its dashboards and metrics directory
+    systemTestEnv    -- provides clusterEntityConfigDictList and METRICS_PATHNAME
+    testcaseEnv      -- provides entityParentPidDict, which receives the collector pids
+
+    Each JmxTool is launched over ssh with its stdout redirected to
+    <metrics dir>/<mbean>.csv and its pid written to <metrics dir>/entity_pid.
+    Because that pid file is rewritten by every launch, it is read back right
+    after each launch so that every collector pid is recorded.
+    """
+    logger.info("starting metrics collection on jmx port : " + jmxPort, extra=d)
+    jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
+    clusterConfig = systemTestEnv.clusterEntityConfigDictList
+    metricsDefinitionFile = systemTestEnv.METRICS_PATHNAME
+    entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
+    dashboardsForRole = get_dashboard_definition(metricsDefinitionFile, role)
+    if not dashboardsForRole:
+        # nothing is defined for this role, so there is nothing to collect
+        logger.info("no dashboards defined for role " + role + ", skipping metrics collection", extra=d)
+        return
+    mbeansForRole = get_mbeans_for_role(dashboardsForRole)
+
+    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "kafka_home")
+    javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "java_home")
+
+    for mbean in mbeansForRole:
+        outputCsvFile = entityMetricsDir + "/" + mbean + ".csv"
+        # "JMX_PORT= " passes an empty JMX_PORT to the JmxTool launcher script
+        startMetricsCmdList = ["ssh " + jmxHost,
+                               "'JAVA_HOME=" + javaHome,
+                               "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
+                               "--jmx-url " + jmxUrl,
+                               "--object-name " + mbean + " 1> ",
+                               outputCsvFile + " & echo pid:$! > ",
+                               entityMetricsDir + "/entity_pid'"]
+
+        startMetricsCommand = " ".join(startMetricsCmdList)
+        logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
+        system_test_utils.async_sys_call(startMetricsCommand)
+
+        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid'"
+        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+        # keep track of the remote entity pid in a dictionary
+        # NOTE(review): the launch is asynchronous, so the pid file may not be
+        # written yet when it is read here - confirm whether a wait is needed
+        for line in subproc.stdout.readlines():
+            if line.startswith("pid"):
+                line = line.rstrip('\n')
+                logger.debug("found pid line: [" + line + "]", extra=d)
+                thisPid = line.split(':')[1]
+                testcaseEnv.entityParentPidDict[thisPid] = thisPid
+
+def stop_metrics_collection(jmxHost, jmxPort):
+    """Kill (signal 9) the JmxTool processes that collect from the given JMX port.
+
+    jmxHost -- host name of the monitored entity; only used in the log message
+    jmxPort -- JMX port (string) of the monitored entity
+
+    Processes are selected from `ps -ef` by the ":<jmxPort>/jmxrmi" tail of the
+    JMX URL they were started with.  Matching the bare port number would also
+    hit unrelated JmxTool lines whose pid or port merely contains those digits.
+    """
+    logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)
+    jmxUrlTail = ":" + jmxPort + "/jmxrmi"
+    system_test_utils.sys_call("ps -ef | grep JmxTool | grep -v grep | grep -F '" + jmxUrlTail +
+                               "' | awk '{print $2}' | xargs kill -9")
+
+def get_mbeans_for_role(dashboardsForRole):
+    """Return the set of distinct mbean names used by the given dashboards.
+
+    dashboardsForRole -- list of dashboard dictionaries, each with a 'graphs'
+                         list whose items carry a 'bean_name' key
+
+    An empty input yields an empty set.
+    """
+    mbeans = set()
+    for dashboard in dashboardsForRole:
+        for graph in dashboard['graphs']:
+            mbeans.add(graph['bean_name'])
+    return mbeans
Added: incubator/kafka/branches/0.8/system_test/utils/pyh.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/pyh.py?rev=1378666&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/pyh.py (added)
+++ incubator/kafka/branches/0.8/system_test/utils/pyh.py Wed Aug 29 18:07:58 2012
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# @file: pyh.py
+# @purpose: a HTML tag generator
+# @author: Emmanuel Turlay <tu...@cern.ch>
+
+__doc__ = """The pyh.py module is the core of the PyH package. PyH lets you
+generate HTML tags from within your python code.
+See http://code.google.com/p/pyh/ for documentation.
+"""
+__author__ = "Emmanuel Turlay <tu...@cern.ch>"
+__version__ = '$Revision: 63 $'
+__date__ = '$Date: 2010-05-21 03:09:03 +0200 (Fri, 21 May 2010) $'
+
+from sys import _getframe, stdout, modules, version
+nOpen={}
+
+nl = '\n'
+doctype = '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">\n'
+charset = '<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />\n'
+
+tags = ['html', 'body', 'head', 'link', 'meta', 'div', 'p', 'form', 'legend',
+ 'input', 'select', 'span', 'b', 'i', 'option', 'img', 'script',
+ 'table', 'tr', 'td', 'th', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
+ 'fieldset', 'a', 'title', 'body', 'head', 'title', 'script', 'br', 'table',
+ 'ul', 'li', 'ol', 'embed']
+
+selfClose = ['input', 'img', 'link', 'br']
+
+class Tag(list):
+ # Base class of every generated HTML element. A Tag is a list of its
+ # children (other Tag objects or strings); an instance whose tagname is
+ # empty acts as a plain sequence of children.
+ tagname = ''
+
+ def __init__(self, *arg, **kw):
+ # keyword arguments become the HTML attributes; positional arguments
+ # become children
+ self.attributes = kw
+ if self.tagname :
+ name = self.tagname
+ self.isSeq = False
+ else:
+ name = 'sequence'
+ self.isSeq = True
+ # the id defaults to the tag name (or 'sequence') unless id=... is given
+ self.id = kw.get('id', name)
+ #self.extend(arg)
+ for a in arg: self.addObj(a)
+
+ def __iadd__(self, obj):
+ # `tag += obj`: a sequence Tag contributes each of its elements,
+ # anything else is added as a single child
+ if isinstance(obj, Tag) and obj.isSeq:
+ for o in obj: self.addObj(o)
+ else: self.addObj(obj)
+ return self
+
+ def addObj(self, obj):
+ # non-Tag children are stored as strings; every child is appended to
+ # the list and also exposed as an attribute named after its id
+ if not isinstance(obj, Tag): obj = str(obj)
+ id=self.setID(obj)
+ setattr(self, id, obj)
+ self.append(obj)
+
+ def setID(self, obj):
+ # returns the id for a new child: the child's own id for a Tag,
+ # 'content' for text; a '_NNN' suffix is added when n existing
+ # children already match
+ if isinstance(obj, Tag):
+ id = obj.id
+ n = len([t for t in self if isinstance(t, Tag) and t.id.startswith(id)])
+ else:
+ id = 'content'
+ n = len([t for t in self if not isinstance(t, Tag)])
+ if n: id = '%s_%03i' % (id, n)
+ if isinstance(obj, Tag): obj.id = id
+ return id
+
+ def __add__(self, obj):
+ # `tag + obj`: a named tag yields a new sequence holding both operands;
+ # a sequence appends obj to itself and returns itself
+ if self.tagname: return Tag(self, obj)
+ self.addObj(obj)
+ return self
+
+ def __lshift__(self, obj):
+ # `tag << obj` adds obj and returns obj when it is a Tag; there is no
+ # explicit return for other objects
+ self += obj
+ if isinstance(obj, Tag): return obj
+
+ def render(self):
+ # builds the HTML text of this element and its children
+ # NOTE(review): indentation was lost in this archived copy; the nesting
+ # of the closing-tag lines under `if not self.selfClose()` cannot be
+ # confirmed from here
+ result = ''
+ if self.tagname:
+ # bool * ' /' yields ' /' for self-closing tags and '' otherwise
+ result = '<%s%s%s>' % (self.tagname, self.renderAtt(), self.selfClose()*' /')
+ if not self.selfClose():
+ for c in self:
+ if isinstance(c, Tag):
+ result += c.render()
+ else: result += c
+ if self.tagname:
+ result += '</%s>' % self.tagname
+ result += '\n'
+ return result
+
+ def renderAtt(self):
+ # renders the attributes as ' name="value"' pairs; 'txt' and 'open' are
+ # skipped and 'cl' is written as 'class'
+ result = ''
+ for n, v in self.attributes.iteritems():
+ if n != 'txt' and n != 'open':
+ if n == 'cl': n = 'class'
+ result += ' %s="%s"' % (n, v)
+ return result
+
+ def selfClose(self):
+ # True when the tag name is listed in the module-level selfClose list
+ return self.tagname in selfClose
+
+def TagFactory(name):
+    """Return a new Tag subclass whose tagname and __name__ are both `name`."""
+    class generatedTag(Tag):
+        tagname = name
+    generatedTag.__name__ = name
+    return generatedTag
+
+# this module object, looked up in sys.modules; the generated classes are attached to it
+thisModule = modules[__name__]
+
+# create one Tag subclass per entry of `tags` and expose it as a module attribute
+# of the same name
+for t in tags: setattr(thisModule, t, TagFactory(t))
+
+def ValidW3C():
+    """Return an <a> tag wrapping the W3C "Valid XHTML 1.0" icon, linking to the validator."""
+    badge = img(src='http://www.w3.org/Icons/valid-xhtml10', alt='Valid XHTML 1.0 Strict')
+    return a(badge, href='http://validator.w3.org/check?uri=referer')
+
+class PyH(Tag):
+ # A whole HTML page: an <html> element that holds one head and one body.
+ tagname = 'html'
+
+ def __init__(self, name='MyPyHPage'):
+ # creates the head and body children, sets the <html> attributes and
+ # puts `name` into the page title; Tag.__init__ is not called here
+ self += head()
+ self += body()
+ self.attributes = dict(xmlns='http://www.w3.org/1999/xhtml', lang='en')
+ self.head += title(name)
+
+ def __iadd__(self, obj):
+ # `page += obj`: head/body elements become direct children, meta/link
+ # elements go into the head, everything else goes into the body and is
+ # also exposed as an attribute of the page named after its id
+ if isinstance(obj, head) or isinstance(obj, body): self.addObj(obj)
+ elif isinstance(obj, meta) or isinstance(obj, link): self.head += obj
+ else:
+ self.body += obj
+ id=self.setID(obj)
+ setattr(self, id, obj)
+ return self
+
+ def addJS(self, *arg):
+ # adds one <script type="text/javascript"> element to the head per given source
+ for f in arg: self.head += script(type='text/javascript', src=f)
+
+ def addCSS(self, *arg):
+ # adds one stylesheet <link> element to the head per given href
+ for f in arg: self.head += link(rel='stylesheet', type='text/css', href=f)
+
+ def printOut(self,file=''):
+ # writes the doctype followed by the rendered page to `file`, or to
+ # stdout when no file name is given; only a file opened here is closed
+ if file: f = open(file, 'w')
+ else: f = stdout
+ f.write(doctype)
+ f.write(self.render())
+ f.flush()
+ if file: f.close()
+
Modified: incubator/kafka/branches/0.8/system_test/utils/setup_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/setup_utils.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/setup_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/setup_utils.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
#!/usr/bin/env python
import logging
Modified: incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
#!/usr/bin/env python
# ===================================
@@ -314,7 +330,6 @@ def setup_remote_hosts(systemTestEnv):
return True
-
def copy_source_to_remote_hosts(hostname, sourceDir, destDir):
cmdStr = "rsync -avz --delete-before " + sourceDir + "/ " + hostname + ":" + destDir
Modified: incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/testcase_env.py?rev=1378666&r1=1378665&r2=1378666&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/testcase_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/testcase_env.py Wed Aug 29 18:07:58 2012
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
#!/usr/bin/env python
# ===================================
@@ -42,10 +58,10 @@ class TestcaseEnv():
self.systemTestBaseDir = systemTestEnv.SYSTEM_TEST_BASE_DIR
# to be initialized in the Test Module
- self.testSuiteBaseDir = ""
- self.testCaseBaseDir = ""
- self.testCaseLogsDir = ""
-
+ self.testSuiteBaseDir = ""
+ self.testCaseBaseDir = ""
+ self.testCaseLogsDir = ""
+ self.testCaseDashboardsDir = ""
# ================================
# dictionary to keep track of
# user-defined environment variables