Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/22 23:46:59 UTC

[kafka] branch trunk updated: KAFKA-6611: PART I, Use JMXTool in SimpleBenchmark (#4650)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f2fbfaa  KAFKA-6611: PART I, Use JMXTool in SimpleBenchmark (#4650)
f2fbfaa is described below

commit f2fbfaaccc99f889ae0796005a46d6c62c8e5be9
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Mar 22 16:46:56 2018 -0700

    KAFKA-6611: PART I, Use JMXTool in SimpleBenchmark (#4650)
    
    1. Use JmxMixin for SimpleBenchmark (the self-reporting will be removed in #4744), but only when the loading phase is false (i.e. when we are actually starting the Streams app).
    
    2. Report the full JMX metrics in the log files, and in the returned data return only the max values: this is because we want to skip the warm-up and cool-down periods, which have lower rate numbers, while the max represents the actual rate at full speed.
    
    3. Incorporate two other improvements to JMXTool: #1241 and #2950.
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Rohan Desai <de...@gmail.com>
---
 core/src/main/scala/kafka/tools/JmxTool.scala      | 68 ++++++++++++++++++----
 .../apache/kafka/streams/perf/SimpleBenchmark.java |  1 +
 .../streams/streams_simple_benchmark_test.py       | 15 ++++-
 tests/kafkatest/services/console_consumer.py       |  2 +-
 tests/kafkatest/services/kafka/kafka.py            | 10 ++--
 tests/kafkatest/services/monitor/jmx.py            |  6 +-
 .../services/performance/streams_performance.py    | 51 +++++++++++++++-
 tests/kafkatest/services/streams.py                | 17 ++++--
 8 files changed, 142 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 4a6a348..27e4631 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -55,12 +55,17 @@ object JmxTool extends Logging {
         .withRequiredArg
         .describedAs("name")
         .ofType(classOf[String])
-    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats.")
+    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+      "Value of -1 equivalent to setting one-time to true")
       .withRequiredArg
       .describedAs("ms")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(2000)
-    val helpOpt = parser.accepts("help", "Print usage information.")
+    val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+      .withRequiredArg
+      .describedAs("one-time")
+      .ofType(classOf[java.lang.Boolean])
+      .defaultsTo(false)
     val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
       "See java.text.SimpleDateFormat for options.")
       .withRequiredArg
@@ -72,8 +77,15 @@ object JmxTool extends Logging {
         .describedAs("service-url")
         .ofType(classOf[String])
         .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
+    val reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ")
+      .withRequiredArg
+      .describedAs("report-format")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("original")
     val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " +
       "Only supported when the list of objects is non-empty and contains no object name patterns.")
+    val helpOpt = parser.accepts("help", "Print usage information.")
+
 
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.")
@@ -87,12 +99,16 @@ object JmxTool extends Logging {
 
     val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
     val interval = options.valueOf(reportingIntervalOpt).intValue
+    var oneTime = interval < 0 || options.has(oneTimeOpt)
     val attributesWhitelistExists = options.has(attributesOpt)
-    val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None
+    val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None
     val dateFormatExists = options.has(dateFormatOpt)
     val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
     val wait = options.has(waitOpt)
 
+    val reportFormat = parseFormat(options.valueOf(reportFormatOpt).toLowerCase)
+    val reportFormatOriginal = reportFormat.equals("original")
+
     var jmxc: JMXConnector = null
     var mbsc: MBeanServerConnection = null
     var connected = false
@@ -150,33 +166,57 @@ object JmxTool extends Logging {
 
     val numExpectedAttributes: Map[ObjectName, Int] =
       if (attributesWhitelistExists)
-        queries.map((_, attributesWhitelist.get.size)).toMap
+        queries.map((_, attributesWhitelist.get.length)).toMap
       else {
         names.map{(name: ObjectName) =>
           val mbean = mbsc.getMBeanInfo(name)
           (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap
       }
 
+    if(numExpectedAttributes.isEmpty) {
+      CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for the queried objects $queries.")
+    }
+
     // print csv header
     val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
-    if(keys.size == numExpectedAttributes.values.sum + 1)
+    if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 1) {
       println(keys.map("\"" + _ + "\"").mkString(","))
+    }
 
-    while(true) {
+    var keepGoing = true
+    while (keepGoing) {
       val start = System.currentTimeMillis
       val attributes = queryAttributes(mbsc, names, attributesWhitelist)
       attributes("time") = dateFormat match {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      if(attributes.keySet.size == numExpectedAttributes.values.sum + 1)
-        println(keys.map(attributes(_)).mkString(","))
-      val sleep = max(0, interval - (System.currentTimeMillis - start))
-      Thread.sleep(sleep)
+      if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) {
+        if(reportFormatOriginal) {
+          println(keys.map(attributes(_)).mkString(","))
+        }
+        else if(reportFormat.equals("properties")) {
+          keys.foreach( k => { println(k + "=" + attributes(k) ) } )
+        }
+        else if(reportFormat.equals("csv")) {
+          keys.foreach( k => { println(k + ",\"" + attributes(k) + "\"" ) } )
+        }
+        else { // tsv
+          keys.foreach( k => { println(k + "\t" + attributes(k) ) } )
+        }
+      }
+
+      if (oneTime) {
+        keepGoing = false
+      }
+      else {
+        val sleep = max(0, interval - (System.currentTimeMillis - start))
+        Thread.sleep(sleep)
+      }
     }
   }
 
-  def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
+  def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]): mutable.Map[String, Any] = {
     val attributes = new mutable.HashMap[String, Any]()
     for (name <- names) {
       val mbean = mbsc.getMBeanInfo(name)
@@ -193,4 +233,10 @@ object JmxTool extends Logging {
     attributes
   }
 
+  def parseFormat(reportFormatOpt : String): String = reportFormatOpt match {
+    case "properties" => "properties"
+    case "csv" => "csv"
+    case "tsv" => "tsv"
+    case _ => "original"
+  }
 }
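
As a rough illustration of the options added above (not part of this commit), the sketch below builds a one-shot JmxTool invocation using the new --one-time and --report-format flags; the kafka-run-class.sh path, JMX port, and object name are assumptions for the example.

import subprocess

jmx_port = 9999  # assumed JMX port exposed by the target JVM
cmd = [
    "bin/kafka-run-class.sh", "kafka.tools.JmxTool",          # path is illustrative
    "--jmx-url", "service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % jmx_port,
    "--object-name", "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1",
    "--attributes", "process-rate,commit-rate",  # comma-separated list; empty entries are filtered out
    "--one-time", "true",                        # new flag: poll the attributes once and exit
    "--report-format", "properties",             # new flag: key=value lines instead of the original csv row
]
print(subprocess.check_output(cmd).decode())
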
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 5d7041e..c66d78b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -231,6 +231,7 @@ public class SimpleBenchmark {
 
     public void setStreamProperties(final String applicationId) {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 4cc3976..06aec14 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -35,7 +35,7 @@ class StreamsSimpleBenchmarkTest(Test):
         self.num_threads = 1
 
     @cluster(num_nodes=9)
-    @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin", "yahoo"], scale=[1, 3])
+    @matrix(test=["count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
     def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark
@@ -75,6 +75,8 @@ class StreamsSimpleBenchmarkTest(Test):
         self.load_driver.wait()
         self.load_driver.stop()
 
+
+
         ################
         # RUN PHASE
         ################
@@ -93,11 +95,18 @@ class StreamsSimpleBenchmarkTest(Test):
             node[num] = self.driver[num].node
             node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
             data[num] = self.driver[num].collect_data(node[num], "" )
-                
+            self.driver[num].read_jmx_output_all_nodes()
+
 
         final = {}
         for num in range(0, scale):
             for key in data[num]:
                 final[key + str(num)] = data[num][key]
-        
+
+            for key in sorted(self.driver[num].jmx_stats[0]):
+                self.logger.info("%s: %s" % (key, self.driver[num].jmx_stats[0][key]))
+
+            final["jmx-avg" + str(num)] = self.driver[num].average_jmx_value
+            final["jmx-max" + str(num)] = self.driver[num].maximum_jmx_value
+
         return final
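
The reason for reporting jmx-max alongside jmx-avg (point 2 in the commit message) can be seen with a toy example; the sample values below are made up and only illustrate how warm-up and cool-down intervals drag the average down while the max tracks the steady-state rate.

# Hypothetical per-interval process-rate readings taken from the JMX tool output
samples = [1200.0, 8400.0, 8650.0, 8700.0, 8500.0, 900.0]

average_rate = sum(samples) / len(samples)   # ~6058 records/s, skewed by the first and last intervals
maximum_rate = max(samples)                  # 8700 records/s, closer to the rate at full speed

print("jmx-avg=%.1f jmx-max=%.1f" % (average_rate, maximum_rate))
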
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 950ded3..64a99f9 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -120,7 +120,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             print_timestamp             if True, print each message's timestamp as well
             isolation_level             How to handle transactional messages.
         """
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [],
+        JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=ConsoleConsumer.PERSISTENT_ROOT)
         BackgroundThreadService.__init__(self, context, num_nodes)
         self.kafka = kafka
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index c4d4b24..ba5abc7 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -34,7 +34,6 @@ from kafkatest.version import DEV_BRANCH
 
 Port = collections.namedtuple('Port', ['name', 'number', 'open'])
 
-
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     PERSISTENT_ROOT = "/mnt/kafka"
     STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log")
@@ -72,14 +71,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
-                 jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=[], zk_chroot=None):
+                 jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None):
         """
         :type context
         :type zk: ZookeeperService
         :type topics: dict
         """
         Service.__init__(self, context, num_nodes)
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [],
+        JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=KafkaService.PERSISTENT_ROOT)
 
         self.zk = zk
@@ -92,7 +91,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.minikdc = None
         self.authorizer_class_name = authorizer_class_name
         self.zk_set_acl = False
-        self.server_prop_overides = server_prop_overides
+        if server_prop_overides is None:
+            self.server_prop_overides = []
+        else:
+            self.server_prop_overides = server_prop_overides
         self.log_level = "DEBUG"
         self.zk_chroot = zk_chroot
 
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 6f6e221..542d3a5 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -70,14 +70,14 @@ class JmxMixin(object):
         use_jmxtool_version = get_version(node)
         if use_jmxtool_version <= V_0_11_0_0:
             use_jmxtool_version = DEV_BRANCH
-        cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version),
-                          self.jmx_class_name())
+        cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), self.jmx_class_name())
         cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
         cmd += " --wait"
         for jmx_object_name in self.jmx_object_names:
             cmd += " --object-name %s" % jmx_object_name
+        cmd += " --attributes "
         for jmx_attribute in self.jmx_attributes:
-            cmd += " --attributes %s" % jmx_attribute
+            cmd += "%s," % jmx_attribute
         cmd += " 1>> %s" % self.jmx_tool_log
         cmd += " 2>> %s &" % self.jmx_tool_err_log
 
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 94f7249..9f79181 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -13,9 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.streams import StreamsTestBaseService
 
-
 #
 # Class used to start the simple Kafka Streams benchmark
 #
@@ -31,6 +31,55 @@ class StreamsSimpleBenchmarkService(StreamsTestBaseService):
                                                             test_name,
                                                             num_threads)
 
+        self.load_phase = load_phase
+
+        if self.load_phase == "false":
+            JmxMixin.__init__(self,
+                              num_nodes=1,
+                              jmx_object_names=['kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-%d' %(i+1) for i in range(num_threads)],
+                              jmx_attributes=['process-latency-avg',
+                                              'process-rate',
+                                              'commit-latency-avg',
+                                              'commit-rate',
+                                              'poll-latency-avg',
+                                              'poll-rate'],
+                              root=StreamsTestBaseService.PERSISTENT_ROOT)
+
+    def start_cmd(self, node):
+        cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
+
+        if self.load_phase == "false":
+            args = self.args.copy()
+            args['jmx_port'] = self.jmx_port
+            args['kafka'] = self.kafka.bootstrap_servers()
+            args['config_file'] = self.CONFIG_FILE
+            args['stdout'] = self.STDOUT_FILE
+            args['stderr'] = self.STDERR_FILE
+            args['pidfile'] = self.PID_FILE
+            args['log4j'] = self.LOG4J_CONFIG_FILE
+            args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+            cmd = "( export JMX_PORT=%(jmx_port)s; export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+                  "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+                  " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
+                  " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        self.logger.info("Executing streams simple benchmark cmd: " + cmd)
+
+        return cmd
+
+    def start_node(self, node):
+        super(StreamsSimpleBenchmarkService, self).start_node(node)
+
+        if self.load_phase == "false":
+            self.start_jmx_tool(1, node)
+
+
+    def clean_node(self, node):
+        if self.load_phase == "false":
+            JmxMixin.clean_node(self, node)
+        super(StreamsSimpleBenchmarkService, self).clean_node(node)
+
     def collect_data(self, node, tag = None):
         # Collect the data and return it to the framework
         output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 6da5a25..d9b475e 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -19,13 +19,12 @@ import signal
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.kafka import KafkaConfig
 
-
 STATE_DIR = "state.dir"
 
-class StreamsTestBaseService(KafkaPathResolverMixin, Service):
-
+class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     """Base class for Streams Test services providing some common settings and functionality"""
 
     PERSISTENT_ROOT = "/mnt/streams"
@@ -35,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
     LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
     STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
     STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
+    JMX_LOG_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.log")
+    JMX_ERR_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.err.log")
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
 
@@ -48,10 +49,16 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "jmx_log": {
+            "path": JMX_LOG_FILE,
+            "collect_default": True},
+        "jmx_err": {
+            "path": JMX_ERR_FILE,
+            "collect_default": True},
     }
 
     def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None, user_test_args3=None):
-        super(StreamsTestBaseService, self).__init__(test_context, 1)
+        Service.__init__(self, test_context, num_nodes=1)
         self.kafka = kafka
         self.args = {'streams_class_name': streams_class_name,
                      'user_test_args': user_test_args,
@@ -130,7 +137,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
               " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
               " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
-        self.logger.info("Executing Streams cmd: " + cmd)
+        self.logger.info("Executing streams cmd: " + cmd)
 
         return cmd
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.