Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/20 16:56:24 UTC

[GitHub] [kafka] fvaleri opened a new pull request, #13136: KAFKA-14582: Move JmxTool to tools

fvaleri opened a new pull request, #13136:
URL: https://github.com/apache/kafka/pull/13136

   This PR is based on https://github.com/apache/kafka/pull/13131.
   
   This class is also used by the system tests, so I need to check if the replacement works fine there too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1092919815


##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimiter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimiter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxToolOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                    "can be given multiple times to specify more than one query. If no objects are specified " +
+                    "all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
+                    "attributes are specified all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+                    "Value of -1 equivalent to setting one-time to true")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(2000);
+            oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+                .withOptionalArg()

Review Comment:
   Here I'm fixing the following issue with the old implementation:
   
   It is rather common to pass boolean options like `--one-time` without the argument (true is implicit).
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   "time","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"
   20230201-09:39:41,0.0,0.0
   ```
   
   As you can see, the parser sees `--report-format tsv` as the argument of `--one-time` and TSV formatting is ignored.
   
   Declaring the argument as optional fixes the issue without introducing a breaking change.
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool   --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   time    20230201-09:44:40
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate    0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate       0.0
   ```
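
To make the parsing difference concrete, here is a minimal, self-contained sketch in plain Java. It is a toy model, not joptsimple's implementation: the class `OptionArgSketch`, its `parse` method, and the `requiredArg` switch are hypothetical names introduced for illustration. It assumes that a required-argument option takes the next token unconditionally, while an optional-argument option takes it only when it does not look like another option.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of option parsing; NOT joptsimple. All names are hypothetical.
class OptionArgSketch {
    // Parses "--name [value]" tokens. If requiredArg is true, "--one-time"
    // always consumes the next token; otherwise it consumes the next token
    // only when that token does not start with "--".
    static Map<String, String> parse(List<String> tokens, boolean requiredArg) {
        Map<String, String> parsed = new LinkedHashMap<>();
        int i = 0;
        while (i < tokens.size()) {
            String token = tokens.get(i++);
            if (!token.startsWith("--")) {
                continue; // stray non-option token, ignored in this sketch
            }
            String name = token.substring(2);
            boolean hasNext = i < tokens.size();
            boolean nextIsOption = hasNext && tokens.get(i).startsWith("--");
            boolean takesNext;
            if (name.equals("one-time")) {
                takesNext = hasNext && (requiredArg || !nextIsOption);
            } else {
                takesNext = hasNext; // other options always carry a value here
            }
            // An option given without a value is recorded as an implicit "true".
            parsed.put(name, takesNext ? tokens.get(i++) : "true");
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<String> cli = Arrays.asList("--one-time", "--report-format", "tsv");
        System.out.println(parse(cli, true));  // required-argument model
        System.out.println(parse(cli, false)); // optional-argument model
    }
}
```

In the required-argument model, `report-format` never shows up in the parsed result, because `--report-format` was consumed as the value of `--one-time`. In the optional-argument model, both options are recognized, and an explicit `--one-time true` still works.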





[GitHub] [kafka] mimaison commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1093412947


##########
tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.rmi.registry.LocateRegistry;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JmxToolTest {
+    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
+
+    private static JMXConnectorServer jmxAgent;
+    private static String jmxUrl;
+
+    @BeforeAll
+    public static void beforeAll() throws Exception {
+        int port = findRandomOpenPortOnAllLocalInterfaces();
+        jmxAgent = startJmxAgent(port);
+        jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port);
+    }
+
+    @AfterAll
+    public static void afterAll() throws Exception {
+        jmxAgent.stop();
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        Exit.setExitProcedure(exitProcedure);
+    }
+
+    @AfterEach
+    public void afterEach() {
+        Exit.resetExitProcedure();
+    }
+
+    @Test
+    public void kafkaVersion() {
+        String out = executeAndGetOut("--version");
+        assertNormalExit();
+        assertEquals(AppInfoParser.getVersion(), out);

Review Comment:
   This fails when running in IntelliJ. The output is:
   ```
   [2023-02-01 16:49:33,671] WARN Error while loading kafka-version.properties: inStream parameter is null (org.apache.kafka.common.utils.AppInfoParser:46)
   unknown
   ```
   I guess we could simply do `assertTrue(out.contains(AppInfoParser.getVersion()));`
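
The difference between the two assertions can be sketched without Kafka on the classpath. In this hypothetical example, `capturedOut` stands in for the tool output captured by the test and `version` for the value returned by `AppInfoParser.getVersion()`. The log line is copied from the output above, and the sketch assumes the version resolves to `unknown` in that environment, as the last output line suggests.

```java
// Hypothetical stand-in for the test's assertions; not the actual JmxToolTest code.
class VersionAssertSketch {
    // Strict check: passes only when nothing else was written to the captured stream.
    static boolean matchesExactly(String version, String capturedOut) {
        return version.equals(capturedOut);
    }

    // Lenient check: tolerates extra lines, such as a logger warning.
    static boolean containsVersion(String version, String capturedOut) {
        return capturedOut.contains(version);
    }

    public static void main(String[] args) {
        String version = "unknown"; // assumed value, see the lead-in
        String capturedOut = "[2023-02-01 16:49:33,671] WARN Error while loading "
                + "kafka-version.properties: inStream parameter is null "
                + "(org.apache.kafka.common.utils.AppInfoParser:46)\nunknown";
        System.out.println(matchesExactly(version, capturedOut));
        System.out.println(containsVersion(version, capturedOut));
    }
}
```

The lenient check trades strictness for robustness: it would also pass if the version string appeared only inside an unrelated line of output.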





[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1092895124


##########
tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.rmi.registry.LocateRegistry;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("integration")

Review Comment:
   Agreed. Initially I was starting a cluster, but that was overkill, so I switched to the platform MBean server. I'll remove the tag.
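
For context, a minimal sketch of why the platform MBean server is enough for this kind of test: every JVM exposes one, and `MBeanServer` extends `MBeanServerConnection`, so the same `getMBeanInfo` and `getAttributes` calls that `JmxTool` issues can be exercised in-process. The class and method names below are hypothetical, and the sketch skips the RMI connector that the real test starts.

```java
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import javax.management.Attribute;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical illustration; not part of the PR.
class PlatformMBeanSketch {
    // Reads all retrievable attributes of one MBean, keyed as "<objectName>:<attribute>".
    static Map<String, Object> readAttributes(MBeanServerConnection conn, ObjectName name) throws Exception {
        String[] attributeNames = Arrays.stream(conn.getMBeanInfo(name).getAttributes())
                .map(MBeanAttributeInfo::getName)
                .toArray(String[]::new);
        Map<String, Object> result = new TreeMap<>();
        // getAttributes omits attributes that cannot be read instead of failing.
        for (Attribute attribute : conn.getAttributes(name, attributeNames).asList()) {
            result.put(name + ":" + attribute.getName(), attribute.getValue());
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // The platform MBean server is already running in every JVM; no cluster needed.
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        ObjectName runtime = new ObjectName("java.lang:type=Runtime");
        readAttributes(conn, runtime).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Because the JVM's built-in MBeans such as `java.lang:type=Runtime` are always registered, a test can query them without starting any Kafka component.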





[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1401539053

   @mimaison @clolov @vamossagar12 this is ready for review if you have some time.




[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1092890819


##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -1033,7 +1033,7 @@ object ConsumerGroupCommand extends Logging {
     val describeOpt = parser.accepts("describe", DescribeDoc)
     val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc)
     val deleteOpt = parser.accepts("delete", DeleteDoc)
-    val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
+    val timeoutMsOpt: OptionSpec[Long] = parser.accepts("timeout", TimeoutMsDoc)

Review Comment:
   Thanks. That's an oversight in conflict resolution. Reverted.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1093445449


##########
tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.rmi.registry.LocateRegistry;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JmxToolTest {
+    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
+
+    private static JMXConnectorServer jmxAgent;
+    private static String jmxUrl;
+
+    @BeforeAll
+    public static void beforeAll() throws Exception {
+        int port = findRandomOpenPortOnAllLocalInterfaces();
+        jmxAgent = startJmxAgent(port);
+        jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port);
+    }
+
+    @AfterAll
+    public static void afterAll() throws Exception {
+        jmxAgent.stop();
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        Exit.setExitProcedure(exitProcedure);
+    }
+
+    @AfterEach
+    public void afterEach() {
+        Exit.resetExitProcedure();
+    }
+
+    @Test
+    public void kafkaVersion() {
+        String out = executeAndGetOut("--version");
+        assertNormalExit();
+        assertEquals(AppInfoParser.getVersion(), out);

Review Comment:
   Looks like it's working on my end.
   ![Screenshot from 2023-02-01 17-15-54](https://user-images.githubusercontent.com/11456498/216100017-c5f290c6-f964-41dd-8d79-d5cf0a27b379.png)
   





[GitHub] [kafka] mimaison commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1088957590


##########
checkstyle/import-control.xml:
##########
@@ -407,7 +407,8 @@
     <allow pkg="net.sourceforge.argparse4j" />
     <allow pkg="org.apache.log4j" />
     <allow pkg="kafka.test" />
-    <allow pkg="joptsimple" />
+    <allow pkg="joptsimple"/>
+    <allow pkg="javax.rmi.ssl"/>

Review Comment:
   Can we keep the space before `/>`, like all the other entries?



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -1033,7 +1033,7 @@ object ConsumerGroupCommand extends Logging {
     val describeOpt = parser.accepts("describe", DescribeDoc)
     val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc)
     val deleteOpt = parser.accepts("delete", DeleteDoc)
-    val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
+    val timeoutMsOpt: OptionSpec[Long] = parser.accepts("timeout", TimeoutMsDoc)

Review Comment:
   Why is this PR touching this file?



##########
tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.rmi.registry.LocateRegistry;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("integration")
+public class JmxCommandTest {
+    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
+
+    private static JMXConnectorServer jmxAgent;
+    private static String jmxUrl;
+
+    @BeforeAll
+    public static void beforeAll() throws Exception {
+        int port = findRandomOpenPortOnAllLocalInterfaces();
+        jmxAgent = startJmxAgent(port);
+        jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port);
+    }
+
+    @AfterAll
+    public static void afterAll() throws Exception {
+        jmxAgent.stop();
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        Exit.setExitProcedure(exitProcedure);
+    }
+
+    @AfterEach
+    public void afterEach() {
+        Exit.resetExitProcedure();
+    }
+
+    @Test
+    public void kafkaVersion() {
+        String out = executeAndGetOut("--version");
+        assertNormalExit();
+        assertEquals(AppInfoParser.getVersion(), out);
+    }
+
+    @Test
+    public void unrecognizedOption() {
+        String err = executeAndGetErr("--foo");
+        assertCommandFailure();
+        assertTrue(err.contains("UnrecognizedOptionException"));
+        assertTrue(err.contains("foo"));
+    }
+
+    @Test
+    public void missingRequired() {
+        String err = executeAndGetErr("--reporting-interval");
+        assertCommandFailure();
+        assertTrue(err.contains("OptionMissingRequiredArgumentException"));
+        assertTrue(err.contains("reporting-interval"));
+    }
+
+    @Test
+    public void invalidJmxUrl() {
+        String err = executeAndGetErr("--jmx-url", String.format("localhost:9999"));

Review Comment:
   We don't need `String.format()` here: the string has no format specifiers or arguments, so the literal can be passed directly.
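
   To illustrate the point, here is a minimal sketch that uses only the JDK. The class `InvalidJmxUrlSketch` and the helper `isValidJmxServiceUrl` are hypothetical names introduced for this example and are not part of the PR. It shows that `String.format` with no specifiers returns the literal unchanged, and that a bare `host:port` string is rejected by `JMXServiceURL`, which is presumably the condition the `invalidJmxUrl` test relies on.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

public class InvalidJmxUrlSketch {
    // Hypothetical helper: reports whether the JDK accepts the string as a JMX service URL.
    // Constructing a JMXServiceURL only parses the string; it does not open a connection.
    static boolean isValidJmxServiceUrl(String url) {
        try {
            new JMXServiceURL(url);
            return true;
        } catch (MalformedURLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // No format specifiers and no arguments, so format() has nothing to substitute.
        String viaFormat = String.format("localhost:9999");
        String literal = "localhost:9999";
        System.out.println("same string: " + viaFormat.equals(literal));

        // A bare host:port lacks the required "service:jmx:" prefix.
        System.out.println("bare host:port valid: " + isValidJmxServiceUrl(literal));

        // A full RMI-style service URL parses successfully.
        System.out.println("full url valid: "
                + isValidJmxServiceUrl("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi"));
    }
}
```

   Passing the literal directly also avoids a subtle trap: if the string ever contained a `%`, `String.format` would try to interpret it as a specifier.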



##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxToolOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                    "can be given multiple times to specify more than one query. If no objects are specified " +
+                    "all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
+                    "attributes are specified all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+                    "Value of -1 equivalent to setting one-time to true")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(2000);
+            oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+                .withOptionalArg()

Review Comment:
   Previously this argument was required. Why is it optional now?



##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));

Review Comment:
   We should be able to use `System.out.printf` instead of `System.out.println(String.format(...))`.
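
As a minimal sketch of that suggestion: the class name `PrintfSketch`, the helper methods, and the sample keys and values below are hypothetical and not part of the PR. The only change is that the line break moves into the format string as `%n`, which `java.util.Formatter` expands to the platform line separator.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrintfSketch {
    // Row text produced by the println(String.format(...)) style:
    // the formatted string followed by the platform line separator.
    static String viaPrintlnFormat(String key, Object value) {
        return String.format("%s=%s", key, value) + System.lineSeparator();
    }

    // Row text produced by the printf style: %n supplies the line separator.
    static String viaPrintf(String key, Object value) {
        return String.format("%s=%s%n", key, value);
    }

    public static void main(String[] args) {
        // Hypothetical sample data standing in for the queried JMX attributes.
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("time", "1674233784000");
        attributes.put("kafka.example:type=Demo:Count", 42);
        List<String> keys = Arrays.asList("time", "kafka.example:type=Demo:Count");

        // Before: format into a temporary String, then print it.
        keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));

        // After: let printf do the formatting and the line break in one call.
        keys.forEach(key -> System.out.printf("%s=%s%n", key, attributes.get(key)));
    }
}
```

Both loops should print the same rows; the same substitution would apply to the `csv` and `tsv` cases.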



##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxToolOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                    "can be given multiple times to specify more than one query. If no objects are specified " +
+                    "all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
+                    "attributes are specified all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+                    "Value of -1 equivalent to setting one-time to true")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(2000);
+            oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+                .withOptionalArg()
+                .describedAs("one-time")
+                .ofType(Boolean.class)
+                .defaultsTo(false);
+            dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
+                    "See java.text.SimpleDateFormat for options.")
+                .withRequiredArg()
+                .describedAs("format")
+                .ofType(String.class);
+            jmxServiceUrlOpt = parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
+                .withRequiredArg()
+                .describedAs("service-url")
+                .ofType(String.class)
+                .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
+            reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ")
+                .withRequiredArg()
+                .describedAs("report-format")
+                .ofType(String.class)
+                .defaultsTo("original");
+            jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " +
+                    "when enabling remote JMX with password authentication.")
+                .withRequiredArg()
+                .describedAs("jmx-auth-prop")
+                .ofType(String.class);
+            jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.")
+                .withRequiredArg()
+                .describedAs("ssl-enable")
+                .ofType(Boolean.class)
+                .defaultsTo(false);
+            waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " +
+                "Only supported when the list of objects is non-empty and contains no object name patterns.");
+            options = parser.parse(args);
+        }
+
+        public JMXServiceURL jmxServiceURL() {
+            try {
+                return new JMXServiceURL(options.valueOf(jmxServiceUrlOpt));
+            } catch (MalformedURLException e) {
+                throw new RuntimeException(e);

Review Comment:
   Do we need to wrap `MalformedURLException`? The previous tool failed like this:
   ```
   Exception in thread "main" java.net.MalformedURLException: Service URL must start with service:jmx:
   	at java.management/javax.management.remote.JMXServiceURL.<init>(JMXServiceURL.java:169)
   	at kafka.tools.JmxTool$.main(JmxTool.scala:111)
   	at kafka.tools.JmxTool.main(JmxTool.scala)
   ```
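
To make the two options concrete, here is a small sketch. The class `ServiceUrlSketch`, both method names, and the sample URL are hypothetical; only `JMXServiceURL` and `MalformedURLException` come from the JDK. It is not the code in the PR, just an illustration of wrapping versus propagating the checked exception.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

public class ServiceUrlSketch {
    // Option A (as in the diff): wrap the checked exception in a RuntimeException.
    static JMXServiceURL parseWrapped(String url) {
        try {
            return new JMXServiceURL(url);
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    // Option B: declare the checked exception so the caller sees it unwrapped,
    // which is closer to how the previous Scala tool surfaced the failure.
    static JMXServiceURL parseUnwrapped(String url) throws MalformedURLException {
        return new JMXServiceURL(url);
    }

    public static void main(String[] args) {
        String bad = "localhost:9999"; // hypothetical input missing the service:jmx: prefix
        try {
            parseWrapped(bad);
        } catch (RuntimeException e) {
            System.out.println("wrapped:   " + e);
        }
        try {
            parseUnwrapped(bad);
        } catch (MalformedURLException e) {
            System.out.println("unwrapped: " + e);
        }
    }
}
```

With option B, the method signature has to carry `throws MalformedURLException`, so every caller of `jmxServiceURL()` would need to handle or declare it.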
   



##########
core/src/main/scala/kafka/Kafka.scala:
##########
@@ -18,7 +18,6 @@
 package kafka
 
 import java.util.Properties
-

Review Comment:
   Let's undo this change



##########
tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.rmi.registry.LocateRegistry;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("integration")

Review Comment:
   Typically, integration tests rely on starting a cluster. I don't think that is the case here.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1093501099


##########
tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.rmi.registry.LocateRegistry;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class JmxToolTest {
+    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
+
+    private static JMXConnectorServer jmxAgent;
+    private static String jmxUrl;
+
+    @BeforeAll
+    public static void beforeAll() throws Exception {
+        int port = findRandomOpenPortOnAllLocalInterfaces();
+        jmxAgent = startJmxAgent(port);
+        jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port);
+    }
+
+    @AfterAll
+    public static void afterAll() throws Exception {
+        jmxAgent.stop();
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        Exit.setExitProcedure(exitProcedure);
+    }
+
+    @AfterEach
+    public void afterEach() {
+        Exit.resetExitProcedure();
+    }
+
+    @Test
+    public void kafkaVersion() {
+        String out = executeAndGetOut("--version");
+        assertNormalExit();
+        assertEquals(AppInfoParser.getVersion(), out);

Review Comment:
   Ok, we reproduced the issue and I'll take your suggestion. Thanks.





[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1406316345

   @mimaison I've rebased and added some tests. 
   
   This is now ready for another review. Thanks.




[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1092919815


##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimiter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimiter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxToolOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                    "can be given multiple times to specify more than one query. If no objects are specified " +
+                    "all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
+                    "attributes are specified all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+                    "Value of -1 equivalent to setting one-time to true")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(2000);
+            oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+                .withOptionalArg()

Review Comment:
   Here I'm fixing the following issue with the old implementation.
   
   It is rather common to pass boolean options like `--one-time` without the argument (true is implicit).
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   "time","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"
   20230201-09:39:41,0.0,0.0
   ```
   
   As you can see, the parser does not raise an error: it treats `--report-format tsv` as the argument of `--one-time`, so `--report-format` is ignored and the output falls back to the default format.
   
   Using `withOptionalArg()` fixes the issue without introducing a breaking change.
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   time    20230201-09:44:40
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate    0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate       0.0
   ```
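
To illustrate the difference described in the comment above, here is a minimal, self-contained sketch of how a required argument and an optional argument can consume the token that follows `--one-time`. It is a toy model written for this note, not jopt-simple's implementation: the class `OptionArgSketch`, the method `parse`, and the `<non-option>` key are hypothetical names, and every option other than `--one-time` is simply assumed to take a required argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of required vs. optional option arguments (hypothetical, not jopt-simple code). */
public class OptionArgSketch {

    /**
     * Parses tokens of the form "--name [value]".
     *
     * @param oneTimeArgRequired true models a required argument for --one-time,
     *                           false models an optional one
     * @return option name mapped to its argument ("" when given without one);
     *         leftover tokens are joined under the hypothetical key "<non-option>"
     */
    public static Map<String, String> parse(List<String> tokens, boolean oneTimeArgRequired) {
        Map<String, String> parsed = new LinkedHashMap<>();
        List<String> leftovers = new ArrayList<>();
        for (int i = 0; i < tokens.size(); i++) {
            String token = tokens.get(i);
            if (!token.startsWith("--")) {
                // Anything that is not an option and was not consumed as an argument.
                leftovers.add(token);
                continue;
            }
            String name = token.substring(2);
            boolean hasNext = i + 1 < tokens.size();
            boolean nextLooksLikeOption = hasNext && tokens.get(i + 1).startsWith("--");
            if (name.equals("one-time") && !oneTimeArgRequired) {
                // Optional argument: take the next token only if it is not another option.
                parsed.put(name, hasNext && !nextLooksLikeOption ? tokens.get(++i) : "");
            } else if (hasNext) {
                // Required argument: take the next token unconditionally,
                // even when it looks like another option.
                parsed.put(name, tokens.get(++i));
            } else {
                throw new IllegalArgumentException("Option --" + name + " requires an argument");
            }
        }
        parsed.put("<non-option>", String.join(" ", leftovers));
        return parsed;
    }

    public static void main(String[] args) {
        List<String> tokens = Arrays.asList("--one-time", "--report-format", "tsv");
        System.out.println("required: " + parse(tokens, true));
        System.out.println("optional: " + parse(tokens, false));
    }
}
```

In this model, the required-argument case swallows `--report-format` as the value of `--one-time` and leaves `tsv` as a stray token, while the optional-argument case leaves `--report-format tsv` intact, which mirrors the two outputs shown in the comment.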



##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxToolOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                    "can be given multiple times to specify more than one query. If no objects are specified " +
+                    "all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
+                    "attributes are specified all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+                    "Value of -1 equivalent to setting one-time to true")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(2000);
+            oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+                .withOptionalArg()

Review Comment:
   Here I'm fixing the following issue with the old implementation.
   
   It is rather common to pass boolean options like `--one-time` without the argument (true is implicit).
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   "time","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"
   20230201-09:39:41,0.0,0.0
   ```
   
   As you can see, the parser does not raise an error: it treats what follows `--one-time` as that option's argument, so `--report-format tsv` is silently ignored and the output is not in TSV format.
   
   Using `withOptionalArg()` fixes the issue without introducing a breaking change.
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool   --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   time    20230201-09:44:40
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate    0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate       0.0
   ```
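The difference between the two argument styles can be sketched with a small stand-alone model. This is a simplified illustration, not joptsimple itself: the class `OneTimeParsingSketch`, the `ArgStyle` enum, and the `parse` method are hypothetical names, and the model assumes that a required argument always consumes the next token while an optional argument is only consumed when the next token does not look like an option.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of required vs. optional option arguments (not joptsimple). */
final class OneTimeParsingSketch {

    /** How the hypothetical parser treats the token that follows --one-time. */
    enum ArgStyle { REQUIRED, OPTIONAL }

    /**
     * Parses "--name value" pairs into a map. Only --one-time depends on the
     * chosen style; every other option always takes the next token as its value.
     */
    static Map<String, String> parse(ArgStyle oneTimeStyle, String... args) {
        Map<String, String> parsed = new LinkedHashMap<>();
        List<String> nonOptions = new ArrayList<>();
        int i = 0;
        while (i < args.length) {
            String token = args[i++];
            if (!token.startsWith("--")) {
                // Tokens not claimed by any option are kept aside.
                nonOptions.add(token);
                continue;
            }
            boolean hasNext = i < args.length;
            if (token.equals("--one-time") && oneTimeStyle == ArgStyle.OPTIONAL) {
                // Optional argument: consume the next token only if it is not an option.
                if (hasNext && !args[i].startsWith("--")) {
                    parsed.put(token, args[i++]);
                } else {
                    parsed.put(token, "true");
                }
            } else {
                // Required argument: always consume the next token, whatever it is.
                if (!hasNext) {
                    throw new IllegalArgumentException(token + " requires an argument");
                }
                parsed.put(token, args[i++]);
            }
        }
        if (!nonOptions.isEmpty()) {
            parsed.put("(non-option)", String.join(" ", nonOptions));
        }
        return parsed;
    }

    public static void main(String[] args) {
        String[] commandLine = {"--one-time", "--report-format", "tsv"};
        System.out.println(parse(ArgStyle.REQUIRED, commandLine));
        System.out.println(parse(ArgStyle.OPTIONAL, commandLine));
    }
}
```

In this model, the required style swallows `--report-format` as the value of `--one-time`, so the report format never reaches the tool. The optional style leaves `--report-format tsv` intact, and an explicit `--one-time true` is still accepted.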





[GitHub] [kafka] mimaison merged pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison merged PR #13136:
URL: https://github.com/apache/kafka/pull/13136




[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1086694567


##########
tools/src/main/java/org/apache/kafka/tools/JmxCommand.java:
##########
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxCommand {
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        JmxCommandOptions options = new JmxCommandOptions(args);
+        CommandLineUtils.printHelpAndExitIfNeeded(options, "Dump JMX values to standard output.");
+
+        Optional<String[]> attributesInclude = options.attributesInclude();
+        Optional<DateFormat> dateFormat = options.dateFormat();
+        String reportFormat = options.parseFormat();
+        boolean keepGoing = true;
+
+        MBeanServerConnection conn = connectToBeanServer(options);
+        List<ObjectName> queries = options.queries();
+        boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+        Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+        Map<ObjectName, Integer> numExpectedAttributes =
+                findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+        List<String> keys = new ArrayList<>();
+        keys.add("time");
+        keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+        maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+        while (keepGoing) {
+            long start = System.currentTimeMillis();
+            Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+            attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+            maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+            if (options.isOneTime()) {
+                keepGoing = false;
+            } else {
+                TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+            }
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimiter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimiter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxCommandOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxCommandOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.keySet().size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxCommandOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxCommandOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                            "can be given multiple times to specify more than one query. If no objects are specified " +
+                            "all objects will be queried.")
+                    .withOptionalArg()

Review Comment:
   Thanks for spotting this, reverted.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1092895124


##########
tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.rmi.registry.LocateRegistry;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("integration")

Review Comment:
   Agreed. Initially I was starting a cluster, but that was overkill, so I switched to the platform MBean server. Let me remove the tag.
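As a rough illustration of why the platform MBean server is enough for this kind of test, the sketch below reads attributes from a JVM built-in MXBean without starting any broker. It is not the code from `JmxToolTest`: the class `PlatformMBeanServerSketch` and the method `readAttributes` are hypothetical names, and it assumes the standard `java.lang:type=Runtime` MXBean with its `VmName` and `StartTime` attributes. The keys follow the `objectName:attribute` convention that the tool prints.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

/** Reads a few attributes from the in-process platform MBean server. */
final class PlatformMBeanServerSketch {

    /** Returns the requested attributes keyed as "objectName:attributeName". */
    static Map<String, Object> readAttributes(MBeanServerConnection conn,
                                              String objectName,
                                              String... attributeNames) throws Exception {
        ObjectName name = new ObjectName(objectName);
        Map<String, Object> result = new TreeMap<>();
        // getAttributes returns only the attributes that could be read.
        for (Attribute attribute : conn.getAttributes(name, attributeNames).asList()) {
            result.put(objectName + ":" + attribute.getName(), attribute.getValue());
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // MBeanServer extends MBeanServerConnection, so no remote connector is needed here.
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        readAttributes(conn, "java.lang:type=Runtime", "VmName", "StartTime")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A test that needs to exercise the tool's remote connection path would still have to expose this server through a JMX connector; the sketch only shows that the metrics source itself does not require a running cluster.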





[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1092919815


##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimiter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimiter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxToolOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                    "can be given multiple times to specify more than one query. If no objects are specified " +
+                    "all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
+                    "attributes are specified all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+                    "Value of -1 equivalent to setting one-time to true")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(2000);
+            oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+                .withOptionalArg()

Review Comment:
   This fixes an issue in the old implementation.
   
   It is common to pass boolean options like `--one-time` without an argument (`true` is implicit).
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   "time","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"
   20230201-09:39:41,0.0,0.0
   ```
   
   As you can see, the parser treats what follows `--one-time` as its argument, so `--report-format tsv` has no effect and the output falls back to the default format.
   
   Using `withOptionalArg()` fixes the issue without introducing a breaking change.
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   time    20230201-09:44:40
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate    0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate       0.0
   ```
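
The difference between the two runs above can be illustrated with a small stand-alone sketch. This is a toy model written for illustration only: it does not call joptsimple, and the `OptionArgSketch` class and its `parse` helper are hypothetical names. It assumes that a required-argument option always consumes the next token, while an optional-argument option consumes it only when it does not look like another option.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Toy model (NOT joptsimple itself) of how a required-argument option and an
 * optional-argument option treat the token that follows them.
 */
class OptionArgSketch {

    /**
     * Hypothetical parser: every option takes a required argument, except
     * --one-time, whose argument is optional when oneTimeArgRequired is false.
     */
    static Map<String, String> parse(boolean oneTimeArgRequired, String... args) {
        Map<String, String> parsed = new LinkedHashMap<>();
        int i = 0;
        while (i < args.length) {
            String token = args[i++];
            if (!token.startsWith("--")) {
                // A stray token that is not attached to any option.
                parsed.merge("<non-option>", token, (a, b) -> a + " " + b);
                continue;
            }
            boolean hasNext = i < args.length;
            boolean nextLooksLikeOption = hasNext && args[i].startsWith("--");
            if (token.equals("--one-time") && !oneTimeArgRequired) {
                // Optional argument: consume the next token only if it is not an option.
                parsed.put(token, hasNext && !nextLooksLikeOption ? args[i++] : "");
            } else {
                // Required argument: consume the next token unconditionally.
                if (!hasNext) {
                    throw new IllegalArgumentException(token + " requires an argument");
                }
                parsed.put(token, args[i++]);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        String[] cli = {"--one-time", "--report-format", "tsv"};
        // Required argument: --report-format is swallowed as the value of --one-time.
        System.out.println(parse(true, cli));
        // Optional argument: --report-format keeps its own value.
        System.out.println(parse(false, cli));
    }
}
```

In the required-argument case the report format is never registered as an option, which matches the first run falling back to the default output format.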





[GitHub] [kafka] mimaison commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1085565198


##########
checkstyle/import-control.xml:
##########
@@ -347,7 +347,7 @@
 
   <subpackage name="server">
     <allow pkg="org.apache.kafka.common" />
-    <allow pkg="joptsimple" />

Review Comment:
   We seem to have the trailing space everywhere else, so maybe keep it here too.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -801,21 +801,15 @@ object ConsumerGroupCommand extends Logging {
         partitionsToReset.map { topicPartition =>
           logStartOffsets.get(topicPartition) match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
-            case _ => {
-              CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")
-              Exit.exit(1)
-            }
+            case _ => ToolsUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")

Review Comment:
   Can you remove the other changes not related to JmxTool?



##########
tools/src/main/java/org/apache/kafka/tools/JmxCommand.java:
##########
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxCommand {
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        JmxCommandOptions options = new JmxCommandOptions(args);
+        CommandLineUtils.printHelpAndExitIfNeeded(options, "Dump JMX values to standard output.");
+
+        Optional<String[]> attributesInclude = options.attributesInclude();
+        Optional<DateFormat> dateFormat = options.dateFormat();
+        String reportFormat = options.parseFormat();
+        boolean keepGoing = true;
+
+        MBeanServerConnection conn = connectToBeanServer(options);
+        List<ObjectName> queries = options.queries();
+        boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+        Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+        Map<ObjectName, Integer> numExpectedAttributes =
+                findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+        List<String> keys = new ArrayList<>();
+        keys.add("time");
+        keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+        maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+        while (keepGoing) {
+            long start = System.currentTimeMillis();
+            Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+            attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+            maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+            if (options.isOneTime()) {
+                keepGoing = false;
+            } else {
+                TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+            }
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxCommandOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxCommandOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.keySet().size() == sumValues(numExpectedAttributes) + 1) {

Review Comment:
   We should be able to do `attributes.size()` instead of `attributes.keySet().size()`
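
A minimal sketch of why the two expressions are interchangeable: for any `java.util.Map`, `size()` returns the number of key-value mappings, which is also the number of distinct keys in `keySet()`. The `MapSizeSketch` class and the sample entries are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class MapSizeSketch {

    /** Returns true when Map.size() and keySet().size() report the same count. */
    static boolean sizesAgree(Map<String, Object> attributes) {
        return attributes.size() == attributes.keySet().size();
    }

    public static void main(String[] args) {
        // Illustrative entries shaped like the "objectName:attribute" keys used by the tool.
        Map<String, Object> attributes = new HashMap<>();
        attributes.put("time", "1675244381000");
        attributes.put("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate", 0.0);
        System.out.println(sizesAgree(attributes));
    }
}
```

Calling `size()` directly also avoids obtaining the key-set view just to count it.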



##########
tools/src/main/java/org/apache/kafka/tools/JmxCommand.java:
##########
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxCommand {
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        JmxCommandOptions options = new JmxCommandOptions(args);
+        CommandLineUtils.printHelpAndExitIfNeeded(options, "Dump JMX values to standard output.");
+
+        Optional<String[]> attributesInclude = options.attributesInclude();
+        Optional<DateFormat> dateFormat = options.dateFormat();
+        String reportFormat = options.parseFormat();
+        boolean keepGoing = true;
+
+        MBeanServerConnection conn = connectToBeanServer(options);
+        List<ObjectName> queries = options.queries();
+        boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+        Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+        Map<ObjectName, Integer> numExpectedAttributes =
+                findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+        List<String> keys = new ArrayList<>();
+        keys.add("time");
+        keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+        maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+        while (keepGoing) {
+            long start = System.currentTimeMillis();
+            Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+            attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+            maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+            if (options.isOneTime()) {
+                keepGoing = false;
+            } else {
+                TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+            }
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxCommandOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxCommandOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.keySet().size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxCommandOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxCommandOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                            "can be given multiple times to specify more than one query. If no objects are specified " +
+                            "all objects will be queried.")
+                    .withOptionalArg()

Review Comment:
   The initial tool had `withRequiredArg`. Why are we changing it?
   
   The same applies to a few more arguments below.





[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by GitBox <gi...@apache.org>.
fvaleri commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1398707514

   Output example:
   
   ```sh
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxCommand --jmx-url service:jmx:rmi:///jndi/rmi://:9999/jmxrmi --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec --attributes FifteenMinuteRate,FiveMinuteRate --date-format "yyyyMMdd-hh:mm:ss" --reporting-interval 1000 --report-format tsv
   Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://:9999/jmxrmi
   time    20230120-06:23:14
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate    0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate	0.0
   time	20230120-06:23:15
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate	0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate	0.0
   time	20230120-06:23:16
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate	0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate	0.0
   ^C
   ```
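
The `object:attribute` rows above can be reproduced in miniature with the standard `javax.management` API. The following is a minimal sketch, not the tool's implementation: it reads from the local platform MBean server instead of a remote JMX URL, and the class name `LocalJmxReadSketch`, the `read` helper, and the choice of the `java.lang:type=Runtime` bean are assumptions made for illustration.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical illustration class; it is not part of Kafka.
public class LocalJmxReadSketch {

    // Reads the requested attributes of one MBean and keys each value as "objectName:attribute".
    public static Map<String, Object> read(MBeanServerConnection conn,
                                           String objectName,
                                           String... attributeNames) {
        try {
            ObjectName name = new ObjectName(objectName);
            AttributeList attributes = conn.getAttributes(name, attributeNames);
            Map<String, Object> result = new TreeMap<>();
            for (Attribute attribute : attributes.asList()) {
                result.put(name + ":" + attribute.getName(), attribute.getValue());
            }
            return result;
        } catch (Exception e) {
            // Wrapped so that callers of this sketch do not need to handle checked exceptions.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Assumption: the in-process platform MBean server stands in for a remote broker.
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        // Print one tab-separated row per attribute, similar to the tsv report format.
        read(conn, "java.lang:type=Runtime", "VmName", "Uptime")
                .forEach((key, value) -> System.out.println(key + "\t" + value));
    }
}
```

Against a real broker the connection would instead come from `JMXConnectorFactory.connect(...)` with the service URL passed via `--jmx-url`.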




[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1086675903


##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -801,21 +801,15 @@ object ConsumerGroupCommand extends Logging {
         partitionsToReset.map { topicPartition =>
           logStartOffsets.get(topicPartition) match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
-            case _ => {
-              CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")
-              Exit.exit(1)
-            }
+            case _ => ToolsUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")

Review Comment:
   Well, this is built on top of https://github.com/apache/kafka/pull/13131, which has not been approved and merged yet, so I cannot actually remove these changes.





[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1403712199

   Thanks @mimaison. 
   
   I addressed all your comments and am now working on the test suite.




[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1092919815


##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimiter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimiter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxToolOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                    "can be given multiple times to specify more than one query. If no objects are specified " +
+                    "all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
+                    "attributes are specified all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+                    "Value of -1 equivalent to setting one-time to true")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(2000);
+            oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+                .withOptionalArg()

Review Comment:
   Here I'm fixing the following issue with the old implementation.
   
   It is common to pass a boolean option without an argument (true is implicit), but an option declared with a required argument always consumes the next token.
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   "time","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"
   20230201-09:39:41,0.0,0.0
   ```
   
   As you can see, the parser takes `--report-format` as the argument of `--one-time`, so the TSV format is ignored.
   
   Making the argument optional fixes the issue without introducing any breaking change.
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   time    20230201-09:44:40
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate    0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate       0.0
   ```
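
The difference between the two runs can be sketched with a toy parser. This is only a model of the behaviour described in this comment, not jopt-simple itself: the class `BooleanFlagSketch` and its `parse` method are hypothetical, and the rule "a required argument consumes the next token whatever it looks like" is taken from the first run's output.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: this is NOT jopt-simple, just an illustration of the two argument modes.
public class BooleanFlagSketch {

    // Parses args, treating flagName as a boolean option whose argument is required or optional.
    // Every other option is assumed to take a required argument.
    public static Map<String, String> parse(String[] args, String flagName, boolean argRequired) {
        Map<String, String> parsed = new LinkedHashMap<>();
        int i = 0;
        while (i < args.length) {
            String token = args[i++];
            if (!token.startsWith("--")) {
                // Stray token: a real parser would collect it as a non-option argument.
                continue;
            }
            String name = token.substring(2);
            boolean hasNext = i < args.length;
            if (!name.equals(flagName) || argRequired) {
                // Required argument: the next token is consumed, even if it looks like an option.
                parsed.put(name, hasNext ? args[i++] : "");
            } else if (hasNext && !args[i].startsWith("--")) {
                // Optional argument given explicitly, e.g. "--one-time false".
                parsed.put(name, args[i++]);
            } else {
                // Optional argument omitted: true is implicit.
                parsed.put(name, "true");
            }
        }
        return parsed;
    }

    public static void main(String[] unused) {
        String[] args = {"--one-time", "--report-format", "tsv"};
        System.out.println("required: " + parse(args, "one-time", true));
        System.out.println("optional: " + parse(args, "one-time", false));
    }
}
```

In the required mode `--one-time` swallows `--report-format` and `report-format` is never set; in the optional mode both `one-time=true` and `report-format=tsv` are recorded, and an explicit value such as `--one-time true` is still accepted.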





[GitHub] [kafka] fvaleri commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1092919815


##########
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxTool {
+    public static void main(String[] args) {
+        try {
+            JmxToolOptions options = new JmxToolOptions(args);
+            if (CommandLineUtils.isPrintHelpNeeded(options)) {
+                CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
+                return;
+            }
+            if (CommandLineUtils.isPrintVersionNeeded(options)) {
+                CommandLineUtils.printVersionAndExit();
+                return;
+            }
+
+            Optional<String[]> attributesInclude = options.attributesInclude();
+            Optional<DateFormat> dateFormat = options.dateFormat();
+            String reportFormat = options.parseFormat();
+            boolean keepGoing = true;
+
+            MBeanServerConnection conn = connectToBeanServer(options);
+            List<ObjectName> queries = options.queries();
+            boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+            Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
+            Map<ObjectName, Integer> numExpectedAttributes =
+                    findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
+
+            List<String> keys = new ArrayList<>();
+            keys.add("time");
+            keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
+            maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+            while (keepGoing) {
+                long start = System.currentTimeMillis();
+                Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
+                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
+                if (options.isOneTime()) {
+                    keepGoing = false;
+                } else {
+                    TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
+                }
+            }
+            Exit.exit(0);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
+        return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
+                                                          MBeanServerConnection conn,
+                                                          List<ObjectName> queries,
+                                                          boolean hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), ",");
+            throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      Optional<String[]> attributesInclude,
+                                                                      boolean hasPatternQueries,
+                                                                      List<ObjectName> queries,
+                                                                      Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
+                                                       Set<ObjectName> objectNames,
+                                                       Optional<String[]> attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
+        if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxToolOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
+                    "can be given multiple times to specify more than one query. If no objects are specified " +
+                    "all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
+                    "attributes are specified all objects will be queried.")
+                .withRequiredArg()
+                .describedAs("name")
+                .ofType(String.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
+                    "Value of -1 equivalent to setting one-time to true")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(2000);
+            oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
+                .withOptionalArg()

Review Comment:
   This fixes the following issue with the old implementation.
   
   It is common to pass boolean options like `--one-time` without an argument (`true` is implied), but with the old implementation the option that follows is then misparsed:
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   "time","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"
   20230201-09:39:41,0.0,0.0
   ```
   
   As you can see, the parser consumes `--report-format tsv` as the argument of `--one-time`, so the requested TSV format is ignored and the default format is printed instead.
   
   Making the argument optional (`withOptionalArg()`) fixes the issue without introducing a breaking change:
   
   ```
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
     --jmx-url service:jmx:rmi:///jndi/rmi://:9990/jmxrmi \
     --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
     --attributes FifteenMinuteRate,FiveMinuteRate \
     --date-format "yyyyMMdd-hh:mm:ss" \
     --one-time \
     --report-format tsv
   time    20230201-09:44:40
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate    0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate       0.0
   ```
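
The difference between the two runs above can be illustrated with a small toy parser. This is only a sketch: `OptionArgDemo` and `parse` are hypothetical names, and the code does not use or reproduce the actual option-parsing library. It assumes only the behavior described in this comment, namely that an option with a required argument consumes the next token unconditionally, while an option with an optional argument skips a next token that looks like another option.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy parser (hypothetical, not the real option-parsing library) that contrasts
// required and optional option arguments for "--one-time".
class OptionArgDemo {

    /**
     * Parses "--name [value]" tokens into a map.
     * If oneTimeRequiresArg is true, "--one-time" always consumes the next token,
     * even when that token is itself an option name. If false, the next token is
     * consumed only when it does not look like an option; otherwise "true" is implied.
     * Every other option is treated as requiring an argument.
     */
    static Map<String, String> parse(List<String> tokens, boolean oneTimeRequiresArg) {
        Map<String, String> parsed = new LinkedHashMap<>();
        int i = 0;
        while (i < tokens.size()) {
            String token = tokens.get(i++);
            if (!token.startsWith("--")) {
                // Stray value, e.g. the "tsv" left behind once "--report-format" was swallowed.
                continue;
            }
            String name = token.substring(2);
            boolean hasNext = i < tokens.size();
            boolean nextIsOption = hasNext && tokens.get(i).startsWith("--");
            if (name.equals("one-time") && !oneTimeRequiresArg) {
                // Optional argument: take the next token only if it is a plain value.
                parsed.put(name, hasNext && !nextIsOption ? tokens.get(i++) : "true");
            } else {
                // Required argument: take the next token whatever it is.
                if (!hasNext) {
                    throw new IllegalArgumentException("Option --" + name + " requires an argument");
                }
                parsed.put(name, tokens.get(i++));
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<String> tokens = Arrays.asList("--one-time", "--report-format", "tsv");
        // prints: required arg: {one-time=--report-format}
        System.out.println("required arg: " + parse(tokens, true));
        // prints: optional arg: {one-time=true, report-format=tsv}
        System.out.println("optional arg: " + parse(tokens, false));
    }
}
```

In this sketch an explicit `--one-time true` is still accepted in the optional case, because `true` does not look like an option and is consumed as the value. That mirrors why the change should not break existing invocations.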





[GitHub] [kafka] ijuma commented on pull request #13136: KAFKA-14582: Move JmxTool to tools

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1426833220

   Looks like this caused some system test failures: http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/trunk/2023-02-11--001.system-test-kafka-trunk--1676126329--confluentinc--master--d0d9e3f297/report.html

