You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/10 15:33:51 UTC
[04/10] storm git commit: Remove duplicated utils
Remove duplicated utils
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f6b58a52
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f6b58a52
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f6b58a52
Branch: refs/heads/master
Commit: f6b58a52ad30f35e7a635ffbffdb9fc7c7f2de37
Parents: 0726c11
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Fri Mar 4 17:10:44 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Fri Mar 4 17:10:44 2016 +0800
----------------------------------------------------------------------
.../org/apache/storm/daemon/StormCommon.java | 28 ++++++++--------
.../src/jvm/org/apache/storm/utils/Utils.java | 34 --------------------
2 files changed, 14 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f6b58a52/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 7680fbc..7fa5ba4 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -239,8 +239,8 @@ public class StormCommon {
ComponentCommon common = getComponentCommon(componentObj);
if (common != null) {
int parallelismHintNum = Thrift.getParallelismHint(common);
- Integer taskNum = Utils.parseInt(conf.get(Config.TOPOLOGY_TASKS));
- if (taskNum != null && taskNum > 0 && parallelismHintNum <= 0) {
+ Integer taskNum = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
+ if (taskNum > 0 && parallelismHintNum <= 0) {
throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
}
}
@@ -317,7 +317,7 @@ public class StormCommon {
}
public static void addAcker(Map conf, StormTopology topology) {
- int ackerNum = Utils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.parseInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ int ackerNum = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
@@ -326,7 +326,7 @@ public class StormCommon {
Map<String, Object> ackerConf = new HashMap<String, Object>();
ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
- ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+ ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
@@ -339,7 +339,7 @@ public class StormCommon {
for (SpoutSpec spout : topology.get_spouts().values()) {
ComponentCommon common = spout.get_common();
Map spoutConf = componentConf(spout);
- spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+ spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
common.set_json_conf(JSONValue.toJSONString(spoutConf));
common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
@@ -404,10 +404,10 @@ public class StormCommon {
}
public static void addEventLogger(Map conf, StormTopology topology) {
- Integer numExecutors = Utils.parseInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.parseInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
HashMap<String, Object> componentConf = new HashMap<String, Object>();
componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
- componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+ componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
for(Object component : allComponents(topology).values()) {
@@ -437,7 +437,7 @@ public class StormCommon {
for (Map<String, Object> info : registerInfo) {
String className = (String) info.get("class");
Object argument = info.get("argument");
- Integer phintNum = Utils.parseInt(info.get("parallelism.hint"), 1);
+ Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf);
@@ -499,8 +499,8 @@ public class StormCommon {
}
public static boolean hasAckers(Map stormConf) {
- Integer ackerNum = Utils.parseInt(stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
- if (ackerNum == null || ackerNum > 0) {
+ Object ackerNum = stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+ if (ackerNum == null || Utils.getInt(ackerNum) > 0) {
return true;
} else {
return false;
@@ -508,8 +508,8 @@ public class StormCommon {
}
public static boolean hasEventLoggers(Map stormConf) {
- Integer eventLoggerNum = Utils.parseInt(stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
- if (eventLoggerNum == null || eventLoggerNum > 0) {
+ Object eventLoggerNum = stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
+ if (eventLoggerNum == null || Utils.getInt(eventLoggerNum) > 0) {
return true;
} else {
return false;
@@ -539,9 +539,9 @@ public class StormCommon {
Map<String, Integer> componentIdToTaskNum = new TreeMap<String, Integer>();
for (Map.Entry<String, Object> entry : components.entrySet()) {
Map conf = componentConf(entry.getValue());
- Integer taskNum = Utils.parseInt(conf.get(Config.TOPOLOGY_TASKS));
+ Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
if (taskNum != null) {
- componentIdToTaskNum.put(entry.getKey(), taskNum);
+ componentIdToTaskNum.put(entry.getKey(), Utils.getInt(taskNum));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f6b58a52/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 8dde52c..e59f83f 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2303,40 +2303,6 @@ public class Utils {
return a ^ b;
}
- public static Integer parseInt(Object o) {
- if (o == null) {
- return null;
- }
-
- if (o instanceof String) {
- return Integer.parseInt(String.valueOf(o));
- } else if (o instanceof Long) {
- long value = (Long) o;
- return (int) value;
- } else if (o instanceof Integer) {
- return (Integer) o;
- } else {
- throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
- }
- }
-
- public static Integer parseInt(Object o, int defaultValue) {
- if (o == null) {
- return defaultValue;
- }
-
- if (o instanceof String) {
- return Integer.parseInt(String.valueOf(o));
- } else if (o instanceof Long) {
- long value = (Long) o;
- return (int) value;
- } else if (o instanceof Integer) {
- return (Integer) o;
- } else {
- return defaultValue;
- }
- }
-
public static List<String> getRepeat(List<String> list) {
List<String> rtn = new ArrayList<String>();
Set<String> idSet = new HashSet<String>();