You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/08/23 02:28:39 UTC

storm git commit: Merge branch 'STORM-3201-1.x' of STORM-3201-1.x

Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch ed79d96dd -> 242ad03c9


Merge branch 'STORM-3201-1.x' of STORM-3201-1.x

STORM-3201: Cleanup Lag


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/242ad03c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/242ad03c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/242ad03c

Branch: refs/heads/1.1.x-branch
Commit: 242ad03c981974a33b1eabbf671d7bf41a0d151d
Parents: ed79d96
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Wed Aug 22 12:48:00 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Wed Aug 22 17:46:04 2018 -0500

----------------------------------------------------------------------
 .../apache/storm/utils/TopologySpoutLag.java    | 101 ++++++++++---------
 1 file changed, 52 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/242ad03c/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index bb327ee..1217577 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,9 +18,14 @@
 
 package org.apache.storm.utils;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.ComponentObject;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
 import org.json.simple.JSONValue;
@@ -28,70 +33,45 @@ import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class TopologySpoutLag {
     private static final String SPOUT_ID = "spoutId";
-    private static final String SPOUT_TYPE= "spoutType";
+    private static final String SPOUT_TYPE = "spoutType";
     private static final String SPOUT_LAG_RESULT = "spoutLagResult";
     private static final String ERROR_INFO = "errorInfo";
+    private static final String CONFIG_KEY_PREFIX = "config.";
+    private static final String TOPICS_CONFIG = CONFIG_KEY_PREFIX + "topics";
+    private static final String GROUPID_CONFIG = CONFIG_KEY_PREFIX + "groupid";
+    private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
+    private static final String LEADERS_CONFIG = CONFIG_KEY_PREFIX + "leaders";
+    private static final String ZKROOT_CONFIG = CONFIG_KEY_PREFIX + "zkRoot";
     private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
 
-    public static Map<String, Map<String, Object>> lag (StormTopology stormTopology, Map topologyConf) {
+    public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map topologyConf) {
         Map<String, Map<String, Object>> result = new HashMap<>();
         Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
-        String className = null;
         for (Map.Entry<String, SpoutSpec> spout: spouts.entrySet()) {
             try {
                 SpoutSpec spoutSpec = spout.getValue();
-                ComponentObject componentObject = spoutSpec.get_spout_object();
-                // FIXME: yes it's a trick so we might be better to find alternative way...
-                className = getClassNameFromComponentObject(componentObject);
-                logger.debug("spout classname: {}", className);
-                if (className.endsWith("storm.kafka.spout.KafkaSpout")) {
-                    result.put(spout.getKey(), getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
-                } else if (className.endsWith("storm.kafka.KafkaSpout")) {
-                    result.put(spout.getKey(), getLagResultForOldKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
-                }
+                addLagResultForKafkaSpout(result, spout.getKey(), spoutSpec, topologyConf);
             } catch (Exception e) {
-                logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey() + " and spout class: " + className);
+                logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey());
                 logger.warn("Exception message:" + e.getMessage(), e);
             }
         }
         return result;
     }
 
-    private static String getClassNameFromComponentObject(ComponentObject componentObject) {
-        try {
-            Object object = Utils.getSetComponentObject(componentObject);
-            return object.getClass().getCanonicalName();
-        } catch (RuntimeException e) {
-
-            if (e.getCause() instanceof ClassNotFoundException) {
-                return e.getCause().getMessage().trim();
-            }
-
-            throw e;
-        }
-    }
-
     private static List<String> getCommandLineOptionsForNewKafkaSpout (Map<String, Object> jsonConf) {
         logger.debug("json configuration: {}", jsonConf);
 
         List<String> commands = new ArrayList<>();
-        String configKeyPrefix = "config.";
         commands.add("-t");
-        commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
+        commands.add((String) jsonConf.get(TOPICS_CONFIG));
         commands.add("-g");
-        commands.add((String)jsonConf.get(configKeyPrefix + "groupid"));
+        commands.add((String) jsonConf.get(GROUPID_CONFIG));
         commands.add("-b");
-        commands.add((String)jsonConf.get(configKeyPrefix + "bootstrap.servers"));
-        String securityProtocol = (String)jsonConf.get(configKeyPrefix + "security.protocol");
+        commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
+        String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol");
         if (securityProtocol != null && !securityProtocol.isEmpty()) {
             commands.add("-s");
             commands.add(securityProtocol);
@@ -103,13 +83,12 @@ public class TopologySpoutLag {
         logger.debug("json configuration: {}", jsonConf);
 
         List<String> commands = new ArrayList<>();
-        String configKeyPrefix = "config.";
         commands.add("-o");
         commands.add("-t");
-        commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
+        commands.add((String)jsonConf.get(TOPICS_CONFIG));
         commands.add("-n");
-        commands.add((String)jsonConf.get(configKeyPrefix + "zkRoot"));
-        String zkServers = (String)jsonConf.get(configKeyPrefix + "zkServers");
+        commands.add((String)jsonConf.get(ZKROOT_CONFIG));
+        String zkServers = (String)jsonConf.get(CONFIG_KEY_PREFIX + "zkServers");
         if (zkServers == null || zkServers.isEmpty()) {
             StringBuilder zkServersBuilder = new StringBuilder();
             Integer zkPort = ((Number) topologyConf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
@@ -120,14 +99,14 @@ public class TopologySpoutLag {
         }
         commands.add("-z");
         commands.add(zkServers);
-        if (jsonConf.get(configKeyPrefix + "leaders") != null) {
+        if (jsonConf.get(LEADERS_CONFIG) != null) {
             commands.add("-p");
-            commands.add((String)jsonConf.get(configKeyPrefix + "partitions"));
+            commands.add((String)jsonConf.get(CONFIG_KEY_PREFIX + "partitions"));
             commands.add("-l");
-            commands.add((String)jsonConf.get(configKeyPrefix + "leaders"));
+            commands.add((String)jsonConf.get(LEADERS_CONFIG));
         } else {
             commands.add("-r");
-            commands.add((String)jsonConf.get(configKeyPrefix + "zkNodeBrokers"));
+            commands.add((String)jsonConf.get(CONFIG_KEY_PREFIX + "zkNodeBrokers"));
             Boolean isWildCard = (Boolean) topologyConf.get("kafka.topic.wildcard.match");
             if (isWildCard != null && isWildCard.booleanValue()) {
                 commands.add("-w");
@@ -136,6 +115,30 @@ public class TopologySpoutLag {
         return commands;
     }
 
+    private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec,
+                                                  Map topologyConf) throws IOException {
+        ComponentCommon componentCommon = spoutSpec.get_common();
+        String json = componentCommon.get_json_conf();
+        if (json != null && !json.isEmpty()) {
+            Map<String, Object> jsonMap = null;
+            try {
+                jsonMap = (Map<String, Object>) JSONValue.parseWithException(json);
+            } catch (ParseException e) {
+                throw new IOException(e);
+            }
+
+            if (jsonMap.containsKey(TOPICS_CONFIG)
+                && jsonMap.containsKey(GROUPID_CONFIG)
+                && jsonMap.containsKey(BOOTSTRAP_CONFIG)) {
+                finalResult.put(spoutId, getLagResultForNewKafkaSpout(spoutId, spoutSpec, topologyConf));
+            } else if (jsonMap.containsKey(TOPICS_CONFIG)
+                && jsonMap.containsKey(ZKROOT_CONFIG)) {
+                //Probably the old spout
+                finalResult.put(spoutId, getLagResultForOldKafkaSpout(spoutId, spoutSpec, topologyConf));
+            }
+        }
+    }
+
     private static Map<String, Object> getLagResultForKafka (String spoutId, SpoutSpec spoutSpec, Map topologyConf, boolean old) throws IOException {
         ComponentCommon componentCommon = spoutSpec.get_common();
         String json = componentCommon.get_json_conf();