You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/22 23:46:59 UTC
[kafka] branch trunk updated: KAFKA-6611: PART I,
Use JMXTool in SimpleBenchmark (#4650)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f2fbfaa KAFKA-6611: PART I, Use JMXTool in SimpleBenchmark (#4650)
f2fbfaa is described below
commit f2fbfaaccc99f889ae0796005a46d6c62c8e5be9
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Mar 22 16:46:56 2018 -0700
KAFKA-6611: PART I, Use JMXTool in SimpleBenchmark (#4650)
1. Use JmxMixin for SimpleBenchmark (the self-reporting will be removed in #4744), but only when the load phase is false (i.e. when we are actually starting the Streams app).
2. Report the full set of JMX metrics in the log files, and return only the max values in the returned data: we want to skip the warm-up and cool-down periods, which have lower rates, while the max represents the actual rate at full speed.
3. Incorporate two other improvements to JmxTool: #1241 and #2950.
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Rohan Desai <de...@gmail.com>
---
core/src/main/scala/kafka/tools/JmxTool.scala | 68 ++++++++++++++++++----
.../apache/kafka/streams/perf/SimpleBenchmark.java | 1 +
.../streams/streams_simple_benchmark_test.py | 15 ++++-
tests/kafkatest/services/console_consumer.py | 2 +-
tests/kafkatest/services/kafka/kafka.py | 10 ++--
tests/kafkatest/services/monitor/jmx.py | 6 +-
.../services/performance/streams_performance.py | 51 +++++++++++++++-
tests/kafkatest/services/streams.py | 17 ++++--
8 files changed, 142 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 4a6a348..27e4631 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -55,12 +55,17 @@ object JmxTool extends Logging {
.withRequiredArg
.describedAs("name")
.ofType(classOf[String])
- val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats.")
+ val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+ "Value of -1 equivalent to setting one-time to true")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(2000)
- val helpOpt = parser.accepts("help", "Print usage information.")
+ val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+ .withRequiredArg
+ .describedAs("one-time")
+ .ofType(classOf[java.lang.Boolean])
+ .defaultsTo(false)
val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
"See java.text.SimpleDateFormat for options.")
.withRequiredArg
@@ -72,8 +77,15 @@ object JmxTool extends Logging {
.describedAs("service-url")
.ofType(classOf[String])
.defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
+ val reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ")
+ .withRequiredArg
+ .describedAs("report-format")
+ .ofType(classOf[java.lang.String])
+ .defaultsTo("original")
val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " +
"Only supported when the list of objects is non-empty and contains no object name patterns.")
+ val helpOpt = parser.accepts("help", "Print usage information.")
+
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.")
@@ -87,12 +99,16 @@ object JmxTool extends Logging {
val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
val interval = options.valueOf(reportingIntervalOpt).intValue
+ var oneTime = interval < 0 || options.has(oneTimeOpt)
val attributesWhitelistExists = options.has(attributesOpt)
- val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None
+ val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None
val dateFormatExists = options.has(dateFormatOpt)
val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
val wait = options.has(waitOpt)
+ val reportFormat = parseFormat(options.valueOf(reportFormatOpt).toLowerCase)
+ val reportFormatOriginal = reportFormat.equals("original")
+
var jmxc: JMXConnector = null
var mbsc: MBeanServerConnection = null
var connected = false
@@ -150,33 +166,57 @@ object JmxTool extends Logging {
val numExpectedAttributes: Map[ObjectName, Int] =
if (attributesWhitelistExists)
- queries.map((_, attributesWhitelist.get.size)).toMap
+ queries.map((_, attributesWhitelist.get.length)).toMap
else {
names.map{(name: ObjectName) =>
val mbean = mbsc.getMBeanInfo(name)
(name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap
}
+ if(numExpectedAttributes.isEmpty) {
+ CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for the queried objects $queries.")
+ }
+
// print csv header
val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
- if(keys.size == numExpectedAttributes.values.sum + 1)
+ if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 1) {
println(keys.map("\"" + _ + "\"").mkString(","))
+ }
- while(true) {
+ var keepGoing = true
+ while (keepGoing) {
val start = System.currentTimeMillis
val attributes = queryAttributes(mbsc, names, attributesWhitelist)
attributes("time") = dateFormat match {
case Some(dFormat) => dFormat.format(new Date)
case None => System.currentTimeMillis().toString
}
- if(attributes.keySet.size == numExpectedAttributes.values.sum + 1)
- println(keys.map(attributes(_)).mkString(","))
- val sleep = max(0, interval - (System.currentTimeMillis - start))
- Thread.sleep(sleep)
+ if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) {
+ if(reportFormatOriginal) {
+ println(keys.map(attributes(_)).mkString(","))
+ }
+ else if(reportFormat.equals("properties")) {
+ keys.foreach( k => { println(k + "=" + attributes(k) ) } )
+ }
+ else if(reportFormat.equals("csv")) {
+ keys.foreach( k => { println(k + ",\"" + attributes(k) + "\"" ) } )
+ }
+ else { // tsv
+ keys.foreach( k => { println(k + "\t" + attributes(k) ) } )
+ }
+ }
+
+ if (oneTime) {
+ keepGoing = false
+ }
+ else {
+ val sleep = max(0, interval - (System.currentTimeMillis - start))
+ Thread.sleep(sleep)
+ }
}
}
- def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
+ def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]): mutable.Map[String, Any] = {
val attributes = new mutable.HashMap[String, Any]()
for (name <- names) {
val mbean = mbsc.getMBeanInfo(name)
@@ -193,4 +233,10 @@ object JmxTool extends Logging {
attributes
}
+ def parseFormat(reportFormatOpt : String): String = reportFormatOpt match {
+ case "properties" => "properties"
+ case "csv" => "csv"
+ case "tsv" => "tsv"
+ case _ => "original"
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 5d7041e..c66d78b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -231,6 +231,7 @@ public class SimpleBenchmark {
public void setStreamProperties(final String applicationId) {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 4cc3976..06aec14 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -35,7 +35,7 @@ class StreamsSimpleBenchmarkTest(Test):
self.num_threads = 1
@cluster(num_nodes=9)
- @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin", "yahoo"], scale=[1, 3])
+ @matrix(test=["count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
def test_simple_benchmark(self, test, scale):
"""
Run simple Kafka Streams benchmark
@@ -75,6 +75,8 @@ class StreamsSimpleBenchmarkTest(Test):
self.load_driver.wait()
self.load_driver.stop()
+
+
################
# RUN PHASE
################
@@ -93,11 +95,18 @@ class StreamsSimpleBenchmarkTest(Test):
node[num] = self.driver[num].node
node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
data[num] = self.driver[num].collect_data(node[num], "" )
-
+ self.driver[num].read_jmx_output_all_nodes()
+
final = {}
for num in range(0, scale):
for key in data[num]:
final[key + str(num)] = data[num][key]
-
+
+ for key in sorted(self.driver[num].jmx_stats[0]):
+ self.logger.info("%s: %s" % (key, self.driver[num].jmx_stats[0][key]))
+
+ final["jmx-avg" + str(num)] = self.driver[num].average_jmx_value
+ final["jmx-max" + str(num)] = self.driver[num].maximum_jmx_value
+
return final
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 950ded3..64a99f9 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -120,7 +120,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
print_timestamp if True, print each message's timestamp as well
isolation_level How to handle transactional messages.
"""
- JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [],
+ JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleConsumer.PERSISTENT_ROOT)
BackgroundThreadService.__init__(self, context, num_nodes)
self.kafka = kafka
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index c4d4b24..ba5abc7 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -34,7 +34,6 @@ from kafkatest.version import DEV_BRANCH
Port = collections.namedtuple('Port', ['name', 'number', 'open'])
-
class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
PERSISTENT_ROOT = "/mnt/kafka"
STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log")
@@ -72,14 +71,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
- jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=[], zk_chroot=None):
+ jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None):
"""
:type context
:type zk: ZookeeperService
:type topics: dict
"""
Service.__init__(self, context, num_nodes)
- JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [],
+ JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=KafkaService.PERSISTENT_ROOT)
self.zk = zk
@@ -92,7 +91,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.minikdc = None
self.authorizer_class_name = authorizer_class_name
self.zk_set_acl = False
- self.server_prop_overides = server_prop_overides
+ if server_prop_overides is None:
+ self.server_prop_overides = []
+ else:
+ self.server_prop_overides = server_prop_overides
self.log_level = "DEBUG"
self.zk_chroot = zk_chroot
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 6f6e221..542d3a5 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -70,14 +70,14 @@ class JmxMixin(object):
use_jmxtool_version = get_version(node)
if use_jmxtool_version <= V_0_11_0_0:
use_jmxtool_version = DEV_BRANCH
- cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version),
- self.jmx_class_name())
+ cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), self.jmx_class_name())
cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
cmd += " --wait"
for jmx_object_name in self.jmx_object_names:
cmd += " --object-name %s" % jmx_object_name
+ cmd += " --attributes "
for jmx_attribute in self.jmx_attributes:
- cmd += " --attributes %s" % jmx_attribute
+ cmd += "%s," % jmx_attribute
cmd += " 1>> %s" % self.jmx_tool_log
cmd += " 2>> %s &" % self.jmx_tool_err_log
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 94f7249..9f79181 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.streams import StreamsTestBaseService
-
#
# Class used to start the simple Kafka Streams benchmark
#
@@ -31,6 +31,55 @@ class StreamsSimpleBenchmarkService(StreamsTestBaseService):
test_name,
num_threads)
+ self.load_phase = load_phase
+
+ if self.load_phase == "false":
+ JmxMixin.__init__(self,
+ num_nodes=1,
+ jmx_object_names=['kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-%d' %(i+1) for i in range(num_threads)],
+ jmx_attributes=['process-latency-avg',
+ 'process-rate',
+ 'commit-latency-avg',
+ 'commit-rate',
+ 'poll-latency-avg',
+ 'poll-rate'],
+ root=StreamsTestBaseService.PERSISTENT_ROOT)
+
+ def start_cmd(self, node):
+ cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
+
+ if self.load_phase == "false":
+ args = self.args.copy()
+ args['jmx_port'] = self.jmx_port
+ args['kafka'] = self.kafka.bootstrap_servers()
+ args['config_file'] = self.CONFIG_FILE
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+ cmd = "( export JMX_PORT=%(jmx_port)s; export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+ " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
+ " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+ self.logger.info("Executing streams simple benchmark cmd: " + cmd)
+
+ return cmd
+
+ def start_node(self, node):
+ super(StreamsSimpleBenchmarkService, self).start_node(node)
+
+ if self.load_phase == "false":
+ self.start_jmx_tool(1, node)
+
+
+ def clean_node(self, node):
+ if self.load_phase == "false":
+ JmxMixin.clean_node(self, node)
+ super(StreamsSimpleBenchmarkService, self).clean_node(node)
+
def collect_data(self, node, tag = None):
# Collect the data and return it to the framework
output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 6da5a25..d9b475e 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -19,13 +19,12 @@ import signal
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.kafka import KafkaConfig
-
STATE_DIR = "state.dir"
-class StreamsTestBaseService(KafkaPathResolverMixin, Service):
-
+class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
"""Base class for Streams Test services providing some common settings and functionality"""
PERSISTENT_ROOT = "/mnt/streams"
@@ -35,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
+ JMX_LOG_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.log")
+ JMX_ERR_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.err.log")
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
@@ -48,10 +49,16 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
+ "jmx_log": {
+ "path": JMX_LOG_FILE,
+ "collect_default": True},
+ "jmx_err": {
+ "path": JMX_ERR_FILE,
+ "collect_default": True},
}
def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None, user_test_args3=None):
- super(StreamsTestBaseService, self).__init__(test_context, 1)
+ Service.__init__(self, test_context, num_nodes=1)
self.kafka = kafka
self.args = {'streams_class_name': streams_class_name,
'user_test_args': user_test_args,
@@ -130,7 +137,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
" %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
" %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
- self.logger.info("Executing Streams cmd: " + cmd)
+ self.logger.info("Executing streams cmd: " + cmd)
return cmd
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.