You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/06/21 01:09:49 UTC

[1/3] storm git commit: STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 382c3811a -> e7f478cad


STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c0a706a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c0a706a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c0a706a3

Branch: refs/heads/1.x-branch
Commit: c0a706a3de1c3eab6b6da20e934364c1737c3fac
Parents: 382c381
Author: Priyank <ps...@hortonworks.com>
Authored: Wed May 18 12:14:50 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Mon Jun 20 17:28:07 2016 -0700

----------------------------------------------------------------------
 bin/storm-kafka-monitor                         |  43 +++
 .../apache/storm/kafka/spout/KafkaSpout.java    |  17 +
 external/storm-kafka-monitor/README.md          |  41 ++
 external/storm-kafka-monitor/pom.xml            | 127 +++++++
 .../kafka/monitor/KafkaOffsetLagResult.java     | 102 +++++
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 376 +++++++++++++++++++
 .../kafka/monitor/NewKafkaSpoutOffsetQuery.java |  76 ++++
 .../kafka/monitor/OldKafkaSpoutOffsetQuery.java | 124 ++++++
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |  42 +++
 .../trident/GlobalPartitionInformation.java     |   4 +
 pom.xml                                         |   1 +
 storm-core/src/clj/org/apache/storm/ui/core.clj |  15 +-
 .../apache/storm/utils/TopologySpoutLag.java    | 141 +++++++
 .../templates/topology-page-template.html       |  86 +++++
 storm-core/src/ui/public/topology.html          |  48 +++
 storm-dist/binary/src/main/assembly/binary.xml  |   4 +-
 16 files changed, 1244 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/bin/storm-kafka-monitor
----------------------------------------------------------------------
diff --git a/bin/storm-kafka-monitor b/bin/storm-kafka-monitor
new file mode 100755
index 0000000..146152a
--- /dev/null
+++ b/bin/storm-kafka-monitor
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Resolve links - $0 may be a softlink
+PRG="${0}"
+
+while [ -h "${PRG}" ]; do
+  ls=`ls -ld "${PRG}"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "${PRG}"`/"$link"
+  fi
+done
+
+STORM_BIN_DIR=`dirname ${PRG}`
+export STORM_BASE_DIR=`cd ${STORM_BIN_DIR}/..;pwd`
+
+# Which java to use
+if [ -z "$JAVA_HOME" ]; then
+  JAVA="java"
+else
+  JAVA="$JAVA_HOME/bin/java"
+fi
+
+exec "$JAVA" -cp "$STORM_BASE_DIR/extlib/*" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index d211ae9..bba245b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -381,6 +381,23 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return "{acked=" + acked + "} ";
     }
 
