You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/24 05:30:48 UTC
storm git commit: [STORM-1122] Fix the format issue in Utils.java
Repository: storm
Updated Branches:
refs/heads/0.10.x-branch 122686352 -> 6c662e337
[STORM-1122] Fix the format issue in Utils.java
Conflicts:
storm-core/src/jvm/backtype/storm/utils/Utils.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6c662e33
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6c662e33
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6c662e33
Branch: refs/heads/0.10.x-branch
Commit: 6c662e337c42272cb4bfba7b585a36e04cbf4143
Parents: 1226863
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Oct 22 12:43:37 2015 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 12:07:55 2015 +0900
----------------------------------------------------------------------
.../src/jvm/backtype/storm/utils/Utils.java | 258 ++++++++++---------
1 file changed, 130 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6c662e33/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 15e6985..90a447a 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
-import java.io.*;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
@@ -48,7 +47,26 @@ import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.io.File;
import java.io.FileInputStream;
-import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.OutputStreamWriter;
+import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.FileOutputStream;
+import java.io.BufferedReader;
+import java.io.Serializable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.TreeMap;
+import java.util.UUID;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -99,9 +117,9 @@ public class Utils {
Object ret = ois.readObject();
ois.close();
return (T)ret;
- } catch(IOException ioe) {
+ } catch (IOException ioe) {
throw new RuntimeException(ioe);
- } catch(ClassNotFoundException e) {
+ } catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@@ -155,9 +173,9 @@ public class Utils {
Object ret = JSONValue.parseWithException(in);
in.close();
return (Map<String,Object>)ret;
- } catch(IOException ioe) {
+ } catch (IOException ioe) {
throw new RuntimeException(ioe);
- } catch(ParseException e) {
+ } catch (ParseException e) {
throw new RuntimeException(e);
}
}
@@ -186,7 +204,7 @@ public class Utils {
try {
Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
List<URL> ret = new ArrayList<URL>();
- while(resources.hasMoreElements()) {
+ while (resources.hasMoreElements()) {
ret.add(resources.nextElement());
}
return ret;
@@ -259,7 +277,7 @@ public class Utils {
public static Map findAndReadConfigFile(String name) {
- return findAndReadConfigFile(name, true);
+ return findAndReadConfigFile(name, true);
}
public static Map readDefaultConfig() {
@@ -269,7 +287,7 @@ public class Utils {
public static Map readCommandLineOpts() {
Map ret = new HashMap();
String commandOptions = System.getProperty("storm.options");
- if(commandOptions != null) {
+ if (commandOptions != null) {
String[] configs = commandOptions.split(",");
for (String config : configs) {
config = URLDecoder.decode(config);
@@ -292,7 +310,7 @@ public class Utils {
Map ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
Map storm;
- if (confFile==null || confFile.equals("")) {
+ if (confFile == null || confFile.equals("")) {
storm = findAndReadConfigFile("storm.yaml", false);
} else {
storm = findAndReadConfigFile(confFile, true);
@@ -303,24 +321,24 @@ public class Utils {
}
private static Object normalizeConf(Object conf) {
- if(conf==null) return new HashMap();
- if(conf instanceof Map) {
+ if (conf == null) return new HashMap();
+ if (conf instanceof Map) {
Map confMap = new HashMap((Map) conf);
- for(Object key: confMap.keySet()) {
+ for (Object key : confMap.keySet()) {
Object val = confMap.get(key);
confMap.put(key, normalizeConf(val));
}
return confMap;
- } else if(conf instanceof List) {
+ } else if (conf instanceof List) {
List confList = new ArrayList((List) conf);
- for(int i=0; i<confList.size(); i++) {
+ for (int i = 0; i < confList.size(); i++) {
Object val = confList.get(i);
confList.set(i, normalizeConf(val));
}
return confList;
} else if (conf instanceof Integer) {
return ((Integer) conf).longValue();
- } else if(conf instanceof Float) {
+ } else if (conf instanceof Float) {
return ((Float) conf).doubleValue();
} else {
return conf;
@@ -332,9 +350,9 @@ public class Utils {
}
public static Object getSetComponentObject(ComponentObject obj) {
- if(obj.getSetField()==ComponentObject._Fields.SERIALIZED_JAVA) {
+ if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) {
return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
- } else if(obj.getSetField()==ComponentObject._Fields.JAVA_OBJECT) {
+ } else if (obj.getSetField() == ComponentObject._Fields.JAVA_OBJECT) {
return obj.get_java_object();
} else {
return obj.get_shell();
@@ -343,7 +361,7 @@ public class Utils {
public static <S, T> T get(Map<S, T> m, S key, T def) {
T ret = m.get(key);
- if(ret==null) {
+ if (ret == null) {
ret = def;
}
return ret;
@@ -351,7 +369,7 @@ public class Utils {
public static List<Object> tuple(Object... values) {
List<Object> ret = new ArrayList<Object>();
- for(Object v: values) {
+ for (Object v : values) {
ret.add(v);
}
return ret;
@@ -360,9 +378,9 @@ public class Utils {
public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
- download(client, file, localFile);
+ download(client, file, localFile);
} finally {
- client.close();
+ client.close();
}
}
@@ -370,21 +388,21 @@ public class Utils {
WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile));
try {
String id = client.getClient().beginFileDownload(file);
- while(true) {
- ByteBuffer chunk = client.getClient().downloadChunk(id);
- int written = out.write(chunk);
- if(written==0) break;
- }
+ while (true) {
+ ByteBuffer chunk = client.getClient().downloadChunk(id);
+ int written = out.write(chunk);
+ if (written == 0) break;
+ }
} finally {
- out.close();
+ out.close();
}
}
public static IFn loadClojureFn(String namespace, String name) {
try {
- clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
+ clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
} catch (Exception e) {
- //if playing from the repl and defining functions, file won't exist
+ //if playing from the repl and defining functions, file won't exist
}
return (IFn) RT.var(namespace, name).deref();
}
@@ -395,64 +413,63 @@ public class Utils {
public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
Map<V, K> ret = new HashMap<V, K>();
- for(K key: map.keySet()) {
+ for (K key : map.keySet()) {
ret.put(map.get(key), key);
}
return ret;
}
public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
- if(topology.get_spouts().containsKey(id)) {
+ if (topology.get_spouts().containsKey(id)) {
return topology.get_spouts().get(id).get_common();
}
- if(topology.get_bolts().containsKey(id)) {
+ if (topology.get_bolts().containsKey(id)) {
return topology.get_bolts().get(id).get_common();
}
- if(topology.get_state_spouts().containsKey(id)) {
+ if (topology.get_state_spouts().containsKey(id)) {
return topology.get_state_spouts().get(id).get_common();
}
throw new IllegalArgumentException("Could not find component with id " + id);
}
public static Integer getInt(Object o) {
- Integer result = getInt(o, null);
- if (null == result) {
- throw new IllegalArgumentException("Don't know how to convert null to int");
- }
- return result;
+ Integer result = getInt(o, null);
+ if (null == result) {
+ throw new IllegalArgumentException("Don't know how to convert null to int");
+ }
+ return result;
}
public static Integer getInt(Object o, Integer defaultValue) {
- if (null == o) {
- return defaultValue;
- }
-
- if (o instanceof Integer ||
- o instanceof Short ||
- o instanceof Byte) {
- return ((Number) o).intValue();
- } else if (o instanceof Long) {
- final long l = (Long) o;
- if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
- return (int) l;
- }
- } else if (o instanceof String) {
- return Integer.parseInt((String) o);
- }
-
- throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
+ if (null == o) {
+ return defaultValue;
+ }
+
+ if (o instanceof Integer ||
+ o instanceof Short ||
+ o instanceof Byte) {
+ return ((Number) o).intValue();
+ } else if (o instanceof Long) {
+ final long l = (Long) o;
+ if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
+ return (int) l;
+ }
+ } else if (o instanceof String) {
+ return Integer.parseInt((String) o);
+ }
+
+ throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
public static boolean getBoolean(Object o, boolean defaultValue) {
- if (null == o) {
- return defaultValue;
- }
-
- if(o instanceof Boolean) {
- return (Boolean) o;
- } else {
- throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean");
- }
+ if (null == o) {
+ return defaultValue;
+ }
+ if (o instanceof Boolean) {
+ return (Boolean) o;
+ } else {
+ throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean");
+ }
}
public static long secureRandomLong() {
@@ -465,7 +482,7 @@ public class Utils {
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
List<String> serverPorts = new ArrayList<String>();
- for(String zkServer: (List<String>) servers) {
+ for (String zkServer : (List<String>) servers) {
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts, ",") + root;
@@ -479,14 +496,14 @@ public class Utils {
protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
{
builder.connectString(zkStr)
- .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
- .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
- .retryPolicy(new StormBoundedExponentialBackoffRetry(
+ .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+ .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+ .retryPolicy(new StormBoundedExponentialBackoffRetry(
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
- if(auth!=null && auth.scheme!=null && auth.payload!=null) {
+ if (auth != null && auth.scheme != null && auth.payload != null) {
builder = builder.authorization(auth.scheme, auth.payload);
}
}
@@ -507,28 +524,13 @@ public class Utils {
return ret;
}
- /**
- *
-(defn integer-divided [sum num-pieces]
- (let [base (int (/ sum num-pieces))
- num-inc (mod sum num-pieces)
- num-bases (- num-pieces num-inc)]
- (if (= num-inc 0)
- {base num-bases}
- {base num-bases (inc base) num-inc}
- )))
- * @param sum
- * @param numPieces
- * @return
- */
-
public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {
int base = sum / numPieces;
int numInc = sum % numPieces;
int numBases = numPieces - numInc;
TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();
ret.put(base, numBases);
- if(numInc!=0) {
+ if (numInc != 0) {
ret.put(base+1, numInc);
}
return ret;
@@ -544,7 +546,7 @@ public class Utils {
try {
BufferedReader r = new BufferedReader(new InputStreamReader(in));
String line = null;
- while ((line = r.readLine())!= null) {
+ while ((line = r.readLine()) != null) {
LOG.info("{}:{}", prefix, line);
}
} catch (IOException e) {
@@ -554,8 +556,8 @@ public class Utils {
public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
Throwable t = throwable;
- while(t != null) {
- if(klass.isInstance(t)) {
+ while (t != null) {
+ if (klass.isInstance(t)) {
return true;
}
t = t.getCause();
@@ -571,9 +573,9 @@ public class Utils {
*/
public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
return null != System.getProperty("java.security.auth.login.config")
- || (conf != null
+ || (conf != null
&& conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null
- && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
+ && !((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
}
/**
@@ -584,7 +586,7 @@ public class Utils {
public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
return (conf != null
&& conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null
- && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
+ && !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
}
public static List<ACL> getWorkerACL(Map conf) {
@@ -594,37 +596,37 @@ public class Utils {
}
String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
if (stormZKUser == null) {
- throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+ throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
}
String[] split = stormZKUser.split(":",2);
if (split.length != 2) {
- throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+ throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
}
ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
return ret;
}
- public static String threadDump() {
- final StringBuilder dump = new StringBuilder();
- final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
- final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
- for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
- dump.append('"');
- dump.append(threadInfo.getThreadName());
- dump.append("\" ");
- final Thread.State state = threadInfo.getThreadState();
- dump.append("\n java.lang.Thread.State: ");
- dump.append(state);
- final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
- for (final StackTraceElement stackTraceElement : stackTraceElements) {
- dump.append("\n at ");
- dump.append(stackTraceElement);
- }
- dump.append("\n\n");
- }
- return dump.toString();
- }
+ public static String threadDump() {
+ final StringBuilder dump = new StringBuilder();
+ final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
+ final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+ for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
+ dump.append('"');
+ dump.append(threadInfo.getThreadName());
+ dump.append("\" ");
+ final Thread.State state = threadInfo.getThreadState();
+ dump.append("\n java.lang.Thread.State: ");
+ dump.append(state);
+ final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+ for (final StackTraceElement stackTraceElement : stackTraceElements) {
+ dump.append("\n at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append("\n\n");
+ }
+ return dump.toString();
+ }
// Assumes caller is synchronizing
private static SerializationDelegate getSerializationDelegate(Map stormConf) {
@@ -647,20 +649,20 @@ public class Utils {
return delegate;
}
- public static void handleUncaughtException(Throwable t) {
- if (t != null && t instanceof Error) {
- if (t instanceof OutOfMemoryError) {
- try {
- System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
- } catch (Throwable err) {
- //Again we don't want to exit because of logging issues.
+ public static void handleUncaughtException(Throwable t) {
+ if (t != null && t instanceof Error) {
+ if (t instanceof OutOfMemoryError) {
+ try {
+ System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
+ } catch (Throwable err) {
+ //Again we don't want to exit because of logging issues.
+ }
+ Runtime.getRuntime().halt(-1);
+ } else {
+ //Running in daemon mode, we would pass Error to calling thread.
+ throw (Error) t;
+ }
}
- Runtime.getRuntime().halt(-1);
- } else {
- //Running in daemon mode, we would pass Error to calling thread.
- throw (Error) t;
- }
}
- }
}