You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/08/22 22:45:40 UTC
[1/2] storm git commit: STORM-3201: Cleanup Topology Spout Lag
Repository: storm
Updated Branches:
refs/heads/1.x-branch 4f8b0e39b -> ea84f47e1
STORM-3201: Cleanup Topology Spout Lag
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c4966d4b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c4966d4b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c4966d4b
Branch: refs/heads/1.x-branch
Commit: c4966d4b2c2386a54b7fadf2b08e8d454fddb219
Parents: 4f8b0e3
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Aug 20 14:27:50 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Aug 22 11:10:29 2018 -0500
----------------------------------------------------------------------
.../apache/storm/utils/TopologySpoutLag.java | 101 ++++++++++---------
1 file changed, 52 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c4966d4b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index bb327ee..1217577 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,9 +18,14 @@
package org.apache.storm.utils;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.ComponentObject;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.json.simple.JSONValue;
@@ -28,70 +33,45 @@ import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class TopologySpoutLag {
private static final String SPOUT_ID = "spoutId";
- private static final String SPOUT_TYPE= "spoutType";
+ private static final String SPOUT_TYPE = "spoutType";
private static final String SPOUT_LAG_RESULT = "spoutLagResult";
private static final String ERROR_INFO = "errorInfo";
+ private static final String CONFIG_KEY_PREFIX = "config.";
+ private static final String TOPICS_CONFIG = CONFIG_KEY_PREFIX + "topics";
+ private static final String GROUPID_CONFIG = CONFIG_KEY_PREFIX + "groupid";
+ private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
+ private static final String LEADERS_CONFIG = CONFIG_KEY_PREFIX + "leaders";
+ private static final String ZKROOT_CONFIG = CONFIG_KEY_PREFIX + "zkRoot";
private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
- public static Map<String, Map<String, Object>> lag (StormTopology stormTopology, Map topologyConf) {
+ public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map topologyConf) {
Map<String, Map<String, Object>> result = new HashMap<>();
Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
- String className = null;
for (Map.Entry<String, SpoutSpec> spout: spouts.entrySet()) {
try {
SpoutSpec spoutSpec = spout.getValue();
- ComponentObject componentObject = spoutSpec.get_spout_object();
- // FIXME: yes it's a trick so we might be better to find alternative way...
- className = getClassNameFromComponentObject(componentObject);
- logger.debug("spout classname: {}", className);
- if (className.endsWith("storm.kafka.spout.KafkaSpout")) {
- result.put(spout.getKey(), getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
- } else if (className.endsWith("storm.kafka.KafkaSpout")) {
- result.put(spout.getKey(), getLagResultForOldKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
- }
+ addLagResultForKafkaSpout(result, spout.getKey(), spoutSpec, topologyConf);
} catch (Exception e) {
- logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey() + " and spout class: " + className);
+ logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey());
logger.warn("Exception message:" + e.getMessage(), e);
}
}
return result;
}
- private static String getClassNameFromComponentObject(ComponentObject componentObject) {
- try {
- Object object = Utils.getSetComponentObject(componentObject);
- return object.getClass().getCanonicalName();
- } catch (RuntimeException e) {
-
- if (e.getCause() instanceof ClassNotFoundException) {
- return e.getCause().getMessage().trim();
- }
-
- throw e;
- }
- }
-
private static List<String> getCommandLineOptionsForNewKafkaSpout (Map<String, Object> jsonConf) {
logger.debug("json configuration: {}", jsonConf);
List<String> commands = new ArrayList<>();
- String configKeyPrefix = "config.";
commands.add("-t");
- commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
+ commands.add((String) jsonConf.get(TOPICS_CONFIG));
commands.add("-g");
- commands.add((String)jsonConf.get(configKeyPrefix + "groupid"));
+ commands.add((String) jsonConf.get(GROUPID_CONFIG));
commands.add("-b");
- commands.add((String)jsonConf.get(configKeyPrefix + "bootstrap.servers"));
- String securityProtocol = (String)jsonConf.get(configKeyPrefix + "security.protocol");
+ commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
+ String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol");
if (securityProtocol != null && !securityProtocol.isEmpty()) {
commands.add("-s");
commands.add(securityProtocol);
@@ -103,13 +83,12 @@ public class TopologySpoutLag {
logger.debug("json configuration: {}", jsonConf);
List<String> commands = new ArrayList<>();
- String configKeyPrefix = "config.";
commands.add("-o");
commands.add("-t");
- commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
+ commands.add((String)jsonConf.get(TOPICS_CONFIG));
commands.add("-n");
- commands.add((String)jsonConf.get(configKeyPrefix + "zkRoot"));
- String zkServers = (String)jsonConf.get(configKeyPrefix + "zkServers");
+ commands.add((String)jsonConf.get(ZKROOT_CONFIG));
+ String zkServers = (String)jsonConf.get(CONFIG_KEY_PREFIX + "zkServers");
if (zkServers == null || zkServers.isEmpty()) {
StringBuilder zkServersBuilder = new StringBuilder();
Integer zkPort = ((Number) topologyConf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
@@ -120,14 +99,14 @@ public class TopologySpoutLag {
}
commands.add("-z");
commands.add(zkServers);
- if (jsonConf.get(configKeyPrefix + "leaders") != null) {
+ if (jsonConf.get(LEADERS_CONFIG) != null) {
commands.add("-p");
- commands.add((String)jsonConf.get(configKeyPrefix + "partitions"));
+ commands.add((String)jsonConf.get(CONFIG_KEY_PREFIX + "partitions"));
commands.add("-l");
- commands.add((String)jsonConf.get(configKeyPrefix + "leaders"));
+ commands.add((String)jsonConf.get(LEADERS_CONFIG));
} else {
commands.add("-r");
- commands.add((String)jsonConf.get(configKeyPrefix + "zkNodeBrokers"));
+ commands.add((String)jsonConf.get(CONFIG_KEY_PREFIX + "zkNodeBrokers"));
Boolean isWildCard = (Boolean) topologyConf.get("kafka.topic.wildcard.match");
if (isWildCard != null && isWildCard.booleanValue()) {
commands.add("-w");
@@ -136,6 +115,30 @@ public class TopologySpoutLag {
return commands;
}
+ private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec,
+ Map topologyConf) throws IOException {
+ ComponentCommon componentCommon = spoutSpec.get_common();
+ String json = componentCommon.get_json_conf();
+ if (json != null && !json.isEmpty()) {
+ Map<String, Object> jsonMap = null;
+ try {
+ jsonMap = (Map<String, Object>) JSONValue.parseWithException(json);
+ } catch (ParseException e) {
+ throw new IOException(e);
+ }
+
+ if (jsonMap.containsKey(TOPICS_CONFIG)
+ && jsonMap.containsKey(GROUPID_CONFIG)
+ && jsonMap.containsKey(BOOTSTRAP_CONFIG)) {
+ finalResult.put(spoutId, getLagResultForNewKafkaSpout(spoutId, spoutSpec, topologyConf));
+ } else if (jsonMap.containsKey(TOPICS_CONFIG)
+ && jsonMap.containsKey(ZKROOT_CONFIG)) {
+ //Probably the old spout
+ finalResult.put(spoutId, getLagResultForOldKafkaSpout(spoutId, spoutSpec, topologyConf));
+ }
+ }
+ }
+
private static Map<String, Object> getLagResultForKafka (String spoutId, SpoutSpec spoutSpec, Map topologyConf, boolean old) throws IOException {
ComponentCommon componentCommon = spoutSpec.get_common();
String json = componentCommon.get_json_conf();
[2/2] storm git commit: Merge branch 'STORM-3201-1.x' of
STORM-3201-1.x
Posted by bo...@apache.org.
Merge branch 'STORM-3201-1.x' of STORM-3201-1.x
STORM-3201: Cleanup Lag
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea84f47e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea84f47e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea84f47e
Branch: refs/heads/1.x-branch
Commit: ea84f47e12c65ba0f1e6856a1c8eef9c758a671d
Parents: 4f8b0e3 c4966d4
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Wed Aug 22 12:48:00 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Wed Aug 22 12:48:00 2018 -0500
----------------------------------------------------------------------
.../apache/storm/utils/TopologySpoutLag.java | 101 ++++++++++---------
1 file changed, 52 insertions(+), 49 deletions(-)
----------------------------------------------------------------------