+    @Override
+    public Map<String, Object> getComponentConfiguration () {
+        Map<String, Object> configuration = super.getComponentConfiguration();
+        if (configuration == null) {
+            configuration = new HashMap<>();
+        }
+        String configKeyPrefix = "config.";
+        StringBuilder topics = new StringBuilder();
+        for (String topic: this.kafkaSpoutConfig.getSubscribedTopics()) {
+            topics.append(topic).append(",");
+        }
+        configuration.put(configKeyPrefix + "topics", topics.toString());
+        configuration.put(configKeyPrefix + "groupid", this.kafkaSpoutConfig.getConsumerGroupId());
+        configuration.put(configKeyPrefix + "bootstrap.servers", this.kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
+        return configuration;
+    }
+
     // ======= Offsets Commit Management ==========
 
     private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/README.md b/external/storm-kafka-monitor/README.md
new file mode 100644
index 0000000..4026baf
--- /dev/null
+++ b/external/storm-kafka-monitor/README.md
@@ -0,0 +1,41 @@
+#Storm Kafka Monitor 
+
+Tool to query kafka spout lags and show in Storm UI
+
+## Usage
+This tool provides a way to query kafka offsets that the spout has consumed successfully and the latest
+offsets in kafka. It provides an easy way to see how the topology is performing. It is a command line
+interface called storm-kafka-monitor in the bin directory. The results have also been included in storm
+ui on the topology page. It supports both new and the old kafka spout. Please execute the command
+line without any options to see usage.
+
+```java
+$STORM_HOME_DIR/bin/storm-kafka-monitor
+```
+
+## Future Work 
+The offset lag calculation support for trident kafka spouts will be added soon.
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/pom.xml b/external/storm-kafka-monitor/pom.xml
new file mode 100644
index 0000000..0ddb156
--- /dev/null
+++ b/external/storm-kafka-monitor/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--/**
+        * Licensed to the Apache Software Foundation (ASF) under one
+        * or more contributor license agreements.  See the NOTICE file
+        * distributed with this work for additional information
+        * regarding copyright ownership.  The ASF licenses this file
+        * to you under the Apache License, Version 2.0 (the
+        * "License"); you may not use this file except in compliance
+        * with the License.  You may obtain a copy of the License at
+        *
+        * http://www.apache.org/licenses/LICENSE-2.0
+        *
+        * Unless required by applicable law or agreed to in writing, software
+        * distributed under the License is distributed on an "AS IS" BASIS,
+        * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        * See the License for the specific language governing permissions and
+        * limitations under the License.
+        */-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.0.2-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>storm-kafka-monitor</artifactId>
+    <name>storm-kafka-monitor</name>
+
+
+    <packaging>jar</packaging>
+
+    <developers>
+        <developer>
+            <id>pshah</id>
+            <name>Priyank Shah</name>
+            <email>priyank5485@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <!--kafka libraries-->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>${kafka.artifact.id}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <exclusions>
+               <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+        <dependency>
+           <groupId>commons-cli</groupId>
+           <artifactId>commons-cli</artifactId>
+       </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>org.apache.kafka.shaded</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>kafka</pattern>
+                                    <shadedPattern>kafka.shaded</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>other.kafka</pattern>
+                                    <shadedPattern>other.kafka.shaded</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.slf4j.impl</pattern>
+                                    <shadedPattern>org.slf4j.impl.shaded</shadedPattern>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>org.apache.curator</pattern>
+                                    <shadedPattern>org.apache.curator.shaded</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.json.simple</pattern>
+                                    <shadedPattern>org.json.simple.shaded</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons.cli</pattern>
+                                    <shadedPattern>org.apache.commons.cli.shaded</shadedPattern>
+                                </relocation>
+                           </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
new file mode 100644
index 0000000..a6d898c
--- /dev/null
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import org.json.simple.JSONAware;
+
+/**
+ * Class representing the log head offsets, spout offsets and the lag for a topic
+ */
+public class KafkaOffsetLagResult implements JSONAware {
+    private String topic;
+    private int partition;
+    private long consumerCommittedOffset;
+    private long logHeadOffset;
+    private long lag;
+
+    public KafkaOffsetLagResult(String topic, int parition, long consumerCommittedOffset, long logHeadOffset) {
+        this.topic = topic;
+        this.partition = parition;
+        this.consumerCommittedOffset = consumerCommittedOffset;
+        this.logHeadOffset = logHeadOffset;
+        this.lag = this.logHeadOffset - this.consumerCommittedOffset;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getConsumerCommittedOffset() {
+        return consumerCommittedOffset;
+    }
+
+    public long getLogHeadOffset() {
+        return logHeadOffset;
+    }
+
+    public long getLag() {
+        return lag;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaOffsetLagResult{" +
+                "topic='" + topic + '\'' +
+                ", partition=" + partition +
+                ", consumerCommittedOffset=" + consumerCommittedOffset +
+                ", logHeadOffset=" + logHeadOffset +
+                ", lag=" + lag +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        KafkaOffsetLagResult that = (KafkaOffsetLagResult) o;
+
+        if (partition != that.partition) return false;
+        if (consumerCommittedOffset != that.consumerCommittedOffset) return false;
+        if (logHeadOffset != that.logHeadOffset) return false;
+        if (lag != that.lag) return false;
+        return !(topic != null ? !topic.equals(that.topic) : that.topic != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topic != null ? topic.hashCode() : 0;
+        result = 31 * result + partition;
+        result = 31 * result + (int) (consumerCommittedOffset ^ (consumerCommittedOffset >>> 32));
+        result = 31 * result + (int) (logHeadOffset ^ (logHeadOffset >>> 32));
+        result = 31 * result + (int) (lag ^ (lag >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toJSONString() {
+        return "{\"topic\":\"" + topic + "\",\"partition\":" + partition + ",\"consumerCommittedOffset\":" + consumerCommittedOffset + "," +
+                "\"logHeadOffset\":" + logHeadOffset + ",\"lag\":" + lag + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
new file mode 100644
index 0000000..a4436f9
--- /dev/null
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+    private static final String OPTION_TOPIC_SHORT = "t";
+    private static final String OPTION_TOPIC_LONG = "topics";
+    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
+    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
+    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
+    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
+    private static final String OPTION_GROUP_ID_SHORT = "g";
+    private static final String OPTION_GROUP_ID_LONG = "groupid";
+    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
+    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
+    private static final String OPTION_PARTITIONS_SHORT = "p";
+    private static final String OPTION_PARTITIONS_LONG = "partitions";
+    private static final String OPTION_LEADERS_SHORT = "l";
+    private static final String OPTION_LEADERS_LONG = "leaders";
+    private static final String OPTION_ZK_SERVERS_SHORT = "z";
+    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
+    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
+    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
+    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
+    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
+
+    public static void main (String args[]) {
+        try {
+            List<KafkaOffsetLagResult> results;
+            Options options = buildOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
+                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
+            }
+            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
+                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
+                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
+                            OPTION_OLD_CONSUMER_LONG);
+                }
+                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
+                    printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
+                            OPTION_OLD_CONSUMER_LONG);
+                }
+                String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
+                if (topics != null && topics.length > 1) {
+                    printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
+                            "wildcard string for matching topics is supported");
+                }
+                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
+                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
+                                OPTION_ZK_BROKERS_ROOT_LONG);
+                    }
+                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
+                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
+                            (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
+                } else {
+                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
+                        printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
+                    }
+                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
+                                " is not provided");
+                    }
+                    String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
+                    String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
+                    if (partitions.length != leaders.length) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
+                    }
+                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
+                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue
+                            (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
+                }
+                results = getOffsetLags(oldKafkaSpoutOffsetQuery);
+            } else {
+                String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
+                        OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
+                for (String oldOption: oldSpoutOptions) {
+                    if (commandLine.hasOption(oldOption)) {
+                        printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
+                    }
+                }
+                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
+                            " is not specified");
+                }
+                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+                        commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
+                results = getOffsetLags(newKafkaSpoutOffsetQuery);
+            }
+            System.out.print(JSONValue.toJSONString(results));
+        } catch (Exception ex) {
+            System.out.print("Unable to get offset lags for kafka. Reason: ");
+            ex.printStackTrace(System.out);
+        }
+    }
+
+    private static void printUsageAndExit (Options options, String message) {
+        System.out.println(message);
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("storm-kafka-monitor ", options);
+        System.exit(1);
+    }
+
+    private static Options buildOptions () {
+        Options options = new Options();
+        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
+                "offset");
+        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
+        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
+                "consumer/spout e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
+        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
+                "old spout");
+        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
+                OPTION_LEADERS_LONG + " for old spout with StaticHosts");
+        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
+                OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets  " +
+                "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181");
+        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
+                " offsets without the topic and partition nodes");
+        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
+                "/brokers (applicable only for old kafka spout) ");
+        return options;
+    }
+
+    /**
+     *
+     * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+     * @return log head offset, spout offset and lag for each partition
+     */
+    public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
+        KafkaConsumer<String, String> consumer = null;
+        List<KafkaOffsetLagResult> result = new ArrayList<>();
+        try {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
+            props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
+            props.put("enable.auto.commit", "false");
+            props.put("session.timeout.ms", "30000");
+            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            List<TopicPartition> topicPartitionList = new ArrayList<>();
+            consumer = new KafkaConsumer<>(props);
+            for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
+                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+                if (partitionInfoList != null) {
+                    for (PartitionInfo partitionInfo : partitionInfoList) {
+                        topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                    }
+                }
+            }
+            consumer.assign(topicPartitionList);
+            for (TopicPartition topicPartition : topicPartitionList) {
+                OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
+                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
+                consumer.seekToEnd(topicPartition);
+                result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
+            }
+        } finally {
+            if (consumer != null) {
+                consumer.close();
+            }
+        }
+        return result;
+    }
+
+    /**
+     *
+     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+     * @return log head offset, spout offset and lag for each partition
+     */
+    public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+        List<KafkaOffsetLagResult> result = new ArrayList<>();
+        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
+        if (leaders != null) {
+            Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
+            Map<String, List<Integer>> topicPartitions = new HashMap<>();
+            for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
+                for (TopicPartition topicPartition: entry.getValue()) {
+                    if (!topicPartitions.containsKey(topicPartition.topic())) {
+                        topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
+                    }
+                    topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
+                }
+            }
+            Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
+            for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
+                for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
+                    Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? oldConsumerOffsets.get(topicOffsets.getKey()).get
+                            (partitionOffsets.getKey()) : -1;
+                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
+                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
+                            consumerCommittedOffset, partitionOffsets.getValue());
+                    result.add(kafkaOffsetLagResult);
+                }
+            }
+        }
+        return result;
+    }
+
+    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+        Map<String, List<TopicPartition>> result = new HashMap<>();
+        // this means that kafka spout was configured with StaticHosts hosts (leader for partition)
+        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
+            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
+            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
+            for (int i = 0; i < leaders.length; ++i) {
+                if (!result.containsKey(leaders[i])) {
+                    result.put(leaders[i], new ArrayList<TopicPartition>());
+                }
+                result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
+            }
+        } else {
+            // else use zk nodes to figure out partitions and leaders for topics i.e. ZkHosts
+            CuratorFramework curatorFramework = null;
+            try {
+                String brokersZkNode = oldKafkaSpoutOffsetQuery.getBrokersZkPath();
+                if (!brokersZkNode.endsWith("/")) {
+                    brokersZkNode += "/";
+                }
+                String topicsZkPath = brokersZkNode + "topics";
+                curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
+                curatorFramework.start();
+                List<String> topics = new ArrayList<>();
+                if (oldKafkaSpoutOffsetQuery.isWildCardTopic()) {
+                    List<String> children = curatorFramework.getChildren().forPath(topicsZkPath);
+                    for (String child : children) {
+                        if (child.matches(oldKafkaSpoutOffsetQuery.getTopic())) {
+                            topics.add(child);
+                        }
+                    }
+                } else {
+                    topics.add(oldKafkaSpoutOffsetQuery.getTopic());
+                }
+                for (String topic: topics) {
+                    String partitionsPath = topicsZkPath + "/" + topic + "/partitions";
+                    List<String> children = curatorFramework.getChildren().forPath(partitionsPath);
+                    for (int i = 0; i < children.size(); ++i) {
+                        byte[] leaderData = curatorFramework.getData().forPath(partitionsPath + "/" + i + "/state");
+                        Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(leaderData, "UTF-8"));
+                        Integer leader = ((Number) value.get("leader")).intValue();
+                        byte[] brokerData = curatorFramework.getData().forPath(brokersZkNode + "ids/" + leader);
+                        Map<Object, Object> broker = (Map<Object, Object>) JSONValue.parse(new String(brokerData, "UTF-8"));
+                        String host = (String) broker.get("host");
+                        Integer port = ((Long) broker.get("port")).intValue();
+                        String leaderBroker = host + ":" + port;
+                        if (!result.containsKey(leaderBroker)) {
+                            result.put(leaderBroker, new ArrayList<TopicPartition>());
+                        }
+                        result.get(leaderBroker).add(new TopicPartition(topic, i));
+                    }
+                }
+            } finally {
+                if (curatorFramework != null) {
+                    curatorFramework.close();
+                }
+            }
+        }
+        return result;
+    }
+
+    private static Map<String, Map<Integer, Long>> getLogHeadOffsets (Map<String, List<TopicPartition>> leadersAndTopicPartitions) {
+        Map<String, Map<Integer, Long>> result = new HashMap<>();
+        if (leadersAndTopicPartitions != null) {
+            PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
+            SimpleConsumer simpleConsumer  = null;
+            for (Map.Entry<String, List<TopicPartition>> leader: leadersAndTopicPartitions.entrySet()) {
+                try {
+                    simpleConsumer = new SimpleConsumer(leader.getKey().split(":")[0], Integer.parseInt(leader.getKey().split(":")[1]), 10000, 64 *
+                            1024, "LogHeadOffsetRequest");
+                    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+                    for (TopicPartition topicPartition : leader.getValue()) {
+                        requestInfo.put(new TopicAndPartition(topicPartition.topic(), topicPartition.partition()), partitionOffsetRequestInfo);
+                        if (!result.containsKey(topicPartition.topic())) {
+                            result.put(topicPartition.topic(), new HashMap<Integer, Long>());
+                        }
+                    }
+                    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
+                            "LogHeadOffsetRequest");
+                    OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
+                    for (TopicPartition topicPartition : leader.getValue()) {
+                        result.get(topicPartition.topic()).put(topicPartition.partition(), response.offsets(topicPartition.topic(), topicPartition.partition())[0]);
+                    }
+                } finally {
+                    if (simpleConsumer != null) {
+                        simpleConsumer.close();
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk (Map<String, List<Integer>> topicPartitions, OldKafkaSpoutOffsetQuery
+            oldKafkaSpoutOffsetQuery) throws Exception {
+        Map<String, Map<Integer, Long>> result = new HashMap<>();
+        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
+        curatorFramework.start();
+        String partitionPrefix = "partition_";
+        String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
+        if (!zkPath.endsWith("/")) {
+            zkPath += "/";
+        }
+        byte[] zkData;
+        try {
+            if (topicPartitions != null) {
+                for (Map.Entry<String, List<Integer>> topicEntry: topicPartitions.entrySet()) {
+                    Map<Integer, Long> partitionOffsets = new HashMap<>();
+                    for (Integer partition: topicEntry.getValue()) {
+                        String path = zkPath + (oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + partitionPrefix + partition;
+                        if (curatorFramework.checkExists().forPath(path) != null) {
+                            zkData = curatorFramework.getData().forPath(path);
+                            Map<Object, Object> offsetData = (Map<Object, Object>) JSONValue.parse(new String(zkData, "UTF-8"));
+                            partitionOffsets.put(partition, (Long) offsetData.get("offset"));
+                        }
+                    }
+                    result.put(topicEntry.getKey(), partitionOffsets);
+                }
+            }
+        } finally {
+            if (curatorFramework != null) {
+                curatorFramework.close();
+            }
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
new file mode 100644
index 0000000..8e7354f
--- /dev/null
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.monitor;
+
+/**
+ * Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new kafka spout using new consumer api
+ */
+public class NewKafkaSpoutOffsetQuery {
+    private final String topics; // comma separated list of topics
+    private final String consumerGroupId; // consumer group id for which the offset needs to be calculated
+    private final String bootStrapBrokers; // bootstrap brokers
+
+    public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId) {
+        this.topics = topics;
+        this.bootStrapBrokers = bootstrapBrokers;
+        this.consumerGroupId = consumerGroupId;
+    }
+
+    public String getTopics() {
+        return topics;
+    }
+
+    public String getBootStrapBrokers() {
+        return bootStrapBrokers;
+    }
+
+    public String getConsumerGroupId() {
+        return this.consumerGroupId;
+    }
+
+
+    @Override
+    public String toString() {
+        return "NewKafkaSpoutOffsetQuery{" +
+                "topics='" + topics + '\'' +
+                ", consumerGroupId='" + consumerGroupId + '\'' +
+                ", bootStrapBrokers='" + bootStrapBrokers + '\'' +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        NewKafkaSpoutOffsetQuery that = (NewKafkaSpoutOffsetQuery) o;
+
+        if (topics != null ? !topics.equals(that.topics) : that.topics != null) return false;
+        if (consumerGroupId != null ? !consumerGroupId.equals(that.consumerGroupId) : that.consumerGroupId != null) return false;
+        return !(bootStrapBrokers != null ? !bootStrapBrokers.equals(that.bootStrapBrokers) : that.bootStrapBrokers != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topics != null ? topics.hashCode() : 0;
+        result = 31 * result + (consumerGroupId != null ? consumerGroupId.hashCode() : 0);
+        result = 31 * result + (bootStrapBrokers != null ? bootStrapBrokers.hashCode() : 0);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
new file mode 100644
index 0000000..cdeed32
--- /dev/null
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.monitor;
+
+/**
+ * Class representing information for querying kafka for log head offsets, spout offsets and the difference for old kafka spout
+ */
+public class OldKafkaSpoutOffsetQuery {
+    private final String topic; //single topic or a wildcard topic
+    private final String zkServers; //comma separated list of zk servers and port e.g. hostname1:2181, hostname2:2181
+    private final String zkPath; //zk node prefix without topic/partition where committed offsets are stored
+    private final boolean isWildCardTopic; //if the topic is a wildcard
+    private final String brokersZkPath; //zk node prefix where kafka stores all broker information
+    private final String partitions; //comma separated list of partitions corresponding to leaders below (for StaticHosts)
+    private final String leaders; //comma separated list of leader brokers and port corresponding to the partitions above (for StaticHosts) e.g.
+    // hostname1:9092,hostname2:9092
+
+    public OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath) {
+        this(topic, zkServers, zkPath, isWildCardTopic, brokersZkPath, null, null);
+    }
+
+    public OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, String partitions, String leaders) {
+        this(topic, zkServers, zkPath, false, null, partitions, leaders);
+
+    }
+
+    private OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath, String partitions, String
+            leaders) {
+        this.topic = topic;
+        this.zkServers = zkServers;
+        this.zkPath = zkPath;
+        this.isWildCardTopic = isWildCardTopic;
+        this.brokersZkPath = brokersZkPath;
+        this.partitions = partitions;
+        this.leaders = leaders;
+    }
+
+    @Override
+    public String toString() {
+        return "OldKafkaSpoutOffsetQuery{" +
+                "topic='" + topic + '\'' +
+                ", zkServers='" + zkServers + '\'' +
+                ", zkPath='" + zkPath + '\'' +
+                ", isWildCardTopic=" + isWildCardTopic +
+                ", brokersZkPath='" + brokersZkPath + '\'' +
+                ", partitions='" + partitions + '\'' +
+                ", leaders='" + leaders + '\'' +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        OldKafkaSpoutOffsetQuery that = (OldKafkaSpoutOffsetQuery) o;
+
+        if (isWildCardTopic != that.isWildCardTopic) return false;
+        if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false;
+        if (zkServers != null ? !zkServers.equals(that.zkServers) : that.zkServers != null) return false;
+        if (zkPath != null ? !zkPath.equals(that.zkPath) : that.zkPath != null) return false;
+        if (brokersZkPath != null ? !brokersZkPath.equals(that.brokersZkPath) : that.brokersZkPath != null) return false;
+        if (partitions != null ? !partitions.equals(that.partitions) : that.partitions != null) return false;
+        return !(leaders != null ? !leaders.equals(that.leaders) : that.leaders != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topic != null ? topic.hashCode() : 0;
+        result = 31 * result + (zkServers != null ? zkServers.hashCode() : 0);
+        result = 31 * result + (zkPath != null ? zkPath.hashCode() : 0);
+        result = 31 * result + (isWildCardTopic ? 1 : 0);
+        result = 31 * result + (brokersZkPath != null ? brokersZkPath.hashCode() : 0);
+        result = 31 * result + (partitions != null ? partitions.hashCode() : 0);
+        result = 31 * result + (leaders != null ? leaders.hashCode() : 0);
+        return result;
+    }
+
+    public String getTopic() {
+
+        return topic;
+    }
+
+    public String getZkServers() {
+        return zkServers;
+    }
+
+    public String getZkPath() {
+        return zkPath;
+    }
+
+    public boolean isWildCardTopic() {
+        return isWildCardTopic;
+    }
+
+    public String getBrokersZkPath() {
+        return brokersZkPath;
+    }
+
+    public String getPartitions() {
+        return partitions;
+    }
+
+    public String getLeaders() {
+        return leaders;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 7bacd0b..d2bd313 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
 
 import org.apache.storm.Config;
 import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -188,6 +189,47 @@ public class KafkaSpout extends BaseRichSpout {
         }
     }
 
+    @Override
+    public Map<String, Object> getComponentConfiguration () {
+        Map<String, Object> configuration = super.getComponentConfiguration();
+        if (configuration == null) {
+            configuration = new HashMap<>();
+        }
+        String configKeyPrefix = "config.";
+        configuration.put(configKeyPrefix + "topics", this._spoutConfig.topic);
+        StringBuilder zkServers = new StringBuilder();
+        if (_spoutConfig.zkServers != null && _spoutConfig.zkServers.size() > 0) {
+            for (String zkServer : this._spoutConfig.zkServers) {
+                zkServers.append(zkServer + ":" + this._spoutConfig.zkPort + ",");
+            }
+            configuration.put(configKeyPrefix + "zkServers", zkServers.toString());
+        }
+        BrokerHosts brokerHosts = this._spoutConfig.hosts;
+        String zkRoot = this._spoutConfig.zkRoot + "/" + this._spoutConfig.id;
+        if (brokerHosts instanceof ZkHosts) {
+            ZkHosts zkHosts = (ZkHosts) brokerHosts;
+            configuration.put(configKeyPrefix + "zkNodeBrokers", zkHosts.brokerZkPath);
+        } else if (brokerHosts instanceof StaticHosts) {
+            StaticHosts staticHosts = (StaticHosts) brokerHosts;
+            GlobalPartitionInformation globalPartitionInformation = staticHosts.getPartitionInformation();
+            boolean useTopicNameForPath = globalPartitionInformation.getbUseTopicNameForPartitionPathId();
+            if (useTopicNameForPath) {
+                zkRoot += ("/" + this._spoutConfig.topic);
+            }
+            List<Partition> partitions = globalPartitionInformation.getOrderedPartitions();
+            StringBuilder staticPartitions = new StringBuilder();
+            StringBuilder leaderHosts = new StringBuilder();
+            for (Partition partition: partitions) {
+                staticPartitions.append(partition.partition + ",");
+                leaderHosts.append(partition.host.host + ":" + partition.host.port).append(",");
+            }
+            configuration.put(configKeyPrefix + "partitions", staticPartitions.toString());
+            configuration.put(configKeyPrefix + "leaders", leaderHosts.toString());
+        }
+        configuration.put(configKeyPrefix + "zkRoot", zkRoot);
+        return configuration;
+    }
+
     private void commit() {
         _lastUpdateMs = System.currentTimeMillis();
         for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
index 3108ff8..e420cb3 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
@@ -49,6 +49,10 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ
         partitionMap.put(partitionId, broker);
     }
 
+    public Boolean getbUseTopicNameForPartitionPathId () {
+        return bUseTopicNameForPartitionPathId;
+    }
+
     @Override
     public String toString() {
         return "GlobalPartitionInformation{" +

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7c7031..49a5c5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -285,6 +285,7 @@
         <module>examples/storm-starter</module>
         <module>external/storm-kafka-client</module>
         <module>external/storm-opentsdb</module>
+        <module>external/storm-kafka-monitor</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 3099ca7..f0162f0 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -26,7 +26,7 @@
   (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
                                               ACKER-FAIL-STREAM-ID mk-authorization-handler
                                               start-metrics-reporters]]])
-  (:import [org.apache.storm.utils Utils]
+  (:import [org.apache.storm.utils Utils TopologySpoutLag]
            [org.apache.storm.generated NimbusSummary])
   (:use [clojure.string :only [blank? lower-case trim split]])
   (:import [org.apache.storm.generated ExecutorSpecificStats
@@ -66,6 +66,7 @@
 (defmeter ui:num-all-topologies-summary-http-requests)
 (defmeter ui:num-topology-page-http-requests)
 (defmeter ui:num-topology-metric-http-requests)
+(defmeter ui:num-topology-lag-http-requests)
 (defmeter ui:num-build-visualization-http-requests)
 (defmeter ui:num-mk-visualization-data-http-requests)
 (defmeter ui:num-component-page-http-requests)
@@ -718,6 +719,12 @@
           merged-bolt-stats (map (fn [[k v]] (merge-executor-stats window k v)) bolt-comp-summs)]
       (merge {"window" window "window-hint" window-hint "spouts" merged-spout-stats "bolts" merged-bolt-stats}))))
 
+(defn topology-lag [id topology-conf]
+  (thrift/with-configured-nimbus-connection nimbus
+    (let [topology (.getUserTopology ^Nimbus$Client nimbus
+                                               id)]
+      (TopologySpoutLag/lag topology topology-conf))))
+
 (defn component-errors
   [errors-list topology-id secure?]
   (let [errors (->> errors-list
@@ -1057,6 +1064,12 @@
     (populate-context! servlet-request)
     (assert-authorized-user "getTopology" (topology-config id))
     (json-response (topology-metrics-page id (:window m) (check-include-sys? (:sys m))) (:callback m)))
+  (GET "/api/v1/topology/:id/lag" [:as {:keys [cookies servlet-request scheme]} id & m]
+    (mark! ui:num-topology-lag-http-requests)
+    (populate-context! servlet-request)
+    (let [topology-conf (topology-config id)]
+      (assert-authorized-user "getUserTopology" topology-conf)
+      (json-response (topology-lag id topology-conf) nil)))
   (GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m]
     (mark! ui:num-build-visualization-http-requests)
     (populate-context! servlet-request)

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
new file mode 100644
index 0000000..d1f3659
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TopologySpoutLag {
+    private static final String SPOUT_ID = "spoutId";
+    private static final String SPOUT_TYPE= "spoutType";
+    private static final String SPOUT_LAG_RESULT = "spoutLagResult";
+    private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
+
+    public static List<Map<String, Object>> lag (StormTopology stormTopology, Map topologyConf) {
+        List<Map<String, Object>> result = new ArrayList<>();
+        Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
+        Object object = null;
+        for (Map.Entry<String, SpoutSpec> spout: spouts.entrySet()) {
+            try {
+                SpoutSpec spoutSpec = spout.getValue();
+                ComponentObject componentObject = spoutSpec.get_spout_object();
+                object = Utils.getSetComponentObject(componentObject);
+                if (object.getClass().getCanonicalName().endsWith("storm.kafka.spout.KafkaSpout")) {
+                    result.add(getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
+                } else if (object.getClass().getCanonicalName().endsWith("storm.kafka.KafkaSpout")) {
+                    result.add(getLagResultForOldKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
+                }
+            } catch (Exception e) {
+                logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey() + " and spout class: " + object.getClass().getCanonicalName());
+                logger.warn("Exception message:" + e.getMessage());
+            }
+        }
+        return result;
+    }
+
+    private static List<String> getCommandLineOptionsForNewKafkaSpout (Map<String, Object> jsonConf) {
+        List<String> commands = new ArrayList<>();
+        String configKeyPrefix = "config.";
+        commands.add("-t");
+        commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
+        commands.add("-g");
+        commands.add((String)jsonConf.get(configKeyPrefix + "groupid"));
+        commands.add("-b");
+        commands.add((String)jsonConf.get(configKeyPrefix + "bootstrap.servers"));
+        return commands;
+    }
+
+    private static List<String> getCommandLineOptionsForOldKafkaSpout (Map<String, Object> jsonConf, Map topologyConf) {
+        List<String> commands = new ArrayList<>();
+        String configKeyPrefix = "config.";
+        commands.add("-o");
+        commands.add("-t");
+        commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
+        commands.add("-n");
+        commands.add((String)jsonConf.get(configKeyPrefix + "zkRoot"));
+        String zkServers = (String)jsonConf.get(configKeyPrefix + "zkServers");
+        if (zkServers == null || zkServers.isEmpty()) {
+            StringBuilder zkServersBuilder = new StringBuilder();
+            Integer zkPort = ((Number) topologyConf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+            for (String zkServer: (List<String>) topologyConf.get(Config.STORM_ZOOKEEPER_SERVERS)) {
+                zkServersBuilder.append(zkServer + ":" + zkPort + ",");
+            }
+            zkServers = zkServersBuilder.toString();
+        }
+        commands.add("-z");
+        commands.add(zkServers);
+        if (jsonConf.get(configKeyPrefix + "leaders") != null) {
+            commands.add("-p");
+            commands.add((String)jsonConf.get(configKeyPrefix + "partitions"));
+            commands.add("-l");
+            commands.add((String)jsonConf.get(configKeyPrefix + "leaders"));
+        } else {
+            commands.add("-r");
+            commands.add((String)jsonConf.get(configKeyPrefix + "zkNodeBrokers"));
+            Boolean isWildCard = (Boolean) topologyConf.get("kafka.topic.wildcard.match");
+            if (isWildCard != null && isWildCard.booleanValue()) {
+                commands.add("-w");
+            }
+        }
+        return commands;
+    }
+
+    private static Map<String, Object> getLagResultForKafka (String spoutId, SpoutSpec spoutSpec, Map topologyConf, boolean old) throws IOException {
+        ComponentCommon componentCommon = spoutSpec.get_common();
+        String json = componentCommon.get_json_conf();
+        String result = "Offset lags for kafka not supported for older versions. Please update kafka spout to latest version.";
+        if (json != null && !json.isEmpty()) {
+            List<String> commands = new ArrayList<>();
+            String stormHomeDir = System.getenv("STORM_BASE_DIR");
+            if (stormHomeDir != null && !stormHomeDir.endsWith("/")) {
+                stormHomeDir += File.separator;
+            }
+            commands.add(stormHomeDir != null ? stormHomeDir + "bin" + File.separator + "storm-kafka-monitor" : "storm-kafka-monitor");
+            Map<String, Object> jsonMap = (Map<String, Object>) JSONValue.parse(json);
+            commands.addAll(old ? getCommandLineOptionsForOldKafkaSpout(jsonMap, topologyConf) : getCommandLineOptionsForNewKafkaSpout(jsonMap));
+            result = ShellUtils.execCommand(commands.toArray(new String[0]));
+        }
+        Map<String, Object> kafkaSpoutLagInfo = new HashMap<>();
+        kafkaSpoutLagInfo.put(SPOUT_ID, spoutId);
+        kafkaSpoutLagInfo.put(SPOUT_TYPE, "KAFKA");
+        kafkaSpoutLagInfo.put(SPOUT_LAG_RESULT, result);
+        return kafkaSpoutLagInfo;
+    }
+
+    private static Map<String, Object> getLagResultForNewKafkaSpout (String spoutId, SpoutSpec spoutSpec, Map topologyConf) throws IOException {
+        return getLagResultForKafka(spoutId, spoutSpec, topologyConf, false);
+    }
+
+    private static Map<String, Object> getLagResultForOldKafkaSpout (String spoutId, SpoutSpec spoutSpec, Map topologyConf) throws IOException {
+        return getLagResultForKafka(spoutId, spoutSpec, topologyConf, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html
index 917831a..c83bd6d 100644
--- a/storm-core/src/ui/public/templates/topology-page-template.html
+++ b/storm-core/src/ui/public/templates/topology-page-template.html
@@ -204,6 +204,92 @@
   </table>
 </script>
 
+<script id="topology-kafka-spouts-lag-template" type="text/html">
+  <h2>Kafka Spouts Lag</h2>
+  <table class="table table-striped compact" id="topology-kafka-spouts-lag-table">
+    <thead>
+      <tr>
+        <th>
+          <span data-toggle="tooltip" data-placement="right" title="Kafka spout id">
+            Id
+          </span>
+        </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="above" title="Topic">
+            Topic
+          </span>
+        </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="above" title="Partition">
+            Partition
+          </span>
+        </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="above" title="Latest Offset">
+            Latest Offset
+          </span>
+        </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="above" title="Offset of last spout message successfully acked">
+            Spout Committed Offset
+          </span>
+        </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="left" title="Lag">
+            Lag
+          </span>
+        </th>
+      </tr>
+    </thead>
+    <tbody>
+      {{#kafkaSpoutsLagResults}}
+      <tr>
+        <td>{{id}}</td>
+        <td>{{topic}}</td>
+        <td>{{partition}}</td>
+        <td>{{logHeadOffset}}</td>
+        <td>{{consumerCommittedOffset}}</td>
+        <td>{{lag}}</td>
+      </tr>
+      {{/kafkaSpoutsLagResults}}
+    </tbody>
+  </table>
+</script>
+
+<script id="topology-spouts-lag-error-template" type="text/html">
+  <h2>Topology spouts lag error</h2>
+  <table class="table table-striped compact" id="topology-spouts-lag-error-table">
+    <thead>
+      <tr>
+        <th>
+          <span data-toggle="tooltip" data-placement="right" title="Spout id">
+            Id
+          </span>
+        </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="above" title="Type of spout">
+            Type
+          </span>
+        </th>
+        <th>
+          <span data-toggle="tooltip" data-placement="left" title="Message to denote the reason for failure to get the lag">
+            Message
+          </span>
+        </th>
+      </tr>
+    </thead>
+    <tbody>
+      {{#spoutsLagErrorResults}}
+      <tr>
+        <td>{{spoutId}}</td>
+        <td>{{spoutType}}</td>
+        <td>{{spoutLagResult}}</td>
+      </tr>
+      {{/spoutsLagErrorResults}}
+    </tbody>
+  </table>
+</script>
+
 <script id="topology-visualization-container-template" type="text/html">
   <p>
     <table class="table table-striped compact">

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index 061fd91..bb11933 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -70,6 +70,10 @@
     <div id="topology-stats" class="col-md-12"></div>
   </div>
   <div class="row">
+    <div id="topology-spouts-lag" class="col-md-12"></div>
+  </div>
+  </div>
+  <div class="row">
     <div id="spout-stats" class="col-md-12"></div>
   </div>
   <div class="row">
@@ -277,6 +281,7 @@ $(document).ready(function() {
         var topologySummary = $("#topology-summary");
         var topologyResources = $("#topology-resources");
         var topologyStats = $("#topology-stats");
+        var topologySpoutsLag = $("#topology-spouts-lag");
         var spoutStats = $("#spout-stats");
         var boltStats = $("#bolt-stats");
         var config = $("#topology-configuration");
@@ -384,8 +389,51 @@ $(document).ready(function() {
             $('#topology-configuration [data-toggle="tooltip"]').tooltip();
             $('#topology-actions [data-toggle="tooltip"]').tooltip();
             $('#topology-visualization [data-toggle="tooltip"]').tooltip();
+
+            var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
+            $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
+              if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
+                var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
+                var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
+
+                var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
+                var isJson = function (input) {
+                  try {
+                    JSON.parse(input);
+                  } catch (e) {
+                    return false;
+                  }
+                  return true;
+                };
+                var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
+                var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
+                var data = {};
+                if (kafkaSpoutsValidResults.length > 0) {
+                  data.kafkaSpoutsLagResults = [];
+                  kafkaSpoutsValidResults.forEach(function(ele) {
+                    var spoutLagResult = JSON.parse(ele.spoutLagResult);
+                    spoutLagResult.forEach(function(ele2) {
+                      data.kafkaSpoutsLagResults.push({
+                        id: ele.spoutId,
+                        topic: ele2.topic,
+                        partition: ele2.partition,
+                        logHeadOffset: ele2.logHeadOffset,
+                        consumerCommittedOffset: ele2.consumerCommittedOffset,
+                        lag: ele2.lag
+                      });
+                    });
+                  });
+                  topologySpoutsLag.append(Mustache.render(kafkaSpoutsLagTemplate,data));
+                }
+                if (kafkaSpoutsErrorResults.length > 0) {
+                  data.spoutsLagErrorResults = kafkaSpoutsErrorResults;
+                  topologySpoutsLag.append(Mustache.render(spoutsErrorTemplate,data));
+                }
+              }
+            });
       }});
     });
+
  });
 </script>
 </html>

http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index af7e226..e37e2ce 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -340,10 +340,10 @@
         </fileSet>
         <!-- $STORM_HOME/extlib -->
         <fileSet>
-            <directory></directory>
+            <directory>${project.basedir}/../../external/storm-kafka-monitor/target</directory>
             <outputDirectory>extlib</outputDirectory>
             <includes>
-                <include></include>
+                <include>storm*jar</include>
             </includes>
         </fileSet>
         <!-- $STORM_HOME/extlib-daemon, for daemons only -->


[3/3] storm git commit: Added STORM-1136 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1136 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e7f478ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e7f478ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e7f478ca

Branch: refs/heads/1.x-branch
Commit: e7f478cad0468ab7c079a008c566285762584e5e
Parents: 3e571c3
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Jun 20 17:43:46 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Jun 20 17:43:46 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e7f478ca/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d336b54..3153036 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui
  * STORM-1911: IClusterMetricsConsumer should use seconds to timestamp unit
  * STORM-1849: HDFSFileTopology should use the 3rd argument as topologyName
  * STORM-1906: Window count/length of zero should be disallowed


[2/3] storm git commit: STORM-1136: Addressing review comment.

Posted by sr...@apache.org.
STORM-1136: Addressing review comment.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e571c3b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e571c3b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e571c3b

Branch: refs/heads/1.x-branch
Commit: 3e571c3b0f800737681f4a43bd523d90b9c69536
Parents: c0a706a
Author: Priyank <ps...@hortonworks.com>
Authored: Thu Jun 9 12:02:14 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Mon Jun 20 17:28:11 2016 -0700

----------------------------------------------------------------------
 storm-core/src/ui/public/topology.html | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3e571c3b/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index bb11933..60b68f5 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -69,6 +69,7 @@
   <div class="row">
     <div id="topology-stats" class="col-md-12"></div>
   </div>
+  <!-- This gets populated only if the spouts lag data is available. Currently only kafka spout is supported-->
   <div class="row">
     <div id="topology-spouts-lag" class="col-md-12"></div>
   </div>