You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/07/20 04:39:56 UTC
kafka git commit: KAFKA-5608: Add --wait option for JmxTool and use in system tests to avoid race between JmxTool and monitored services
Repository: kafka
Updated Branches:
refs/heads/trunk f87d58b79 -> f50af9c31
KAFKA-5608: Add --wait option for JmxTool and use in system tests to avoid race between JmxTool and monitored services
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Author: Ewen Cheslack-Postava <ew...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #3547 from ewencp/wait-jmx-metrics
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f50af9c3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f50af9c3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f50af9c3
Branch: refs/heads/trunk
Commit: f50af9c31df6fc6febba51d39293051efc82cec9
Parents: f87d58b
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Jul 19 21:39:51 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Jul 19 21:39:51 2017 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/tools/JmxTool.scala | 42 +++++++++++++++++-----
tests/kafkatest/services/console_consumer.py | 40 +++++++++++----------
tests/kafkatest/services/monitor/jmx.py | 11 ++++--
3 files changed, 64 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f50af9c3/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 6e8a094..c83f792 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -72,7 +72,9 @@ object JmxTool extends Logging {
.describedAs("service-url")
.ofType(classOf[String])
.defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
-
+ val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " +
+ "Only supported when the list of objects is non-empty and contains no object name patterns.")
+
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.")
@@ -89,13 +91,14 @@ object JmxTool extends Logging {
val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None
val dateFormatExists = options.has(dateFormatOpt)
val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
+ val wait = options.has(waitOpt)
var jmxc: JMXConnector = null
var mbsc: MBeanServerConnection = null
- var retries = 0
- val maxNumRetries = 10
var connected = false
- while (retries < maxNumRetries && !connected) {
+ val connectTimeoutMs = 10000
+ val connectTestStarted = System.currentTimeMillis
+ do {
try {
System.err.println(s"Trying to connect to JMX url: $url.")
jmxc = JMXConnectorFactory.connect(url, null)
@@ -105,13 +108,12 @@ object JmxTool extends Logging {
case e : Exception =>
System.err.println(s"Could not connect to JMX url: $url. Exception ${e.getMessage}.")
e.printStackTrace()
- retries += 1
- Thread.sleep(500)
+ Thread.sleep(100)
}
- }
+ } while (System.currentTimeMillis - connectTestStarted < connectTimeoutMs && !connected)
if (!connected) {
- System.err.println(s"Could not connect to JMX url $url after $maxNumRetries retries.")
+ System.err.println(s"Could not connect to JMX url $url after $connectTimeoutMs ms.")
System.err.println("Exiting.")
sys.exit(1)
}
@@ -122,7 +124,29 @@ object JmxTool extends Logging {
else
List(null)
- val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
+ val hasPatternQueries = queries.exists((name: ObjectName) => name.isPattern)
+
+ var names: Iterable[ObjectName] = null
+ def namesSet = if (names == null) null else names.toSet
+ def foundAllObjects() = !queries.toSet.equals(namesSet)
+ val waitTimeoutMs = 10000
+ if (!hasPatternQueries) {
+ val start = System.currentTimeMillis
+ do {
+ if (names != null) {
+ System.err.println("Could not find all object names, retrying")
+ Thread.sleep(100)
+ }
+ names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
+ } while (wait && System.currentTimeMillis - start < waitTimeoutMs && foundAllObjects)
+ }
+
+ if (wait && foundAllObjects) {
+ val missing = (queries.toSet - namesSet).mkString(", ")
+ System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing")
+ System.err.println("Exiting.")
+ sys.exit(1)
+ }
val numExpectedAttributes: Map[ObjectName, Int] =
if (attributesWhitelistExists)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f50af9c3/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 5a945e1..91066d3 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -256,9 +256,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
- self.init_jmx_attributes()
- self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
- self.start_jmx_tool(idx, node)
+ with self.lock:
+ self._init_jmx_attributes()
+ self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
+ self.start_jmx_tool(idx, node)
for line in consumer_output:
msg = line.strip()
@@ -273,7 +274,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
if msg is not None:
self.messages_consumed[idx].append(msg)
- self.read_jmx_output(idx, node)
+ with self.lock:
+ self.read_jmx_output(idx, node)
def start_node(self, node):
BackgroundThreadService.start_node(self, node)
@@ -295,20 +297,22 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
self.security_config.clean_node(node)
def has_partitions_assigned(self, node):
- if self.new_consumer is False:
- return False
- idx = self.idx(node)
- self.init_jmx_attributes()
- self.start_jmx_tool(idx, node)
- self.read_jmx_output(idx, node)
- if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
- return False
- self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
- return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
-
- def init_jmx_attributes(self):
- if self.new_consumer is True:
- if self.jmx_object_names is None:
+ if self.new_consumer is False:
+ return False
+ idx = self.idx(node)
+ with self.lock:
+ self._init_jmx_attributes()
+ self.start_jmx_tool(idx, node)
+ self.read_jmx_output(idx, node)
+ if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
+ return False
+ self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
+ return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
+
+ def _init_jmx_attributes(self):
+ # Must hold lock
+ if self.new_consumer:
+ if not self.jmx_object_names:
self.jmx_object_names = []
self.jmx_object_names += ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
self.jmx_attributes += ["assigned-partitions"]
http://git-wip-us.apache.org/repos/asf/kafka/blob/f50af9c3/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 0859bb4..7331cb9 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -15,7 +15,7 @@
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until
-
+from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
class JmxMixin(object):
"""This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
@@ -23,6 +23,7 @@ class JmxMixin(object):
A couple things worth noting:
- this is not a service in its own right.
- we assume the service using JmxMixin also uses KafkaPathResolverMixin
+ - this uses the --wait option for JmxTool, so the list of object names must be explicit; no patterns are permitted
"""
def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None):
self.jmx_object_names = jmx_object_names
@@ -59,8 +60,14 @@ class JmxMixin(object):
wait_until(check_jmx_port_listening, timeout_sec=30, backoff_sec=.1,
err_msg="%s: Never saw JMX port for %s start listening" % (node.account, self))
- cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", node)
+ # To correctly wait for requested JMX metrics to be added we need the --wait option for JmxTool. This option was
+ # not added until 0.11.0.1, so any earlier versions need to use JmxTool from a newer version.
+ use_jmxtool_version = get_version(node)
+ if use_jmxtool_version <= V_0_11_0_0:
+ use_jmxtool_version = DEV_BRANCH
+ cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", use_jmxtool_version)
cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
+ cmd += " --wait"
for jmx_object_name in self.jmx_object_names:
cmd += " --object-name %s" % jmx_object_name
for jmx_attribute in self.jmx_attributes: