You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/07/20 04:39:56 UTC

kafka git commit: KAFKA-5608: Add --wait option for JmxTool and use in system tests to avoid race between JmxTool and monitored services

Repository: kafka
Updated Branches:
  refs/heads/trunk f87d58b79 -> f50af9c31


KAFKA-5608: Add --wait option for JmxTool and use in system tests to avoid race between JmxTool and monitored services

Author: Ewen Cheslack-Postava <me...@ewencp.org>
Author: Ewen Cheslack-Postava <ew...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3547 from ewencp/wait-jmx-metrics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f50af9c3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f50af9c3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f50af9c3

Branch: refs/heads/trunk
Commit: f50af9c31df6fc6febba51d39293051efc82cec9
Parents: f87d58b
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Jul 19 21:39:51 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Jul 19 21:39:51 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/JmxTool.scala | 42 +++++++++++++++++-----
 tests/kafkatest/services/console_consumer.py  | 40 +++++++++++----------
 tests/kafkatest/services/monitor/jmx.py       | 11 ++++--
 3 files changed, 64 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f50af9c3/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 6e8a094..c83f792 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -72,7 +72,9 @@ object JmxTool extends Logging {
         .describedAs("service-url")
         .ofType(classOf[String])
         .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
-        
+    val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " +
+      "Only supported when the list of objects is non-empty and contains no object name patterns.")
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.")
 
@@ -89,13 +91,14 @@ object JmxTool extends Logging {
     val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None
     val dateFormatExists = options.has(dateFormatOpt)
     val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
+    val wait = options.has(waitOpt)
 
     var jmxc: JMXConnector = null
     var mbsc: MBeanServerConnection = null
-    var retries = 0
-    val maxNumRetries = 10
     var connected = false
-    while (retries < maxNumRetries && !connected) {
+    val connectTimeoutMs = 10000
+    val connectTestStarted = System.currentTimeMillis
+    do {
       try {
         System.err.println(s"Trying to connect to JMX url: $url.")
         jmxc = JMXConnectorFactory.connect(url, null)
@@ -105,13 +108,12 @@ object JmxTool extends Logging {
         case e : Exception =>
           System.err.println(s"Could not connect to JMX url: $url. Exception ${e.getMessage}.")
           e.printStackTrace()
-          retries += 1
-          Thread.sleep(500)
+          Thread.sleep(100)
       }
-    }
+    } while (System.currentTimeMillis - connectTestStarted < connectTimeoutMs && !connected)
 
     if (!connected) {
-      System.err.println(s"Could not connect to JMX url $url after $maxNumRetries retries.")
+      System.err.println(s"Could not connect to JMX url $url after $connectTimeoutMs ms.")
       System.err.println("Exiting.")
       sys.exit(1)
     }
@@ -122,7 +124,29 @@ object JmxTool extends Logging {
       else
         List(null)
 
-    val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
+    val hasPatternQueries = queries.exists((name: ObjectName) => name.isPattern)
+
+    var names: Iterable[ObjectName] = null
+    def namesSet = if (names == null) null else names.toSet
+    def foundAllObjects() = !queries.toSet.equals(namesSet)
+    val waitTimeoutMs = 10000
+    if (!hasPatternQueries) {
+      val start = System.currentTimeMillis
+      do {
+        if (names != null) {
+          System.err.println("Could not find all object names, retrying")
+          Thread.sleep(100)
+        }
+        names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
+      } while (wait && System.currentTimeMillis - start < waitTimeoutMs && foundAllObjects)
+    }
+
+    if (wait && foundAllObjects) {
+      val missing = (queries.toSet - namesSet).mkString(", ")
+      System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing")
+      System.err.println("Exiting.")
+      sys.exit(1)
+    }
 
     val numExpectedAttributes: Map[ObjectName, Int] =
       if (attributesWhitelistExists)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f50af9c3/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 5a945e1..91066d3 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -256,9 +256,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
 
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
 
-        self.init_jmx_attributes()
-        self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
-        self.start_jmx_tool(idx, node)
+        with self.lock:
+            self._init_jmx_attributes()
+            self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
+            self.start_jmx_tool(idx, node)
 
         for line in consumer_output:
             msg = line.strip()
@@ -273,7 +274,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
                 if msg is not None:
                     self.messages_consumed[idx].append(msg)
 
-        self.read_jmx_output(idx, node)
+        with self.lock:
+            self.read_jmx_output(idx, node)
 
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)
@@ -295,20 +297,22 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.security_config.clean_node(node)
 
     def has_partitions_assigned(self, node):
-       if self.new_consumer is False:
-          return False
-       idx = self.idx(node)
-       self.init_jmx_attributes()
-       self.start_jmx_tool(idx, node)
-       self.read_jmx_output(idx, node)
-       if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
-           return False
-       self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
-       return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
-
-    def init_jmx_attributes(self):
-        if self.new_consumer is True:
-            if self.jmx_object_names is None:
+        if self.new_consumer is False:
+            return False
+        idx = self.idx(node)
+        with self.lock:
+            self._init_jmx_attributes()
+            self.start_jmx_tool(idx, node)
+            self.read_jmx_output(idx, node)
+        if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
+            return False
+        self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
+        return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
+
+    def _init_jmx_attributes(self):
+        # Caller must hold self.lock: this mutates the shared jmx_object_names / jmx_attributes lists.
+        if self.new_consumer:
+            if not self.jmx_object_names:
                 self.jmx_object_names = []
                 self.jmx_object_names += ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
                 self.jmx_attributes += ["assigned-partitions"]

http://git-wip-us.apache.org/repos/asf/kafka/blob/f50af9c3/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 0859bb4..7331cb9 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -15,7 +15,7 @@
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.utils.util import wait_until
-
+from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
 
 class JmxMixin(object):
     """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
@@ -23,6 +23,7 @@ class JmxMixin(object):
     A couple things worth noting:
     - this is not a service in its own right.
     - we assume the service using JmxMixin also uses KafkaPathResolverMixin
+    - this uses the --wait option for JmxTool, so the list of object names must be explicit; no patterns are permitted
     """
     def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None):
         self.jmx_object_names = jmx_object_names
@@ -59,8 +60,14 @@ class JmxMixin(object):
         wait_until(check_jmx_port_listening, timeout_sec=30, backoff_sec=.1,
                    err_msg="%s: Never saw JMX port for %s start listening" % (node.account, self))
 
-        cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", node)
+        # To correctly wait for requested JMX metrics to be added we need the --wait option for JmxTool. This option was
+        # not added until 0.11.0.1, so any earlier versions need to use JmxTool from a newer version.
+        use_jmxtool_version = get_version(node)
+        if use_jmxtool_version <= V_0_11_0_0:
+            use_jmxtool_version = DEV_BRANCH
+        cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", use_jmxtool_version)
         cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
+        cmd += " --wait"
         for jmx_object_name in self.jmx_object_names:
             cmd += " --object-name %s" % jmx_object_name
         for jmx_attribute in self.jmx_attributes: