Posted to dev@storm.apache.org by priyank5485 <gi...@git.apache.org> on 2016/05/31 04:35:48 UTC

[GitHub] storm pull request: STORM-1136: Command line module to return kafk...

GitHub user priyank5485 opened a pull request:

    https://github.com/apache/storm/pull/1451

    STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/priyank5485/storm STORM-1136

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1451.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1451
    
----
commit 83b70af3d2a419e81ea9dc8a962dfccbda2a3255
Author: Priyank <ps...@hortonworks.com>
Date:   2016-05-18T19:14:50Z

    STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1451



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    Still +1. @abellina, can you address @priyank5485's response?



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the pull request:

    https://github.com/apache/storm/pull/1451
  
    @harshach Will do that. Just to confirm, you mean the section header in the UI under Topology Stats, right? Also, since a topology can technically have more than one kafka spout and we are showing lags for all spouts there, I was thinking of keeping it plural. Maybe "Kafka Spouts Lag"?
    
    Does that sound okay?



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @harshach I added a comment on the format for the response from the lag endpoint. I still believe the backend should do some of the work the UI is currently doing. 
    
    @priyank5485 div name makes sense now. Thanks!



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @hmcl I have added a comment.
    @harshach I have added you as a sponsor. I will revisit the trident part later and refactor any common code in getComponentConfiguration as suggested by you then. Please review again when you get a chance. Thanks for all your help with this.



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65576093
  
    --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
    @@ -0,0 +1,374 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.monitor;
    +
    +import kafka.api.OffsetRequest;
    +import kafka.api.PartitionOffsetRequestInfo;
    +import kafka.common.TopicAndPartition;
    +import kafka.javaapi.OffsetResponse;
    +import kafka.javaapi.consumer.SimpleConsumer;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Options;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryOneTime;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.json.simple.JSONValue;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +/**
    + * Utility class for querying offset lag for kafka spout
    + */
    +public class KafkaOffsetLagUtil {
    +    private static final String OPTION_TOPIC_SHORT = "t";
    --- End diff --
    
    What is the difference between SHORT and LONG? Why "t" vs. "topics"?
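
As background on this question: the quoted diff registers each option with a short name and a long name in a single `addOption` call, so both spellings refer to the same option (conventionally `-t` and `--topics`). The following is a minimal, stdlib-only sketch of that aliasing idea. It is not the Commons CLI implementation; the class `OptionAliasSketch` and its methods are hypothetical, and it assumes every option takes exactly one value.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of short/long option aliasing; not Commons CLI. */
final class OptionAliasSketch {
    // Maps both spellings (e.g. "t" and "topics") to one canonical long name.
    private final Map<String, String> aliasToLong = new HashMap<>();

    void addOption(String shortName, String longName) {
        aliasToLong.put(shortName, longName);
        aliasToLong.put(longName, longName);
    }

    /** Parses "-t value" / "--topics value" pairs into canonical-name -> value. */
    Map<String, String> parse(String[] args) {
        Map<String, String> values = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            // Strip one or two leading dashes to get the bare option name.
            String name = args[i].replaceFirst("^--?", "");
            String canonical = aliasToLong.get(name);
            if (canonical == null) {
                throw new IllegalArgumentException("Unknown option: " + args[i]);
            }
            values.put(canonical, args[i + 1]);
        }
        return values;
    }

    public static void main(String[] args) {
        OptionAliasSketch sketch = new OptionAliasSketch();
        sketch.addOption("t", "topics");
        // Both invocations resolve to the same canonical option.
        System.out.println(sketch.parse(new String[] {"-t", "events"}));
        System.out.println(sketch.parse(new String[] {"--topics", "events"}));
    }
}
```

In this sketch the short form only saves typing; lookups always go through the long name, which mirrors how the quoted `main` method checks options via the `*_LONG` constants.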



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by darionyaphet <gi...@git.apache.org>.
Github user darionyaphet commented on the pull request:

    https://github.com/apache/storm/pull/1451
  
    LGTM



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @abellina I have added a comment for that div. As for renaming it to topology-spout-stats, I prefer the current name, since right below that div there is a div showing stats for the spout, such as complete latency. I don't want the two to be confused.



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the pull request:

    https://github.com/apache/storm/pull/1451
  
    ![kafkaspoutlags_success](https://cloud.githubusercontent.com/assets/5192436/15682583/402a1180-2713-11e6-9ad2-e101cbb1ea4c.png)
    ![kafkaspoutlags_failure](https://cloud.githubusercontent.com/assets/5192436/15682586/43807626-2713-11e6-8609-870774dbc511.png)




[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66382723
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    +                var isJson = function (input) {
    +                  try {
    +                    JSON.parse(input);
    +                  } catch (e) {
    +                    return false;
    +                  }
    +                  return true;
    +                };
    +                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
    +                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
    +                var data = {};
    +                if (kafkaSpoutsValidResults.length > 0) {
    +                  data.kafkaSpoutsLagResults = [];
    +                  kafkaSpoutsValidResults.forEach(function(ele) {
    +                    var spoutLagResult = JSON.parse(ele.spoutLagResult);
    +                    spoutLagResult.forEach(function(ele2) {
    --- End diff --
    
    We would still need the for loop, since we need to flatten the results out. I chose to use id in the JSON passed to the template because the template uses id in other tables in the UI. If you are fine with that, we can leave it as is.
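
The flattening described in this comment can be sketched as follows: each spout contributes a list of per-partition results, and the table needs one row per (spout, partition) pair tagged with the spout's id. This is an illustrative Java sketch only (the change under review does this in JavaScript in topology.html); the class `LagFlattenSketch`, the `Row` type, and its field names other than `id` are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of flattening per-spout lag results into table rows. */
final class LagFlattenSketch {
    /** One output row: the spout id plus one partition's lag (field names assumed). */
    static final class Row {
        final String id;
        final int partition;
        final long lag;

        Row(String id, int partition, long lag) {
            this.id = id;
            this.partition = partition;
            this.lag = lag;
        }
    }

    /** Input: spout id -> lag per partition index. Output: one row per (spout, partition). */
    static List<Row> flatten(Map<String, List<Long>> lagsBySpout) {
        List<Row> rows = new ArrayList<>();
        for (Map.Entry<String, List<Long>> spout : lagsBySpout.entrySet()) {
            List<Long> lags = spout.getValue();
            // The inner loop is what turns the nested structure into a flat list.
            for (int partition = 0; partition < lags.size(); partition++) {
                rows.add(new Row(spout.getKey(), partition, lags.get(partition)));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> lagsBySpout = new LinkedHashMap<>();
        lagsBySpout.put("spout-a", Arrays.asList(5L, 0L));
        lagsBySpout.put("spout-b", Arrays.asList(7L));
        for (Row row : flatten(lagsBySpout)) {
            System.out.println(row.id + " partition=" + row.partition + " lag=" + row.lag);
        }
    }
}
```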



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65582081
  
    --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
    @@ -0,0 +1,374 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.monitor;
    +
    +import kafka.api.OffsetRequest;
    +import kafka.api.PartitionOffsetRequestInfo;
    +import kafka.common.TopicAndPartition;
    +import kafka.javaapi.OffsetResponse;
    +import kafka.javaapi.consumer.SimpleConsumer;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Options;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryOneTime;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.json.simple.JSONValue;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +/**
    + * Utility class for querying offset lag for kafka spout
    + */
    +public class KafkaOffsetLagUtil {
    +    private static final String OPTION_TOPIC_SHORT = "t";
    +    private static final String OPTION_TOPIC_LONG = "topics";
    +    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
    +    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
    +    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
    +    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
    +    private static final String OPTION_GROUP_ID_SHORT = "g";
    +    private static final String OPTION_GROUP_ID_LONG = "groupid";
    +    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
    +    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
    +    private static final String OPTION_PARTITIONS_SHORT = "p";
    +    private static final String OPTION_PARTITIONS_LONG = "partitions";
    +    private static final String OPTION_LEADERS_SHORT = "l";
    +    private static final String OPTION_LEADERS_LONG = "leaders";
    +    private static final String OPTION_ZK_SERVERS_SHORT = "z";
    +    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
    +    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
    +    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
    +    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
    +    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
    +
    +    public static void main (String args[]) {
    +        try {
    +            List<KafkaOffsetLagResult> results;
    +            Options options = buildOptions();
    +            CommandLineParser parser = new DefaultParser();
    +            CommandLine commandLine = parser.parse(options, args);
    +            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
    +                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
    +            }
    +            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
    +                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
    +                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
    +                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
    +                            OPTION_OLD_CONSUMER_LONG);
    +                }
    +                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
    +                    printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
    +                            OPTION_OLD_CONSUMER_LONG);
    +                }
    +                String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
    +                if (topics != null && topics.length > 1) {
    +                    printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
    +                            "wildcard string for matching topics is supported");
    +                }
    +                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
    +                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
    +                                OPTION_ZK_BROKERS_ROOT_LONG);
    +                    }
    +                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
    +                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
    +                            (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
    +                } else {
    +                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
    +                        printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
    +                    }
    +                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
    +                                " is not provided");
    +                    }
    +                    String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
    +                    String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
    +                    if (partitions.length != leaders.length) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
    +                    }
    +                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
    +                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue
    +                            (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
    +                }
    +                results = getOffsetLags(oldKafkaSpoutOffsetQuery);
    +            } else {
    +                String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
    +                        OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
    +                for (String oldOption: oldSpoutOptions) {
    +                    if (commandLine.hasOption(oldOption)) {
    +                        printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
    +                    }
    +                }
    +                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
    +                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
    +                            " is not specified");
    +                }
    +                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
    +                        commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
    +                results = getOffsetLags(newKafkaSpoutOffsetQuery);
    +            }
    +            System.out.print(JSONValue.toJSONString(results));
    +        } catch (Exception ex) {
    +            System.out.print("Unable to get offset lags for kafka. Reason: ");
    +            ex.printStackTrace(System.out);
    +        }
    +    }
    +
    +    private static void printUsageAndExit (Options options, String message) {
    +        System.out.println(message);
    +        HelpFormatter formatter = new HelpFormatter();
    +        formatter.printHelp("storm-kafka-monitor ", options);
    +        System.exit(1);
    +    }
    +
    +    private static Options buildOptions () {
    +        Options options = new Options();
    +        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
    +                "offset");
    +        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
    +        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
    +                "consumer/spout e.g. hostname1:9092,hostname2:9092");
    +        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
    +        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
    +                "old spout");
    +        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
    +                OPTION_LEADERS_LONG + " for old spout with StaticHosts");
    +        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
    +                OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
    +        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets  " +
    +                "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181");
    +        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
    +                " offsets without the topic and partition nodes");
    +        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
    +                "/brokers (applicable only for old kafka spout) ");
    +        return options;
    +    }
    +
    +    /**
    +     *
    +     * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
    +     * @return log head offset, spout offset and lag for each partition
    +     */
    +    public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
    +        KafkaConsumer<String, String> consumer = null;
    +        List<KafkaOffsetLagResult> result = new ArrayList<>();
    +        try {
    +            Properties props = new Properties();
    +            props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
    +            props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
    +            props.put("enable.auto.commit", "false");
    +            props.put("session.timeout.ms", "30000");
    +            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +            List<TopicPartition> topicPartitionList = new ArrayList<>();
    +            consumer = new KafkaConsumer<>(props);
    +            for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
    +                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
    +                if (partitionInfoList != null) {
    +                    for (PartitionInfo partitionInfo : partitionInfoList) {
    +                        topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    +                    }
    +                }
    +            }
    +            consumer.assign(topicPartitionList);
    +            for (TopicPartition topicPartition : topicPartitionList) {
    +                OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
    +                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
    +                consumer.seekToEnd(topicPartition);
    +                result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
    +            }
    +        } finally {
    +            if (consumer != null) {
    +                consumer.close();
    +            }
    +        }
    +        return result;
    +    }
    +
    +    /**
    +     *
    +     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
    +     * @return log head offset, spout offset and lag for each partition
    +     */
    +    public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
    +        List<KafkaOffsetLagResult> result = new ArrayList<>();
    +        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
    +        if (leaders != null) {
    +            Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
    +            Map<String, List<Integer>> topicPartitions = new HashMap<>();
    +            for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
    +                for (TopicPartition topicPartition: entry.getValue()) {
    +                    if (!topicPartitions.containsKey(topicPartition.topic())) {
    +                        topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
    +                    }
    +                    topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
    +                }
    +            }
    +            Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
    +            for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
    +                for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
    +                    Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? oldConsumerOffsets.get(topicOffsets.getKey()).get
    +                            (partitionOffsets.getKey()) : -1;
    +                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
    +                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
    +                            consumerCommittedOffset, partitionOffsets.getValue());
    +                    result.add(kafkaOffsetLagResult);
    +                }
    +            }
    +        }
    +        return result;
    +    }
    +
    +    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
    +        Map<String, List<TopicPartition>> result = new HashMap<>();
    +        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
    +            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
    +            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
    +            for (int i = 0; i < leaders.length; ++i) {
    +                if (!result.containsKey(leaders[i])) {
    +                    result.put(leaders[i], new ArrayList<TopicPartition>());
    +                }
    +                result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
    +            }
    +        } else {
    --- End diff --
    
    The old kafka spout can be configured in two ways:
    1. StaticHosts
    2. ZkHosts
    With StaticHosts, the user specifies the leader for each partition statically when building the topology, so we need to use that information to get the latest offsets. With ZkHosts, the KafkaSpout finds the leaders in its prepare method using zk nodes and then consumes data. That's what the if/else represents. I will add some documentation.
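
To illustrate the StaticHosts branch described above: the partitions and leaders arrive as two parallel comma-separated lists, and the partitions are grouped under the leader that serves them. The sketch below is a simplified, stdlib-only variant of that grouping. The class `StaticHostsSketch` and method `partitionsByLeader` are hypothetical names, and it returns plain partition numbers rather than the `TopicPartition` objects used in the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of grouping statically configured partitions by leader. */
final class StaticHostsSketch {
    /**
     * @param partitionsCsv comma-separated partition ids, e.g. "0,1,2"
     * @param leadersCsv    comma-separated leaders in the same order, e.g. "h1:9092,h2:9092,h1:9092"
     * @return leader -> partitions it leads, in first-seen leader order
     */
    static Map<String, List<Integer>> partitionsByLeader(String partitionsCsv, String leadersCsv) {
        String[] partitions = partitionsCsv.split(",");
        String[] leaders = leadersCsv.split(",");
        // The two lists are positional, so they must have the same length.
        if (partitions.length != leaders.length) {
            throw new IllegalArgumentException("partitions and leaders need to be of same size");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < leaders.length; i++) {
            result.computeIfAbsent(leaders[i].trim(), k -> new ArrayList<>())
                  .add(Integer.parseInt(partitions[i].trim()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(partitionsByLeader("0,1,2", "h1:9092,h2:9092,h1:9092"));
    }
}
```

Grouping by leader first means one offset request per broker is enough, rather than one per partition. With ZkHosts the same map would instead be filled from broker metadata in ZooKeeper, which this sketch does not attempt.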



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66442673
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    +                var isJson = function (input) {
    +                  try {
    +                    JSON.parse(input);
    +                  } catch (e) {
    +                    return false;
    +                  }
    +                  return true;
    +                };
    +                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
    +                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
    +                var data = {};
    +                if (kafkaSpoutsValidResults.length > 0) {
    +                  data.kafkaSpoutsLagResults = [];
    +                  kafkaSpoutsValidResults.forEach(function(ele) {
    +                    var spoutLagResult = JSON.parse(ele.spoutLagResult);
    +                    spoutLagResult.forEach(function(ele2) {
    --- End diff --
    
    @priyank5485 The result currently looks like (if I understand correctly):
    
    [{spoutId: "spout-1", spoutLagResult: "{topic: '..', partition: '..', logHeadOffset: '..', ...}"},
     {spoutId: "spout-2", spoutLagResult: "error message"}]
    
    I think this should be split into two arrays. Sample proposed response:
    
    // Note: no embedded strings, it is strictly JSON.
     {valid: [{spoutId: "spout-1", spoutLagResult: [{topic: '..', partition: '..', logHeadOffset: '..', ...}]}, ... ],
      invalid: [{spoutId: "spout-2", error: ""}, ...]}
    
    This tells you at a glance whether anything failed (resp.invalid.length > 0) and whether any spouts succeeded (resp.valid.length > 0), and it lets you loop through resp.valid and resp.invalid to populate your tables.
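
    A server-side split along these lines could look roughly like the sketch below. Everything in it is illustrative: LagResponseSplitter, SpoutLag, and SplitResponse are hypothetical names that do not appear in the patch, and per-partition entries are reduced to plain strings to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not part of the patch): separates successful lag lookups from failed ones.
final class LagResponseSplitter {
    private LagResponseSplitter() {}

    /** Hypothetical outcome for one spout: either partition entries or an error message. */
    static final class SpoutLag {
        final String spoutId;
        final List<String> partitionLags; // null when the lookup failed
        final String error;               // null when the lookup succeeded

        private SpoutLag(String spoutId, List<String> partitionLags, String error) {
            this.spoutId = spoutId;
            this.partitionLags = partitionLags;
            this.error = error;
        }

        static SpoutLag ok(String spoutId, List<String> partitionLags) {
            return new SpoutLag(spoutId, partitionLags, null);
        }

        static SpoutLag failed(String spoutId, String error) {
            return new SpoutLag(spoutId, null, error);
        }
    }

    /** Mirrors the proposed {valid: [...], invalid: [...]} shape. */
    static final class SplitResponse {
        final List<SpoutLag> valid = new ArrayList<>();
        final List<SpoutLag> invalid = new ArrayList<>();
    }

    static SplitResponse split(List<SpoutLag> all) {
        SplitResponse response = new SplitResponse();
        for (SpoutLag lag : all) {
            if (lag.error == null) {
                response.valid.add(lag);
            } else {
                response.invalid.add(lag);
            }
        }
        return response;
    }
}
```

    With this shape a client would only need to check whether each list is empty, without trying to parse a string to discover whether it holds an error.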



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65303241
  
    --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java ---
    @@ -0,0 +1,102 @@
    +/*
    --- End diff --
    
    Can we put this in org.apache.storm.tools.KafkaOffsetLagTool.java?



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the pull request:

    https://github.com/apache/storm/pull/1451
  
    @HeartSaVioR I just attached two screenshots. The first shows the UI when the lag result is fetched successfully. The second was taken after stopping Kafka, in which case the UI just shows a message indicating the cause.



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/1451
  
    @priyank5485 this looks great. Left some minor nit-picks.



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66466516
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    --- End diff --
    
    @abellina Yes, the idea is to extend it to other spouts. For now only KafkaSpout is supported.



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @abellina Let's handle your concern in https://issues.apache.org/jira/browse/STORM-1891.



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66306233
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    +                var isJson = function (input) {
    +                  try {
    +                    JSON.parse(input);
    +                  } catch (e) {
    +                    return false;
    +                  }
    +                  return true;
    +                };
    +                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
    +                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
    +                var data = {};
    +                if (kafkaSpoutsValidResults.length > 0) {
    +                  data.kafkaSpoutsLagResults = [];
    +                  kafkaSpoutsValidResults.forEach(function(ele) {
    +                    var spoutLagResult = JSON.parse(ele.spoutLagResult);
    +                    spoutLagResult.forEach(function(ele2) {
    --- End diff --
    
    Or it keeps returning spoutId (as it does now), and the rest of the code is adapted to take spoutId.



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66305519
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    +                var isJson = function (input) {
    +                  try {
    +                    JSON.parse(input);
    +                  } catch (e) {
    +                    return false;
    +                  }
    +                  return true;
    +                };
    +                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
    +                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
    --- End diff --
    
    Could the response still be JSON in the error case? I think the response should be JSON at all times.



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/1451
  
    @priyank5485 Yes. sounds good.



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    Opened https://issues.apache.org/jira/browse/STORM-1891 to track Trident Kafka spouts.



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @abellina The tables are optional already. If there are no Kafka spouts, the response will be an empty array. Can you try running a topology without a Kafka spout and check? I did.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65303499
  
    --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetQueryInfoNew.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +package org.apache.storm.kafka.monitor;
    +
    +/**
    + * Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new kafka spout using new consumer api
    + */
    +public class KafkaOffsetQueryInfoNew {
    --- End diff --
    
    Should we call these NewKafkaSpoutOffsetQuery and OldKafkaSpoutOffsetQuery?



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    Could the new tables be made optional? That way when a topology doesn't use KafkaSpout, the tables don't show in the UI.



[GitHub] storm pull request: STORM-1136: Command line module to return kafk...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/1451#issuecomment-222596393
  
    @priyank5485 Could you attach a UI screenshot so we can see the output easily? Thanks in advance.



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66306159
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    +                var isJson = function (input) {
    +                  try {
    +                    JSON.parse(input);
    +                  } catch (e) {
    +                    return false;
    +                  }
    +                  return true;
    +                };
    +                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
    +                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
    +                var data = {};
    +                if (kafkaSpoutsValidResults.length > 0) {
    +                  data.kafkaSpoutsLagResults = [];
    +                  kafkaSpoutsValidResults.forEach(function(ele) {
    +                    var spoutLagResult = JSON.parse(ele.spoutLagResult);
    +                    spoutLagResult.forEach(function(ele2) {
    --- End diff --
    
    Could the REST endpoint return id instead of spoutId, so that this loop can go away?



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @priyank5485 I overlooked that this code is in an external module; we can keep the classes as they are. My comment about getComponentConfiguration was that it appears to do a fair bit of logic inside KafkaSpout. My suggestion was to move that code into KafkaUtils, call buildKafkaSpoutComponentConfig, and return the config map. That way the spout code looks cleaner, and buildKafkaSpoutComponentConfig could perhaps be reused for Trident.
    Go ahead with your commit squashes. You can add me to the sponsor list since it's in an external module.



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/1451
  
    @priyank5485 Nice! Can we rename the section to KafkaSpout Lag?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66382357
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    +                var isJson = function (input) {
    +                  try {
    +                    JSON.parse(input);
    +                  } catch (e) {
    +                    return false;
    +                  }
    +                  return true;
    +                };
    +                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
    +                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
    --- End diff --
    
    @abellina If it's an error, we only want to show an appropriate message to the user as a string. Also, note that the top-level response is still JSON. I think a string is fine here.



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @priyank5485 Yes I see the tables don't get populated if lag responses are empty, but we still end up with the div markup on the page. It seems that this may be an area where other spouts could put their stats in the future. Could you call the div topology-spout-stats, and put a comment in the html that says it gets populated if stats are available?



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66524810
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    +                var isJson = function (input) {
    +                  try {
    +                    JSON.parse(input);
    +                  } catch (e) {
    +                    return false;
    +                  }
    +                  return true;
    +                };
    +                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
    +                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
    +                var data = {};
    +                if (kafkaSpoutsValidResults.length > 0) {
    +                  data.kafkaSpoutsLagResults = [];
    +                  kafkaSpoutsValidResults.forEach(function(ele) {
    +                    var spoutLagResult = JSON.parse(ele.spoutLagResult);
    +                    spoutLagResult.forEach(function(ele2) {
    --- End diff --
    
    @priyank5485 Yes, sorry, that was my intention but I didn't write it correctly in my comment: spoutLagResult should be a JSON array. Thanks for the background!
    
    That said, I think that TopologySpoutLag should do this (my two cents). That way the output from the REST interface doesn't need further inspection, and some other script can call your new lag endpoint and get a JSON object it can use without having to do the extra testing. I am not sure that valid/invalid are the best labels here, by the way; they are just an example I came up with.



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @harshach I have renamed Kafka Log Head Offset to Latest Offset, since Kafka is already present in the UI header and topic is already a column. I have also renamed the Query classes as per your suggestion.
    
    Regarding moving it under a tools package: I think of this module as specific to Kafka and to getting the offset lags. If you are okay with it, we can keep it as is instead of creating a tools package. Let me know if you have any other suggestion.
    
    Regarding moving getComponentConfiguration to KafkaUtils: KafkaUtils exists only in the old module, so I will keep it in the spout for now. I plan to add getComponentConfiguration for Trident as well. If there is code duplication there, I will try to reuse it for the old module and the new module (when Trident is ready).
    
    Let me know if there is anything else you want me to address. Otherwise I will squash the commits.



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66435002
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    --- End diff --
    
    Is the /lag endpoint returning spouts that aren't KafkaSpout?



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65576345
  
    --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
    @@ -0,0 +1,374 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.monitor;
    +
    +import kafka.api.OffsetRequest;
    +import kafka.api.PartitionOffsetRequestInfo;
    +import kafka.common.TopicAndPartition;
    +import kafka.javaapi.OffsetResponse;
    +import kafka.javaapi.consumer.SimpleConsumer;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Options;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryOneTime;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.json.simple.JSONValue;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +/**
    + * Utility class for querying offset lag for kafka spout
    + */
    +public class KafkaOffsetLagUtil {
    +    private static final String OPTION_TOPIC_SHORT = "t";
    +    private static final String OPTION_TOPIC_LONG = "topics";
    +    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
    +    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
    +    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
    +    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
    +    private static final String OPTION_GROUP_ID_SHORT = "g";
    +    private static final String OPTION_GROUP_ID_LONG = "groupid";
    +    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
    +    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
    +    private static final String OPTION_PARTITIONS_SHORT = "p";
    +    private static final String OPTION_PARTITIONS_LONG = "partitions";
    +    private static final String OPTION_LEADERS_SHORT = "l";
    +    private static final String OPTION_LEADERS_LONG = "leaders";
    +    private static final String OPTION_ZK_SERVERS_SHORT = "z";
    +    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
    +    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
    +    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
    +    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
    +    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
    +
    +    public static void main (String args[]) {
    +        try {
    +            List<KafkaOffsetLagResult> results;
    +            Options options = buildOptions();
    +            CommandLineParser parser = new DefaultParser();
    +            CommandLine commandLine = parser.parse(options, args);
    +            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
    +                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
    +            }
    +            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
    +                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
    +                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
    +                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
    +                            OPTION_OLD_CONSUMER_LONG);
    +                }
    +                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
    +                    printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
    +                            OPTION_OLD_CONSUMER_LONG);
    +                }
    +                String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
    +                if (topics != null && topics.length > 1) {
    +                    printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
    +                            "wildcard string for matching topics is supported");
    +                }
    +                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
    +                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
    +                                OPTION_ZK_BROKERS_ROOT_LONG);
    +                    }
    +                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
    +                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
    +                            (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
    +                } else {
    +                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
    +                        printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
    +                    }
    +                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
    +                                " is not provided");
    +                    }
    +                    String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
    +                    String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
    +                    if (partitions.length != leaders.length) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
    +                    }
    +                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
    +                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue
    +                            (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
    +                }
    +                results = getOffsetLags(oldKafkaSpoutOffsetQuery);
    +            } else {
    +                String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
    +                        OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
    +                for (String oldOption: oldSpoutOptions) {
    +                    if (commandLine.hasOption(oldOption)) {
    +                        printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
    +                    }
    +                }
    +                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
    +                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
    +                            " is not specified");
    +                }
    +                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
    +                        commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
    +                results = getOffsetLags(newKafkaSpoutOffsetQuery);
    +            }
    +            System.out.print(JSONValue.toJSONString(results));
    +        } catch (Exception ex) {
    +            System.out.print("Unable to get offset lags for kafka. Reason: ");
    +            ex.printStackTrace(System.out);
    +        }
    +    }
    +
    +    private static void printUsageAndExit (Options options, String message) {
    +        System.out.println(message);
    +        HelpFormatter formatter = new HelpFormatter();
    +        formatter.printHelp("storm-kafka-monitor ", options);
    +        System.exit(1);
    +    }
    +
    +    private static Options buildOptions () {
    +        Options options = new Options();
    +        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
    +                "offset");
    +        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
    +        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
    +                "consumer/spout e.g. hostname1:9092,hostname2:9092");
    +        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
    +        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
    +                "old spout");
    +        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
    +                OPTION_LEADERS_LONG + " for old spout with StaticHosts");
    +        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
    +                OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
    +        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets  " +
    +                "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181");
    +        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
    +                " offsets without the topic and partition nodes");
    +        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
    +                "/brokers (applicable only for old kafka spout) ");
    +        return options;
    +    }
    +
    +    /**
    +     *
    +     * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
    +     * @return log head offset, spout offset and lag for each partition
    +     */
    +    public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
    +        KafkaConsumer<String, String> consumer = null;
    +        List<KafkaOffsetLagResult> result = new ArrayList<>();
    +        try {
    +            Properties props = new Properties();
    +            props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
    +            props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
    +            props.put("enable.auto.commit", "false");
    +            props.put("session.timeout.ms", "30000");
    +            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +            List<TopicPartition> topicPartitionList = new ArrayList<>();
    +            consumer = new KafkaConsumer<>(props);
    +            for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
    +                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
    +                if (partitionInfoList != null) {
    +                    for (PartitionInfo partitionInfo : partitionInfoList) {
    +                        topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    +                    }
    +                }
    +            }
    +            consumer.assign(topicPartitionList);
    +            for (TopicPartition topicPartition : topicPartitionList) {
    +                OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
    +                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
    +                consumer.seekToEnd(topicPartition);
    +                result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
    +            }
    +        } finally {
    +            if (consumer != null) {
    +                consumer.close();
    +            }
    +        }
    +        return result;
    +    }
    +
    +    /**
    +     *
    +     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
    +     * @return log head offset, spout offset and lag for each partition
    +     */
    +    public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
    +        List<KafkaOffsetLagResult> result = new ArrayList<>();
    +        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
    +        if (leaders != null) {
    +            Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
    +            Map<String, List<Integer>> topicPartitions = new HashMap<>();
    +            for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
    +                for (TopicPartition topicPartition: entry.getValue()) {
    +                    if (!topicPartitions.containsKey(topicPartition.topic())) {
    +                        topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
    +                    }
    +                    topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
    +                }
    +            }
    +            Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
    +            for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
    +                for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
    +                    Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? oldConsumerOffsets.get(topicOffsets.getKey()).get
    +                            (partitionOffsets.getKey()) : -1;
    +                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
    +                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
    +                            consumerCommittedOffset, partitionOffsets.getValue());
    +                    result.add(kafkaOffsetLagResult);
    +                }
    +            }
    +        }
    +        return result;
    +    }
    +
    +    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
    +        Map<String, List<TopicPartition>> result = new HashMap<>();
    +        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
    +            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
    +            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
    +            for (int i = 0; i < leaders.length; ++i) {
    +                if (!result.containsKey(leaders[i])) {
    +                    result.put(leaders[i], new ArrayList<TopicPartition>());
    +                }
    +                result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
    +            }
    +        } else {
    --- End diff --
    
    When can this scenario occur? There is a lot of code in some of these methods. It would be ideal to add some comments hinting at what is going on, and/or split them into smaller methods.
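
A minimal sketch of the kind of split suggested above, not taken from the PR. Judging by the option handling in main, the first branch covers the StaticHosts case (--partitions and --leaders supplied as parallel lists), and the else branch appears to be the ZkHosts case, where --zk-brokers-root-node is given instead. The class and method names below (StaticHostsLeaders, groupPartitionsByLeader) are hypothetical, and plain partition ids stand in for Kafka's TopicPartition so the example runs on the JDK alone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: isolates the StaticHosts case, in which
// the caller passes partitions and leaders as parallel comma separated lists.
final class StaticHostsLeaders {

    // Groups partition ids by the broker that leads them, keeping first-seen leader order.
    static Map<String, List<Integer>> groupPartitionsByLeader(String partitionsCsv, String leadersCsv) {
        String[] partitions = partitionsCsv.split(",");
        String[] leaders = leadersCsv.split(",");
        if (partitions.length != leaders.length) {
            throw new IllegalArgumentException("partitions and leaders need to be of same size");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < leaders.length; i++) {
            // computeIfAbsent replaces the containsKey/put pair used in the diff
            result.computeIfAbsent(leaders[i].trim(), leader -> new ArrayList<>())
                  .add(Integer.parseInt(partitions[i].trim()));
        }
        return result;
    }
}
```

For example, groupPartitionsByLeader("0,1,2", "host1:9092,host2:9092,host1:9092") maps host1:9092 to partitions 0 and 2, and host2:9092 to partition 1.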



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the pull request:

    https://github.com/apache/storm/pull/1451
  
    @harshach Done.



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    @harshach Rebased on the 1.x branch, since there were some merges after the PR was raised.



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r66499861
  
    --- Diff: storm-core/src/ui/public/topology.html ---
    @@ -384,8 +389,51 @@ <h2 id="topology-resources-header">Topology resources</h2>
                 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
                 $('#topology-actions [data-toggle="tooltip"]').tooltip();
                 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
    +
    +            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
    +            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
    +              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
    +                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
    +                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
    +
    +                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
    +                var isJson = function (input) {
    +                  try {
    +                    JSON.parse(input);
    +                  } catch (e) {
    +                    return false;
    +                  }
    +                  return true;
    +                };
    +                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
    +                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
    +                var data = {};
    +                if (kafkaSpoutsValidResults.length > 0) {
    +                  data.kafkaSpoutsLagResults = [];
    +                  kafkaSpoutsValidResults.forEach(function(ele) {
    +                    var spoutLagResult = JSON.parse(ele.spoutLagResult);
    +                    spoutLagResult.forEach(function(ele2) {
    --- End diff --
    
    @abellina The spoutLagResult property is actually an array, hence the flattening.
    
    Let me elaborate on the limitations of the valid/invalid approach. The TopologySpoutLag class in storm-core, which is called by the UI server, does not know whether the spoutLagResult is valid or not. The reason is that lag information for Kafka and other spouts is obtained through a shell call handled by an external module, since we did not want any direct dependency on storm core. For now we have Kafka, but we can add other types of spouts as well. Also, the UI fields or the template could be different for a different type of spout. The only commonality I found was that if something goes wrong while getting lag info for the underlying spout (Kafka or other), an error message is sent in the response. This will work for all types of spouts. To do what you are suggesting, we would have to inspect the response in the TopologySpoutLag class. I felt it did not matter whether the UI or the server did that, and I preferred the UI. Let me know if this makes sense, or if you still think we need to change something.
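
For comparison, a rough sketch of what inspecting the response on the server side might look like. It is an assumption, not code from the PR: the class name SpoutLagClassifier and its methods are hypothetical, and the bracket check is only a crude stand-in for real JSON parsing. It relies on the behaviour visible in KafkaOffsetLagUtil.main, which prints a JSON array on success and a plain error message otherwise.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: separates usable lag output from error text on the server side.
final class SpoutLagClassifier {

    // Crude stand-in for JSON parsing: only checks for the outer brackets of a JSON array.
    static boolean looksLikeJsonArray(String shellOutput) {
        if (shellOutput == null) {
            return false;
        }
        String trimmed = shellOutput.trim();
        return trimmed.startsWith("[") && trimmed.endsWith("]");
    }

    // Returns two lists: index 0 holds outputs that look valid, index 1 holds error messages.
    static List<List<String>> partition(List<String> shellOutputs) {
        List<String> valid = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        for (String output : shellOutputs) {
            if (looksLikeJsonArray(output)) {
                valid.add(output);
            } else {
                errors.add(output);
            }
        }
        List<List<String>> result = new ArrayList<>();
        result.add(valid);
        result.add(errors);
        return result;
    }
}
```

Either placement yields the same split; the UI version in the diff has the advantage of using a real parser (JSON.parse) rather than a heuristic.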



[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

Posted by priyank5485 <gi...@git.apache.org>.
Github user priyank5485 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65581276
  
    --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
    @@ -0,0 +1,374 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.monitor;
    +
    +import kafka.api.OffsetRequest;
    +import kafka.api.PartitionOffsetRequestInfo;
    +import kafka.common.TopicAndPartition;
    +import kafka.javaapi.OffsetResponse;
    +import kafka.javaapi.consumer.SimpleConsumer;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Options;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryOneTime;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.json.simple.JSONValue;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +/**
    + * Utility class for querying offset lag for kafka spout
    + */
    +public class KafkaOffsetLagUtil {
    +    private static final String OPTION_TOPIC_SHORT = "t";
    --- End diff --
    
    I think it's just a convenience. You can use -t or --topics.



[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1451
  
    Thanks @priyank5485. This looks great and will be very helpful for all users of KafkaSpout. Let's open another JIRA for Trident support. +1.



[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65303547
  
    --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java ---
    @@ -188,6 +189,47 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
             }
         }
     
    +    @Override
    +    public Map<String, Object> getComponentConfiguration () {
    --- End diff --
    
    There looks to be quite a bit of logic in here. If it makes sense, let's move most of it into KafkaUtils.java.

