You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/10 15:33:51 UTC

[04/10] storm git commit: Remove duplicated utils

Remove duplicated utils


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f6b58a52
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f6b58a52
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f6b58a52

Branch: refs/heads/master
Commit: f6b58a52ad30f35e7a635ffbffdb9fc7c7f2de37
Parents: 0726c11
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Fri Mar 4 17:10:44 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Fri Mar 4 17:10:44 2016 +0800

----------------------------------------------------------------------
 .../org/apache/storm/daemon/StormCommon.java    | 28 ++++++++--------
 .../src/jvm/org/apache/storm/utils/Utils.java   | 34 --------------------
 2 files changed, 14 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f6b58a52/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 7680fbc..7fa5ba4 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -239,8 +239,8 @@ public class StormCommon {
             ComponentCommon common = getComponentCommon(componentObj);
             if (common != null) {
                 int parallelismHintNum = Thrift.getParallelismHint(common);
-                Integer taskNum = Utils.parseInt(conf.get(Config.TOPOLOGY_TASKS));
-                if (taskNum != null && taskNum > 0 && parallelismHintNum <= 0) {
+                Integer taskNum = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
+                if (taskNum > 0 && parallelismHintNum <= 0) {
                     throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
                 }
             }
@@ -317,7 +317,7 @@ public class StormCommon {
     }
 
     public static void addAcker(Map conf, StormTopology topology) {
-        int ackerNum = Utils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.parseInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        int ackerNum = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
         Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
 
         Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
@@ -326,7 +326,7 @@ public class StormCommon {
 
         Map<String, Object> ackerConf = new HashMap<String, Object>();
         ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
-        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
 
         Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
 
@@ -339,7 +339,7 @@ public class StormCommon {
         for (SpoutSpec spout : topology.get_spouts().values()) {
             ComponentCommon common = spout.get_common();
             Map spoutConf = componentConf(spout);
-            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
             common.set_json_conf(JSONValue.toJSONString(spoutConf));
             common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
             common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
@@ -404,10 +404,10 @@ public class StormCommon {
     }
 
     public static void addEventLogger(Map conf, StormTopology topology) {
-        Integer numExecutors = Utils.parseInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.parseInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
         HashMap<String, Object> componentConf = new HashMap<String, Object>();
         componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
-        componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+        componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
         Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
 
         for(Object component : allComponents(topology).values()) {
@@ -437,7 +437,7 @@ public class StormCommon {
             for (Map<String, Object> info : registerInfo) {
                 String className = (String) info.get("class");
                 Object argument = info.get("argument");
-                Integer phintNum = Utils.parseInt(info.get("parallelism.hint"), 1);
+                Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
                 Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
                 metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
                 Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf);
@@ -499,8 +499,8 @@ public class StormCommon {
     }
 
     public static boolean hasAckers(Map stormConf) {
-        Integer ackerNum = Utils.parseInt(stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
-        if (ackerNum == null || ackerNum > 0) {
+        Object ackerNum = stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+        if (ackerNum == null || Utils.getInt(ackerNum) > 0) {
             return true;
         } else {
             return false;
@@ -508,8 +508,8 @@ public class StormCommon {
     }
 
     public static boolean hasEventLoggers(Map stormConf) {
-        Integer eventLoggerNum = Utils.parseInt(stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
-        if (eventLoggerNum == null || eventLoggerNum > 0) {
+        Object eventLoggerNum = stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
+        if (eventLoggerNum == null || Utils.getInt(eventLoggerNum) > 0) {
             return true;
         } else {
             return false;
@@ -539,9 +539,9 @@ public class StormCommon {
         Map<String, Integer> componentIdToTaskNum = new TreeMap<String, Integer>();
         for (Map.Entry<String, Object> entry : components.entrySet()) {
             Map conf = componentConf(entry.getValue());
-            Integer taskNum = Utils.parseInt(conf.get(Config.TOPOLOGY_TASKS));
+            Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
             if (taskNum != null) {
-                componentIdToTaskNum.put(entry.getKey(), taskNum);
+                componentIdToTaskNum.put(entry.getKey(), Utils.getInt(taskNum));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f6b58a52/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 8dde52c..e59f83f 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2303,40 +2303,6 @@ public class Utils {
         return a ^ b;
     }
 
-    public static Integer parseInt(Object o) {
-        if (o == null) {
-            return null;
-        }
-
-        if (o instanceof String) {
-            return Integer.parseInt(String.valueOf(o));
-        } else if (o instanceof Long) {
-            long value = (Long) o;
-            return (int) value;
-        } else if (o instanceof Integer) {
-            return (Integer) o;
-        } else {
-            throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
-        }
-    }
-
-    public static Integer parseInt(Object o, int defaultValue) {
-        if (o == null) {
-            return defaultValue;
-        }
-
-        if (o instanceof String) {
-            return Integer.parseInt(String.valueOf(o));
-        } else if (o instanceof Long) {
-            long value = (Long) o;
-            return (int) value;
-        } else if (o instanceof Integer) {
-            return (Integer) o;
-        } else {
-            return defaultValue;
-        }
-    }
-
     public static List<String> getRepeat(List<String> list) {
         List<String> rtn = new ArrayList<String>();
         Set<String> idSet = new HashSet<String>();