You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/06/21 01:09:49 UTC
[1/3] storm git commit: STORM-1136: Command line module to return
kafka spout offsets lag and display in storm ui.
Repository: storm
Updated Branches:
refs/heads/1.x-branch 382c3811a -> e7f478cad
STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c0a706a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c0a706a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c0a706a3
Branch: refs/heads/1.x-branch
Commit: c0a706a3de1c3eab6b6da20e934364c1737c3fac
Parents: 382c381
Author: Priyank <ps...@hortonworks.com>
Authored: Wed May 18 12:14:50 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Mon Jun 20 17:28:07 2016 -0700
----------------------------------------------------------------------
bin/storm-kafka-monitor | 43 +++
.../apache/storm/kafka/spout/KafkaSpout.java | 17 +
external/storm-kafka-monitor/README.md | 41 ++
external/storm-kafka-monitor/pom.xml | 127 +++++++
.../kafka/monitor/KafkaOffsetLagResult.java | 102 +++++
.../storm/kafka/monitor/KafkaOffsetLagUtil.java | 376 +++++++++++++++++++
.../kafka/monitor/NewKafkaSpoutOffsetQuery.java | 76 ++++
.../kafka/monitor/OldKafkaSpoutOffsetQuery.java | 124 ++++++
.../jvm/org/apache/storm/kafka/KafkaSpout.java | 42 +++
.../trident/GlobalPartitionInformation.java | 4 +
pom.xml | 1 +
storm-core/src/clj/org/apache/storm/ui/core.clj | 15 +-
.../apache/storm/utils/TopologySpoutLag.java | 141 +++++++
.../templates/topology-page-template.html | 86 +++++
storm-core/src/ui/public/topology.html | 48 +++
storm-dist/binary/src/main/assembly/binary.xml | 4 +-
16 files changed, 1244 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/bin/storm-kafka-monitor
----------------------------------------------------------------------
diff --git a/bin/storm-kafka-monitor b/bin/storm-kafka-monitor
new file mode 100755
index 0000000..146152a
--- /dev/null
+++ b/bin/storm-kafka-monitor
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Resolve links - $0 may be a softlink
+PRG="${0}"
+
+while [ -h "${PRG}" ]; do
+ ls=`ls -ld "${PRG}"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "${PRG}"`/"$link"
+ fi
+done
+
+STORM_BIN_DIR=`dirname ${PRG}`
+export STORM_BASE_DIR=`cd ${STORM_BIN_DIR}/..;pwd`
+
+# Which java to use
+if [ -z "$JAVA_HOME" ]; then
+ JAVA="java"
+else
+ JAVA="$JAVA_HOME/bin/java"
+fi
+
+exec "$JAVA" -cp "$STORM_BASE_DIR/extlib/*" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index d211ae9..bba245b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -381,6 +381,23 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
return "{acked=" + acked + "} ";
}
+ @Override
+ public Map<String, Object> getComponentConfiguration () {
+ Map<String, Object> configuration = super.getComponentConfiguration();
+ if (configuration == null) {
+ configuration = new HashMap<>();
+ }
+ String configKeyPrefix = "config.";
+ StringBuilder topics = new StringBuilder();
+ for (String topic: this.kafkaSpoutConfig.getSubscribedTopics()) {
+ topics.append(topic).append(",");
+ }
+ configuration.put(configKeyPrefix + "topics", topics.toString());
+ configuration.put(configKeyPrefix + "groupid", this.kafkaSpoutConfig.getConsumerGroupId());
+ configuration.put(configKeyPrefix + "bootstrap.servers", this.kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
+ return configuration;
+ }
+
// ======= Offsets Commit Management ==========
private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/README.md b/external/storm-kafka-monitor/README.md
new file mode 100644
index 0000000..4026baf
--- /dev/null
+++ b/external/storm-kafka-monitor/README.md
@@ -0,0 +1,41 @@
+#Storm Kafka Monitor
+
+Tool to query kafka spout lags and show in Storm UI
+
+## Usage
+This tool provides a way to query kafka offsets that the spout has consumed successfully and the latest
+offsets in kafka. It provides an easy way to see how the topology is performing. It is a command line
+interface called storm-kafka-monitor in the bin directory. The results have also been included in storm
+ui on the topology page. It supports both new and the old kafka spout. Please execute the command
+line without any options to see usage.
+
+```java
+$STORM_HOME_DIR/bin/storm-kafka-monitor
+```
+
+## Future Work
+The offset lag calculation support for trident kafka spouts will be added soon.
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/pom.xml b/external/storm-kafka-monitor/pom.xml
new file mode 100644
index 0000000..0ddb156
--- /dev/null
+++ b/external/storm-kafka-monitor/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>1.0.2-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>storm-kafka-monitor</artifactId>
+ <name>storm-kafka-monitor</name>
+
+
+ <packaging>jar</packaging>
+
+ <developers>
+ <developer>
+ <id>pshah</id>
+ <name>Priyank Shah</name>
+ <email>priyank5485@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <!--kafka libraries-->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>${kafka.artifact.id}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+ <shadedPattern>org.apache.kafka.shaded</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>kafka</pattern>
+ <shadedPattern>kafka.shaded</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>other.kafka</pattern>
+ <shadedPattern>other.kafka.shaded</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.slf4j.impl</pattern>
+ <shadedPattern>org.slf4j.impl.shaded</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>org.apache.curator</pattern>
+ <shadedPattern>org.apache.curator.shaded</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.json.simple</pattern>
+ <shadedPattern>org.json.simple.shaded</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons.cli</pattern>
+ <shadedPattern>org.apache.commons.cli.shaded</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
new file mode 100644
index 0000000..a6d898c
--- /dev/null
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import org.json.simple.JSONAware;
+
+/**
+ * Class representing the log head offsets, spout offsets and the lag for a topic
+ */
+public class KafkaOffsetLagResult implements JSONAware {
+ private String topic;
+ private int partition;
+ private long consumerCommittedOffset;
+ private long logHeadOffset;
+ private long lag;
+
+ public KafkaOffsetLagResult(String topic, int parition, long consumerCommittedOffset, long logHeadOffset) {
+ this.topic = topic;
+ this.partition = parition;
+ this.consumerCommittedOffset = consumerCommittedOffset;
+ this.logHeadOffset = logHeadOffset;
+ this.lag = this.logHeadOffset - this.consumerCommittedOffset;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public long getConsumerCommittedOffset() {
+ return consumerCommittedOffset;
+ }
+
+ public long getLogHeadOffset() {
+ return logHeadOffset;
+ }
+
+ public long getLag() {
+ return lag;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaOffsetLagResult{" +
+ "topic='" + topic + '\'' +
+ ", partition=" + partition +
+ ", consumerCommittedOffset=" + consumerCommittedOffset +
+ ", logHeadOffset=" + logHeadOffset +
+ ", lag=" + lag +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ KafkaOffsetLagResult that = (KafkaOffsetLagResult) o;
+
+ if (partition != that.partition) return false;
+ if (consumerCommittedOffset != that.consumerCommittedOffset) return false;
+ if (logHeadOffset != that.logHeadOffset) return false;
+ if (lag != that.lag) return false;
+ return !(topic != null ? !topic.equals(that.topic) : that.topic != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = topic != null ? topic.hashCode() : 0;
+ result = 31 * result + partition;
+ result = 31 * result + (int) (consumerCommittedOffset ^ (consumerCommittedOffset >>> 32));
+ result = 31 * result + (int) (logHeadOffset ^ (logHeadOffset >>> 32));
+ result = 31 * result + (int) (lag ^ (lag >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toJSONString() {
+ return "{\"topic\":\"" + topic + "\",\"partition\":" + partition + ",\"consumerCommittedOffset\":" + consumerCommittedOffset + "," +
+ "\"logHeadOffset\":" + logHeadOffset + ",\"lag\":" + lag + "}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
new file mode 100644
index 0000000..a4436f9
--- /dev/null
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+ private static final String OPTION_TOPIC_SHORT = "t";
+ private static final String OPTION_TOPIC_LONG = "topics";
+ private static final String OPTION_OLD_CONSUMER_SHORT = "o";
+ private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
+ private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
+ private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
+ private static final String OPTION_GROUP_ID_SHORT = "g";
+ private static final String OPTION_GROUP_ID_LONG = "groupid";
+ private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
+ private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
+ private static final String OPTION_PARTITIONS_SHORT = "p";
+ private static final String OPTION_PARTITIONS_LONG = "partitions";
+ private static final String OPTION_LEADERS_SHORT = "l";
+ private static final String OPTION_LEADERS_LONG = "leaders";
+ private static final String OPTION_ZK_SERVERS_SHORT = "z";
+ private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
+ private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
+ private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
+ private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
+ private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
+
+ public static void main (String args[]) {
+ try {
+ List<KafkaOffsetLagResult> results;
+ Options options = buildOptions();
+ CommandLineParser parser = new DefaultParser();
+ CommandLine commandLine = parser.parse(options, args);
+ if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
+ printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
+ }
+ if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
+ OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
+ if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+ printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
+ OPTION_OLD_CONSUMER_LONG);
+ }
+ if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
+ printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " +
+ OPTION_OLD_CONSUMER_LONG);
+ }
+ String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
+ if (topics != null && topics.length > 1) {
+ printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
+ "wildcard string for matching topics is supported");
+ }
+ if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
+ if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
+ printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
+ OPTION_ZK_BROKERS_ROOT_LONG);
+ }
+ oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
+ (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
+ (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
+ } else {
+ if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
+ printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
+ }
+ if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
+ printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
+ " is not provided");
+ }
+ String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
+ String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
+ if (partitions.length != leaders.length) {
+ printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
+ }
+ oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
+ (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue
+ (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
+ }
+ results = getOffsetLags(oldKafkaSpoutOffsetQuery);
+ } else {
+ String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
+ OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
+ for (String oldOption: oldSpoutOptions) {
+ if (commandLine.hasOption(oldOption)) {
+ printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
+ }
+ }
+ if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+ printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
+ " is not specified");
+ }
+ NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+ commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
+ results = getOffsetLags(newKafkaSpoutOffsetQuery);
+ }
+ System.out.print(JSONValue.toJSONString(results));
+ } catch (Exception ex) {
+ System.out.print("Unable to get offset lags for kafka. Reason: ");
+ ex.printStackTrace(System.out);
+ }
+ }
+
+ private static void printUsageAndExit (Options options, String message) {
+ System.out.println(message);
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("storm-kafka-monitor ", options);
+ System.exit(1);
+ }
+
+ private static Options buildOptions () {
+ Options options = new Options();
+ options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
+ "offset");
+ options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
+ options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
+ "consumer/spout e.g. hostname1:9092,hostname2:9092");
+ options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
+ options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
+ "old spout");
+ options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
+ OPTION_LEADERS_LONG + " for old spout with StaticHosts");
+ options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
+ OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
+ options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets " +
+ "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181");
+ options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
+ " offsets without the topic and partition nodes");
+ options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
+ "/brokers (applicable only for old kafka spout) ");
+ return options;
+ }
+
+ /**
+ *
+ * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+ * @return log head offset, spout offset and lag for each partition
+ */
+ public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
+ KafkaConsumer<String, String> consumer = null;
+ List<KafkaOffsetLagResult> result = new ArrayList<>();
+ try {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
+ props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
+ props.put("enable.auto.commit", "false");
+ props.put("session.timeout.ms", "30000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ List<TopicPartition> topicPartitionList = new ArrayList<>();
+ consumer = new KafkaConsumer<>(props);
+ for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
+ List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+ if (partitionInfoList != null) {
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ }
+ }
+ }
+ consumer.assign(topicPartitionList);
+ for (TopicPartition topicPartition : topicPartitionList) {
+ OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
+ long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
+ consumer.seekToEnd(topicPartition);
+ result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
+ }
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ return result;
+ }
+
+ /**
+ *
+ * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+ * @return log head offset, spout offset and lag for each partition
+ */
+ public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+ List<KafkaOffsetLagResult> result = new ArrayList<>();
+ Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
+ if (leaders != null) {
+ Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
+ Map<String, List<Integer>> topicPartitions = new HashMap<>();
+ for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
+ for (TopicPartition topicPartition: entry.getValue()) {
+ if (!topicPartitions.containsKey(topicPartition.topic())) {
+ topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
+ }
+ topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
+ }
+ }
+ Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
+ for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
+ for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
+ Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? oldConsumerOffsets.get(topicOffsets.getKey()).get
+ (partitionOffsets.getKey()) : -1;
+ consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
+ KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
+ consumerCommittedOffset, partitionOffsets.getValue());
+ result.add(kafkaOffsetLagResult);
+ }
+ }
+ }
+ return result;
+ }
+
+ private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+ Map<String, List<TopicPartition>> result = new HashMap<>();
+ // this means that kafka spout was configured with StaticHosts hosts (leader for partition)
+ if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
+ String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
+ String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
+ for (int i = 0; i < leaders.length; ++i) {
+ if (!result.containsKey(leaders[i])) {
+ result.put(leaders[i], new ArrayList<TopicPartition>());
+ }
+ result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
+ }
+ } else {
+ // else use zk nodes to figure out partitions and leaders for topics i.e. ZkHosts
+ CuratorFramework curatorFramework = null;
+ try {
+ String brokersZkNode = oldKafkaSpoutOffsetQuery.getBrokersZkPath();
+ if (!brokersZkNode.endsWith("/")) {
+ brokersZkNode += "/";
+ }
+ String topicsZkPath = brokersZkNode + "topics";
+ curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
+ curatorFramework.start();
+ List<String> topics = new ArrayList<>();
+ if (oldKafkaSpoutOffsetQuery.isWildCardTopic()) {
+ List<String> children = curatorFramework.getChildren().forPath(topicsZkPath);
+ for (String child : children) {
+ if (child.matches(oldKafkaSpoutOffsetQuery.getTopic())) {
+ topics.add(child);
+ }
+ }
+ } else {
+ topics.add(oldKafkaSpoutOffsetQuery.getTopic());
+ }
+ for (String topic: topics) {
+ String partitionsPath = topicsZkPath + "/" + topic + "/partitions";
+ List<String> children = curatorFramework.getChildren().forPath(partitionsPath);
+ for (int i = 0; i < children.size(); ++i) {
+ byte[] leaderData = curatorFramework.getData().forPath(partitionsPath + "/" + i + "/state");
+ Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(leaderData, "UTF-8"));
+ Integer leader = ((Number) value.get("leader")).intValue();
+ byte[] brokerData = curatorFramework.getData().forPath(brokersZkNode + "ids/" + leader);
+ Map<Object, Object> broker = (Map<Object, Object>) JSONValue.parse(new String(brokerData, "UTF-8"));
+ String host = (String) broker.get("host");
+ Integer port = ((Long) broker.get("port")).intValue();
+ String leaderBroker = host + ":" + port;
+ if (!result.containsKey(leaderBroker)) {
+ result.put(leaderBroker, new ArrayList<TopicPartition>());
+ }
+ result.get(leaderBroker).add(new TopicPartition(topic, i));
+ }
+ }
+ } finally {
+ if (curatorFramework != null) {
+ curatorFramework.close();
+ }
+ }
+ }
+ return result;
+ }
+
+ private static Map<String, Map<Integer, Long>> getLogHeadOffsets (Map<String, List<TopicPartition>> leadersAndTopicPartitions) {
+ Map<String, Map<Integer, Long>> result = new HashMap<>();
+ if (leadersAndTopicPartitions != null) {
+ PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
+ SimpleConsumer simpleConsumer = null;
+ for (Map.Entry<String, List<TopicPartition>> leader: leadersAndTopicPartitions.entrySet()) {
+ try {
+ simpleConsumer = new SimpleConsumer(leader.getKey().split(":")[0], Integer.parseInt(leader.getKey().split(":")[1]), 10000, 64 *
+ 1024, "LogHeadOffsetRequest");
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ for (TopicPartition topicPartition : leader.getValue()) {
+ requestInfo.put(new TopicAndPartition(topicPartition.topic(), topicPartition.partition()), partitionOffsetRequestInfo);
+ if (!result.containsKey(topicPartition.topic())) {
+ result.put(topicPartition.topic(), new HashMap<Integer, Long>());
+ }
+ }
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
+ "LogHeadOffsetRequest");
+ OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
+ for (TopicPartition topicPartition : leader.getValue()) {
+ result.get(topicPartition.topic()).put(topicPartition.partition(), response.offsets(topicPartition.topic(), topicPartition.partition())[0]);
+ }
+ } finally {
+ if (simpleConsumer != null) {
+ simpleConsumer.close();
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk (Map<String, List<Integer>> topicPartitions, OldKafkaSpoutOffsetQuery
+ oldKafkaSpoutOffsetQuery) throws Exception {
+ Map<String, Map<Integer, Long>> result = new HashMap<>();
+ CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
+ curatorFramework.start();
+ String partitionPrefix = "partition_";
+ String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
+ if (!zkPath.endsWith("/")) {
+ zkPath += "/";
+ }
+ byte[] zkData;
+ try {
+ if (topicPartitions != null) {
+ for (Map.Entry<String, List<Integer>> topicEntry: topicPartitions.entrySet()) {
+ Map<Integer, Long> partitionOffsets = new HashMap<>();
+ for (Integer partition: topicEntry.getValue()) {
+ String path = zkPath + (oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + partitionPrefix + partition;
+ if (curatorFramework.checkExists().forPath(path) != null) {
+ zkData = curatorFramework.getData().forPath(path);
+ Map<Object, Object> offsetData = (Map<Object, Object>) JSONValue.parse(new String(zkData, "UTF-8"));
+ partitionOffsets.put(partition, (Long) offsetData.get("offset"));
+ }
+ }
+ result.put(topicEntry.getKey(), partitionOffsets);
+ }
+ }
+ } finally {
+ if (curatorFramework != null) {
+ curatorFramework.close();
+ }
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
new file mode 100644
index 0000000..8e7354f
--- /dev/null
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.monitor;
+
+/**
+ * Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new kafka spout using new consumer api
+ */
+public class NewKafkaSpoutOffsetQuery {
+ private final String topics; // comma separated list of topics
+ private final String consumerGroupId; // consumer group id for which the offset needs to be calculated
+ private final String bootStrapBrokers; // bootstrap brokers
+
+ public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId) {
+ this.topics = topics;
+ this.bootStrapBrokers = bootstrapBrokers;
+ this.consumerGroupId = consumerGroupId;
+ }
+
+ public String getTopics() {
+ return topics;
+ }
+
+ public String getBootStrapBrokers() {
+ return bootStrapBrokers;
+ }
+
+ public String getConsumerGroupId() {
+ return this.consumerGroupId;
+ }
+
+
+ @Override
+ public String toString() {
+ return "NewKafkaSpoutOffsetQuery{" +
+ "topics='" + topics + '\'' +
+ ", consumerGroupId='" + consumerGroupId + '\'' +
+ ", bootStrapBrokers='" + bootStrapBrokers + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ NewKafkaSpoutOffsetQuery that = (NewKafkaSpoutOffsetQuery) o;
+
+ if (topics != null ? !topics.equals(that.topics) : that.topics != null) return false;
+ if (consumerGroupId != null ? !consumerGroupId.equals(that.consumerGroupId) : that.consumerGroupId != null) return false;
+ return !(bootStrapBrokers != null ? !bootStrapBrokers.equals(that.bootStrapBrokers) : that.bootStrapBrokers != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = topics != null ? topics.hashCode() : 0;
+ result = 31 * result + (consumerGroupId != null ? consumerGroupId.hashCode() : 0);
+ result = 31 * result + (bootStrapBrokers != null ? bootStrapBrokers.hashCode() : 0);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
new file mode 100644
index 0000000..cdeed32
--- /dev/null
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.monitor;
+
+/**
+ * Class representing information for querying kafka for log head offsets, spout offsets and the difference for old kafka spout
+ */
+public class OldKafkaSpoutOffsetQuery {
+ private final String topic; //single topic or a wildcard topic
+ private final String zkServers; //comma separated list of zk servers and port e.g. hostname1:2181, hostname2:2181
+ private final String zkPath; //zk node prefix without topic/partition where committed offsets are stored
+ private final boolean isWildCardTopic; //if the topic is a wildcard
+ private final String brokersZkPath; //zk node prefix where kafka stores all broker information
+ private final String partitions; //comma separated list of partitions corresponding to leaders below (for StaticHosts)
+ private final String leaders; //comma separated list of leader brokers and port corresponding to the partitions above (for StaticHosts) e.g.
+ // hostname1:9092,hostname2:9092
+
+ public OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath) {
+ this(topic, zkServers, zkPath, isWildCardTopic, brokersZkPath, null, null);
+ }
+
+ public OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, String partitions, String leaders) {
+ this(topic, zkServers, zkPath, false, null, partitions, leaders);
+
+ }
+
+ private OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath, String partitions, String
+ leaders) {
+ this.topic = topic;
+ this.zkServers = zkServers;
+ this.zkPath = zkPath;
+ this.isWildCardTopic = isWildCardTopic;
+ this.brokersZkPath = brokersZkPath;
+ this.partitions = partitions;
+ this.leaders = leaders;
+ }
+
+ @Override
+ public String toString() {
+ return "OldKafkaSpoutOffsetQuery{" +
+ "topic='" + topic + '\'' +
+ ", zkServers='" + zkServers + '\'' +
+ ", zkPath='" + zkPath + '\'' +
+ ", isWildCardTopic=" + isWildCardTopic +
+ ", brokersZkPath='" + brokersZkPath + '\'' +
+ ", partitions='" + partitions + '\'' +
+ ", leaders='" + leaders + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ OldKafkaSpoutOffsetQuery that = (OldKafkaSpoutOffsetQuery) o;
+
+ if (isWildCardTopic != that.isWildCardTopic) return false;
+ if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false;
+ if (zkServers != null ? !zkServers.equals(that.zkServers) : that.zkServers != null) return false;
+ if (zkPath != null ? !zkPath.equals(that.zkPath) : that.zkPath != null) return false;
+ if (brokersZkPath != null ? !brokersZkPath.equals(that.brokersZkPath) : that.brokersZkPath != null) return false;
+ if (partitions != null ? !partitions.equals(that.partitions) : that.partitions != null) return false;
+ return !(leaders != null ? !leaders.equals(that.leaders) : that.leaders != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = topic != null ? topic.hashCode() : 0;
+ result = 31 * result + (zkServers != null ? zkServers.hashCode() : 0);
+ result = 31 * result + (zkPath != null ? zkPath.hashCode() : 0);
+ result = 31 * result + (isWildCardTopic ? 1 : 0);
+ result = 31 * result + (brokersZkPath != null ? brokersZkPath.hashCode() : 0);
+ result = 31 * result + (partitions != null ? partitions.hashCode() : 0);
+ result = 31 * result + (leaders != null ? leaders.hashCode() : 0);
+ return result;
+ }
+
+ public String getTopic() {
+
+ return topic;
+ }
+
+ public String getZkServers() {
+ return zkServers;
+ }
+
+ public String getZkPath() {
+ return zkPath;
+ }
+
+ public boolean isWildCardTopic() {
+ return isWildCardTopic;
+ }
+
+ public String getBrokersZkPath() {
+ return brokersZkPath;
+ }
+
+ public String getPartitions() {
+ return partitions;
+ }
+
+ public String getLeaders() {
+ return leaders;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 7bacd0b..d2bd313 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
import org.apache.storm.Config;
import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -188,6 +189,47 @@ public class KafkaSpout extends BaseRichSpout {
}
}
+ @Override
+ public Map<String, Object> getComponentConfiguration () {
+ Map<String, Object> configuration = super.getComponentConfiguration();
+ if (configuration == null) {
+ configuration = new HashMap<>();
+ }
+ String configKeyPrefix = "config.";
+ configuration.put(configKeyPrefix + "topics", this._spoutConfig.topic);
+ StringBuilder zkServers = new StringBuilder();
+ if (_spoutConfig.zkServers != null && _spoutConfig.zkServers.size() > 0) {
+ for (String zkServer : this._spoutConfig.zkServers) {
+ zkServers.append(zkServer + ":" + this._spoutConfig.zkPort + ",");
+ }
+ configuration.put(configKeyPrefix + "zkServers", zkServers.toString());
+ }
+ BrokerHosts brokerHosts = this._spoutConfig.hosts;
+ String zkRoot = this._spoutConfig.zkRoot + "/" + this._spoutConfig.id;
+ if (brokerHosts instanceof ZkHosts) {
+ ZkHosts zkHosts = (ZkHosts) brokerHosts;
+ configuration.put(configKeyPrefix + "zkNodeBrokers", zkHosts.brokerZkPath);
+ } else if (brokerHosts instanceof StaticHosts) {
+ StaticHosts staticHosts = (StaticHosts) brokerHosts;
+ GlobalPartitionInformation globalPartitionInformation = staticHosts.getPartitionInformation();
+ boolean useTopicNameForPath = globalPartitionInformation.getbUseTopicNameForPartitionPathId();
+ if (useTopicNameForPath) {
+ zkRoot += ("/" + this._spoutConfig.topic);
+ }
+ List<Partition> partitions = globalPartitionInformation.getOrderedPartitions();
+ StringBuilder staticPartitions = new StringBuilder();
+ StringBuilder leaderHosts = new StringBuilder();
+ for (Partition partition: partitions) {
+ staticPartitions.append(partition.partition + ",");
+ leaderHosts.append(partition.host.host + ":" + partition.host.port).append(",");
+ }
+ configuration.put(configKeyPrefix + "partitions", staticPartitions.toString());
+ configuration.put(configKeyPrefix + "leaders", leaderHosts.toString());
+ }
+ configuration.put(configKeyPrefix + "zkRoot", zkRoot);
+ return configuration;
+ }
+
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
index 3108ff8..e420cb3 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
@@ -49,6 +49,10 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ
partitionMap.put(partitionId, broker);
}
+ public Boolean getbUseTopicNameForPartitionPathId () {
+ return bUseTopicNameForPartitionPathId;
+ }
+
@Override
public String toString() {
return "GlobalPartitionInformation{" +
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7c7031..49a5c5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -285,6 +285,7 @@
<module>examples/storm-starter</module>
<module>external/storm-kafka-client</module>
<module>external/storm-opentsdb</module>
+ <module>external/storm-kafka-monitor</module>
</modules>
<dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 3099ca7..f0162f0 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -26,7 +26,7 @@
(:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
ACKER-FAIL-STREAM-ID mk-authorization-handler
start-metrics-reporters]]])
- (:import [org.apache.storm.utils Utils]
+ (:import [org.apache.storm.utils Utils TopologySpoutLag]
[org.apache.storm.generated NimbusSummary])
(:use [clojure.string :only [blank? lower-case trim split]])
(:import [org.apache.storm.generated ExecutorSpecificStats
@@ -66,6 +66,7 @@
(defmeter ui:num-all-topologies-summary-http-requests)
(defmeter ui:num-topology-page-http-requests)
(defmeter ui:num-topology-metric-http-requests)
+(defmeter ui:num-topology-lag-http-requests)
(defmeter ui:num-build-visualization-http-requests)
(defmeter ui:num-mk-visualization-data-http-requests)
(defmeter ui:num-component-page-http-requests)
@@ -718,6 +719,12 @@
merged-bolt-stats (map (fn [[k v]] (merge-executor-stats window k v)) bolt-comp-summs)]
(merge {"window" window "window-hint" window-hint "spouts" merged-spout-stats "bolts" merged-bolt-stats}))))
+(defn topology-lag [id topology-conf]
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [topology (.getUserTopology ^Nimbus$Client nimbus
+ id)]
+ (TopologySpoutLag/lag topology topology-conf))))
+
(defn component-errors
[errors-list topology-id secure?]
(let [errors (->> errors-list
@@ -1057,6 +1064,12 @@
(populate-context! servlet-request)
(assert-authorized-user "getTopology" (topology-config id))
(json-response (topology-metrics-page id (:window m) (check-include-sys? (:sys m))) (:callback m)))
+ (GET "/api/v1/topology/:id/lag" [:as {:keys [cookies servlet-request scheme]} id & m]
+ (mark! ui:num-topology-lag-http-requests)
+ (populate-context! servlet-request)
+ (let [topology-conf (topology-config id)]
+ (assert-authorized-user "getUserTopology" topology-conf)
+ (json-response (topology-lag id topology-conf) nil)))
(GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m]
(mark! ui:num-build-visualization-http-requests)
(populate-context! servlet-request)
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
new file mode 100644
index 0000000..d1f3659
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TopologySpoutLag {
+ private static final String SPOUT_ID = "spoutId";
+ private static final String SPOUT_TYPE= "spoutType";
+ private static final String SPOUT_LAG_RESULT = "spoutLagResult";
+ private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
+
+ public static List<Map<String, Object>> lag (StormTopology stormTopology, Map topologyConf) {
+ List<Map<String, Object>> result = new ArrayList<>();
+ Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
+ Object object = null;
+ for (Map.Entry<String, SpoutSpec> spout: spouts.entrySet()) {
+ try {
+ SpoutSpec spoutSpec = spout.getValue();
+ ComponentObject componentObject = spoutSpec.get_spout_object();
+ object = Utils.getSetComponentObject(componentObject);
+ if (object.getClass().getCanonicalName().endsWith("storm.kafka.spout.KafkaSpout")) {
+ result.add(getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
+ } else if (object.getClass().getCanonicalName().endsWith("storm.kafka.KafkaSpout")) {
+ result.add(getLagResultForOldKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
+ }
+ } catch (Exception e) {
+ logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey() + " and spout class: " + object.getClass().getCanonicalName());
+ logger.warn("Exception message:" + e.getMessage());
+ }
+ }
+ return result;
+ }
+
+ private static List<String> getCommandLineOptionsForNewKafkaSpout (Map<String, Object> jsonConf) {
+ List<String> commands = new ArrayList<>();
+ String configKeyPrefix = "config.";
+ commands.add("-t");
+ commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
+ commands.add("-g");
+ commands.add((String)jsonConf.get(configKeyPrefix + "groupid"));
+ commands.add("-b");
+ commands.add((String)jsonConf.get(configKeyPrefix + "bootstrap.servers"));
+ return commands;
+ }
+
+ private static List<String> getCommandLineOptionsForOldKafkaSpout (Map<String, Object> jsonConf, Map topologyConf) {
+ List<String> commands = new ArrayList<>();
+ String configKeyPrefix = "config.";
+ commands.add("-o");
+ commands.add("-t");
+ commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
+ commands.add("-n");
+ commands.add((String)jsonConf.get(configKeyPrefix + "zkRoot"));
+ String zkServers = (String)jsonConf.get(configKeyPrefix + "zkServers");
+ if (zkServers == null || zkServers.isEmpty()) {
+ StringBuilder zkServersBuilder = new StringBuilder();
+ Integer zkPort = ((Number) topologyConf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+ for (String zkServer: (List<String>) topologyConf.get(Config.STORM_ZOOKEEPER_SERVERS)) {
+ zkServersBuilder.append(zkServer + ":" + zkPort + ",");
+ }
+ zkServers = zkServersBuilder.toString();
+ }
+ commands.add("-z");
+ commands.add(zkServers);
+ if (jsonConf.get(configKeyPrefix + "leaders") != null) {
+ commands.add("-p");
+ commands.add((String)jsonConf.get(configKeyPrefix + "partitions"));
+ commands.add("-l");
+ commands.add((String)jsonConf.get(configKeyPrefix + "leaders"));
+ } else {
+ commands.add("-r");
+ commands.add((String)jsonConf.get(configKeyPrefix + "zkNodeBrokers"));
+ Boolean isWildCard = (Boolean) topologyConf.get("kafka.topic.wildcard.match");
+ if (isWildCard != null && isWildCard.booleanValue()) {
+ commands.add("-w");
+ }
+ }
+ return commands;
+ }
+
+ private static Map<String, Object> getLagResultForKafka (String spoutId, SpoutSpec spoutSpec, Map topologyConf, boolean old) throws IOException {
+ ComponentCommon componentCommon = spoutSpec.get_common();
+ String json = componentCommon.get_json_conf();
+ String result = "Offset lags for kafka not supported for older versions. Please update kafka spout to latest version.";
+ if (json != null && !json.isEmpty()) {
+ List<String> commands = new ArrayList<>();
+ String stormHomeDir = System.getenv("STORM_BASE_DIR");
+ if (stormHomeDir != null && !stormHomeDir.endsWith("/")) {
+ stormHomeDir += File.separator;
+ }
+ commands.add(stormHomeDir != null ? stormHomeDir + "bin" + File.separator + "storm-kafka-monitor" : "storm-kafka-monitor");
+ Map<String, Object> jsonMap = (Map<String, Object>) JSONValue.parse(json);
+ commands.addAll(old ? getCommandLineOptionsForOldKafkaSpout(jsonMap, topologyConf) : getCommandLineOptionsForNewKafkaSpout(jsonMap));
+ result = ShellUtils.execCommand(commands.toArray(new String[0]));
+ }
+ Map<String, Object> kafkaSpoutLagInfo = new HashMap<>();
+ kafkaSpoutLagInfo.put(SPOUT_ID, spoutId);
+ kafkaSpoutLagInfo.put(SPOUT_TYPE, "KAFKA");
+ kafkaSpoutLagInfo.put(SPOUT_LAG_RESULT, result);
+ return kafkaSpoutLagInfo;
+ }
+
+ private static Map<String, Object> getLagResultForNewKafkaSpout (String spoutId, SpoutSpec spoutSpec, Map topologyConf) throws IOException {
+ return getLagResultForKafka(spoutId, spoutSpec, topologyConf, false);
+ }
+
+ private static Map<String, Object> getLagResultForOldKafkaSpout (String spoutId, SpoutSpec spoutSpec, Map topologyConf) throws IOException {
+ return getLagResultForKafka(spoutId, spoutSpec, topologyConf, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html
index 917831a..c83bd6d 100644
--- a/storm-core/src/ui/public/templates/topology-page-template.html
+++ b/storm-core/src/ui/public/templates/topology-page-template.html
@@ -204,6 +204,92 @@
</table>
</script>
+<script id="topology-kafka-spouts-lag-template" type="text/html">
+ <h2>Kafka Spouts Lag</h2>
+ <table class="table table-striped compact" id="topology-kafka-spouts-lag-table">
+ <thead>
+ <tr>
+ <th>
+ <span data-toggle="tooltip" data-placement="right" title="Kafka spout id">
+ Id
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Topic">
+ Topic
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Partition">
+ Partition
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Latest Offset">
+ Latest Offset
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Offset of last spout message successfully acked">
+ Spout Committed Offset
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="left" title="Lag">
+ Lag
+ </span>
+ </th>
+ </tr>
+ </thead>
+ <tbody>
+ {{#kafkaSpoutsLagResults}}
+ <tr>
+ <td>{{id}}</td>
+ <td>{{topic}}</td>
+ <td>{{partition}}</td>
+ <td>{{logHeadOffset}}</td>
+ <td>{{consumerCommittedOffset}}</td>
+ <td>{{lag}}</td>
+ </tr>
+ {{/kafkaSpoutsLagResults}}
+ </tbody>
+ </table>
+</script>
+
+<script id="topology-spouts-lag-error-template" type="text/html">
+ <h2>Topology spouts lag error</h2>
+ <table class="table table-striped compact" id="topology-spouts-lag-error-table">
+ <thead>
+ <tr>
+ <th>
+ <span data-toggle="tooltip" data-placement="right" title="Spout id">
+ Id
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="above" title="Type of spout">
+ Type
+ </span>
+ </th>
+ <th>
+ <span data-toggle="tooltip" data-placement="left" title="Message to denote the reason for failure to get the lag">
+ Message
+ </span>
+ </th>
+ </tr>
+ </thead>
+ <tbody>
+ {{#spoutsLagErrorResults}}
+ <tr>
+ <td>{{spoutId}}</td>
+ <td>{{spoutType}}</td>
+ <td>{{spoutLagResult}}</td>
+ </tr>
+ {{/spoutsLagErrorResults}}
+ </tbody>
+ </table>
+</script>
+
<script id="topology-visualization-container-template" type="text/html">
<p>
<table class="table table-striped compact">
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index 061fd91..bb11933 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -70,6 +70,10 @@
<div id="topology-stats" class="col-md-12"></div>
</div>
<div class="row">
+ <div id="topology-spouts-lag" class="col-md-12"></div>
+ </div>
+ </div>
+ <div class="row">
<div id="spout-stats" class="col-md-12"></div>
</div>
<div class="row">
@@ -277,6 +281,7 @@ $(document).ready(function() {
var topologySummary = $("#topology-summary");
var topologyResources = $("#topology-resources");
var topologyStats = $("#topology-stats");
+ var topologySpoutsLag = $("#topology-spouts-lag");
var spoutStats = $("#spout-stats");
var boltStats = $("#bolt-stats");
var config = $("#topology-configuration");
@@ -384,8 +389,51 @@ $(document).ready(function() {
$('#topology-configuration [data-toggle="tooltip"]').tooltip();
$('#topology-actions [data-toggle="tooltip"]').tooltip();
$('#topology-visualization [data-toggle="tooltip"]').tooltip();
+
+ var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
+ $.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
+ if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
+ var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
+ var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
+
+ var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
+ var isJson = function (input) {
+ try {
+ JSON.parse(input);
+ } catch (e) {
+ return false;
+ }
+ return true;
+ };
+ var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
+ var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
+ var data = {};
+ if (kafkaSpoutsValidResults.length > 0) {
+ data.kafkaSpoutsLagResults = [];
+ kafkaSpoutsValidResults.forEach(function(ele) {
+ var spoutLagResult = JSON.parse(ele.spoutLagResult);
+ spoutLagResult.forEach(function(ele2) {
+ data.kafkaSpoutsLagResults.push({
+ id: ele.spoutId,
+ topic: ele2.topic,
+ partition: ele2.partition,
+ logHeadOffset: ele2.logHeadOffset,
+ consumerCommittedOffset: ele2.consumerCommittedOffset,
+ lag: ele2.lag
+ });
+ });
+ });
+ topologySpoutsLag.append(Mustache.render(kafkaSpoutsLagTemplate,data));
+ }
+ if (kafkaSpoutsErrorResults.length > 0) {
+ data.spoutsLagErrorResults = kafkaSpoutsErrorResults;
+ topologySpoutsLag.append(Mustache.render(spoutsErrorTemplate,data));
+ }
+ }
+ });
}});
});
+
});
</script>
</html>
http://git-wip-us.apache.org/repos/asf/storm/blob/c0a706a3/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index af7e226..e37e2ce 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -340,10 +340,10 @@
</fileSet>
<!-- $STORM_HOME/extlib -->
<fileSet>
- <directory></directory>
+ <directory>${project.basedir}/../../external/storm-kafka-monitor/target</directory>
<outputDirectory>extlib</outputDirectory>
<includes>
- <include></include>
+ <include>storm*jar</include>
</includes>
</fileSet>
<!-- $STORM_HOME/extlib-daemon, for daemons only -->
[3/3] storm git commit: Added STORM-1136 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1136 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e7f478ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e7f478ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e7f478ca
Branch: refs/heads/1.x-branch
Commit: e7f478cad0468ab7c079a008c566285762584e5e
Parents: 3e571c3
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Jun 20 17:43:46 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Jun 20 17:43:46 2016 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e7f478ca/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d336b54..3153036 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui
* STORM-1911: IClusterMetricsConsumer should use seconds to timestamp unit
* STORM-1849: HDFSFileTopology should use the 3rd argument as topologyName
* STORM-1906: Window count/length of zero should be disallowed
[2/3] storm git commit: STORM-1136: Addressing review comment.
Posted by sr...@apache.org.
STORM-1136: Addressing review comment.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e571c3b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e571c3b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e571c3b
Branch: refs/heads/1.x-branch
Commit: 3e571c3b0f800737681f4a43bd523d90b9c69536
Parents: c0a706a
Author: Priyank <ps...@hortonworks.com>
Authored: Thu Jun 9 12:02:14 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Mon Jun 20 17:28:11 2016 -0700
----------------------------------------------------------------------
storm-core/src/ui/public/topology.html | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3e571c3b/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index bb11933..60b68f5 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -69,6 +69,7 @@
<div class="row">
<div id="topology-stats" class="col-md-12"></div>
</div>
+ <!-- This gets populated only if the spouts lag data is available. Currently only kafka spout is supported-->
<div class="row">
<div id="topology-spouts-lag" class="col-md-12"></div>
</div>