You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/08/08 22:17:32 UTC

svn commit: r1370941 - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/server/KafkaServer.scala core/src/main/scala/kafka/tools/JmxTool.scala system_test/single_host_multi_brokers/bin/run-test.sh

Author: nehanarkhede
Date: Wed Aug  8 20:17:31 2012
New Revision: 1370941

URL: http://svn.apache.org/viewvc?rev=1370941&view=rev
Log:
Minor checkin to fix replication system test and jmx tool

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1370941&r1=1370940&r2=1370941&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Wed Aug  8 20:17:31 2012
@@ -142,7 +142,7 @@ class KafkaServer(val config: KafkaConfi
       debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
       cleanShutDownFile.createNewFile
       shutdownLatch.countDown()
-      info("shutted down completed")
+      info("shut down completed")
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala?rev=1370941&r1=1370940&r2=1370941&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala Wed Aug  8 20:17:31 2012
@@ -48,11 +48,10 @@ object JmxTool {
     val helpOpt = parser.accepts("help", "Print usage information.")
     val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + 
                                                       "See java.text.SimpleDateFormat for options.")
-      .withRequiredArg
+      .withOptionalArg()
       .describedAs("format")
       .ofType(classOf[String])
-      .defaultsTo("yyyy-MM-dd HH:mm:ss.SSS")
-    val jmxServiceUrlOpt = 
+    val jmxServiceUrlOpt =
       parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
       .withRequiredArg
       .describedAs("service-url")
@@ -68,7 +67,8 @@ object JmxTool {
 
     val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
     val interval = options.valueOf(reportingIntervalOpt).intValue
-    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val dateFormatExists = options.has(dateFormatOpt)
+    val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
     val jmxc = JMXConnectorFactory.connect(url, null)
     val mbsc = jmxc.getMBeanServerConnection()
 
@@ -88,7 +88,10 @@ object JmxTool {
     while(true) {
       val start = System.currentTimeMillis
       val attributes = queryAttributes(mbsc, names)
-      attributes("time") = dateFormat.format(new Date)
+      attributes("time") = dateFormat match {
+        case Some(dFormat) => dFormat.format(new Date)
+        case None => System.currentTimeMillis().toString
+      }
       println(keys.map(attributes(_)).mkString(", "))
       val sleep = max(0, interval - (System.currentTimeMillis - start))
       Thread.sleep(sleep)

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1370941&r1=1370940&r2=1370941&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Wed Aug  8 20:17:31 2012
@@ -91,6 +91,10 @@ pid_zk=
 kafka_pids=
 test_failure_counter=0
 leader_elected_timestamp=
+jmx_ports=
+jmx_ports[1]=9990
+jmx_ports[2]=9991
+jmx_ports[3]=9992
 
 initialize() {
     info "initializing ..."
@@ -144,15 +148,15 @@ cleanup() {
 }
 
 get_leader_brokerid() {
-    log_line=`grep -i -h 'is leader' ${base_dir}/kafka_server_*.log | sort | tail -1`
+    log_line=`grep -i -h 'completed the leader state transition' ${base_dir}/kafka_server_*.log | sort | tail -1`
     info "found the log line: $log_line"
-    broker_id=`echo $log_line | sed s'/^.*INFO Broker //g' | awk -F ' ' '{print $1}'`
+    broker_id=`echo $log_line | sed s'/^.*INFO Replica Manager on Broker //g' | awk -F ',' '{print $1}'`
 
     return $broker_id
 }
 
 get_elected_leader_unix_timestamp() {
-    log_line=`grep -i -h 'is leader' ${base_dir}/kafka_server_*.log | sort | tail -1`
+    log_line=`grep -i -h 'completed the leader state transition' ${base_dir}/kafka_server_*.log | sort | tail -1`
     info "found the log line: $log_line"
 
     this_timestamp=`echo $log_line | cut -f2 -d '[' | cut -f1 -d ']'`
@@ -209,8 +213,8 @@ stop_server() {
 start_server() {
     s_idx=$1
 
-    info "starting kafka server"
-    $base_dir/bin/kafka-run-class.sh kafka.Kafka ${kafka_prop_pathnames[$s_idx]} \
+    info "starting kafka server on jmx port ${jmx_ports[${s_idx}]}"
+    JMX_PORT=${jmx_ports[${s_idx}]} $base_dir/bin/kafka-run-class.sh kafka.Kafka ${kafka_prop_pathnames[$s_idx]} \
         2>&1 >> ${kafka_log4j_log_pathnames[$s_idx]} &
     kafka_pids[${s_idx}]=$!
     info "  -> kafka_pids[$s_idx]: ${kafka_pids[$s_idx]}"
@@ -275,7 +279,7 @@ shutdown_servers() {
 
      # running processes are not terminated properly in a Hudson job,
      # this is a temporary workaround to kill all processes
-     `ps axuw | grep "java\|run\-" | grep -v grep | grep -v slave | grep -v vi | grep -v "run\-test\.sh" | awk '{print $2}' | xargs kill -9`
+     `ps axuw | grep "kafka\|run\-" | grep -v grep | grep -v slave | grep -v vi | grep -v "run\-test\.sh" | awk '{print $2}' | xargs kill -9`
 
 }