You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/24 05:30:59 UTC
[1/3] storm git commit: [STORM-1122] Fix the format issue in
Utils.java
Repository: storm
Updated Branches:
refs/heads/master ff3b8affa -> e93015c64
[STORM-1122] Fix the format issue in Utils.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8161beab
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8161beab
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8161beab
Branch: refs/heads/master
Commit: 8161beab46b5d489aa37db524b980c1676dd08fe
Parents: b0f3be0
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Oct 22 12:43:37 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 22 12:49:36 2015 -0500
----------------------------------------------------------------------
.../src/jvm/backtype/storm/utils/Utils.java | 262 ++++++++++---------
1 file changed, 132 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8161beab/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 7f02b56..c852306 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
-import java.io.*;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
@@ -48,7 +47,26 @@ import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.io.File;
import java.io.FileInputStream;
-import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.OutputStreamWriter;
+import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.FileOutputStream;
+import java.io.BufferedReader;
+import java.io.Serializable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.TreeMap;
+import java.util.UUID;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -99,9 +117,9 @@ public class Utils {
Object ret = ois.readObject();
ois.close();
return (T)ret;
- } catch(IOException ioe) {
+ } catch (IOException ioe) {
throw new RuntimeException(ioe);
- } catch(ClassNotFoundException e) {
+ } catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@@ -155,9 +173,9 @@ public class Utils {
Object ret = JSONValue.parseWithException(in);
in.close();
return (Map<String,Object>)ret;
- } catch(IOException ioe) {
+ } catch (IOException ioe) {
throw new RuntimeException(ioe);
- } catch(ParseException e) {
+ } catch (ParseException e) {
throw new RuntimeException(e);
}
}
@@ -186,7 +204,7 @@ public class Utils {
try {
Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
List<URL> ret = new ArrayList<URL>();
- while(resources.hasMoreElements()) {
+ while (resources.hasMoreElements()) {
ret.add(resources.nextElement());
}
return ret;
@@ -259,7 +277,7 @@ public class Utils {
public static Map findAndReadConfigFile(String name) {
- return findAndReadConfigFile(name, true);
+ return findAndReadConfigFile(name, true);
}
public static Map readDefaultConfig() {
@@ -269,7 +287,7 @@ public class Utils {
public static Map readCommandLineOpts() {
Map ret = new HashMap();
String commandOptions = System.getProperty("storm.options");
- if(commandOptions != null) {
+ if (commandOptions != null) {
String[] configs = commandOptions.split(",");
for (String config : configs) {
config = URLDecoder.decode(config);
@@ -292,7 +310,7 @@ public class Utils {
Map ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
Map storm;
- if (confFile==null || confFile.equals("")) {
+ if (confFile == null || confFile.equals("")) {
storm = findAndReadConfigFile("storm.yaml", false);
} else {
storm = findAndReadConfigFile(confFile, true);
@@ -303,24 +321,24 @@ public class Utils {
}
private static Object normalizeConf(Object conf) {
- if(conf==null) return new HashMap();
- if(conf instanceof Map) {
+ if (conf == null) return new HashMap();
+ if (conf instanceof Map) {
Map confMap = new HashMap((Map) conf);
- for(Object key: confMap.keySet()) {
+ for (Object key : confMap.keySet()) {
Object val = confMap.get(key);
confMap.put(key, normalizeConf(val));
}
return confMap;
- } else if(conf instanceof List) {
+ } else if (conf instanceof List) {
List confList = new ArrayList((List) conf);
- for(int i=0; i<confList.size(); i++) {
+ for (int i = 0; i < confList.size(); i++) {
Object val = confList.get(i);
confList.set(i, normalizeConf(val));
}
return confList;
} else if (conf instanceof Integer) {
return ((Integer) conf).longValue();
- } else if(conf instanceof Float) {
+ } else if (conf instanceof Float) {
return ((Float) conf).doubleValue();
} else {
return conf;
@@ -332,9 +350,9 @@ public class Utils {
}
public static Object getSetComponentObject(ComponentObject obj) {
- if(obj.getSetField()==ComponentObject._Fields.SERIALIZED_JAVA) {
+ if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) {
return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
- } else if(obj.getSetField()==ComponentObject._Fields.JAVA_OBJECT) {
+ } else if (obj.getSetField() == ComponentObject._Fields.JAVA_OBJECT) {
return obj.get_java_object();
} else {
return obj.get_shell();
@@ -343,7 +361,7 @@ public class Utils {
public static <S, T> T get(Map<S, T> m, S key, T def) {
T ret = m.get(key);
- if(ret==null) {
+ if (ret == null) {
ret = def;
}
return ret;
@@ -351,7 +369,7 @@ public class Utils {
public static List<Object> tuple(Object... values) {
List<Object> ret = new ArrayList<Object>();
- for(Object v: values) {
+ for (Object v : values) {
ret.add(v);
}
return ret;
@@ -360,18 +378,18 @@ public class Utils {
public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
- download(client, file, localFile);
+ download(client, file, localFile);
} finally {
- client.close();
+ client.close();
}
}
public static void downloadFromHost(Map conf, String file, String localFile, String host, int port) throws IOException, TException, AuthorizationException {
NimbusClient client = new NimbusClient (conf, host, port, null);
try {
- download(client, file, localFile);
+ download(client, file, localFile);
} finally {
- client.close();
+ client.close();
}
}
@@ -379,21 +397,21 @@ public class Utils {
WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile));
try {
String id = client.getClient().beginFileDownload(file);
- while(true) {
- ByteBuffer chunk = client.getClient().downloadChunk(id);
- int written = out.write(chunk);
- if(written==0) break;
- }
+ while (true) {
+ ByteBuffer chunk = client.getClient().downloadChunk(id);
+ int written = out.write(chunk);
+ if (written == 0) break;
+ }
} finally {
- out.close();
+ out.close();
}
}
public static IFn loadClojureFn(String namespace, String name) {
try {
- clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
+ clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
} catch (Exception e) {
- //if playing from the repl and defining functions, file won't exist
+ //if playing from the repl and defining functions, file won't exist
}
return (IFn) RT.var(namespace, name).deref();
}
@@ -404,52 +422,52 @@ public class Utils {
public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
Map<V, K> ret = new HashMap<V, K>();
- for(K key: map.keySet()) {
+ for (K key : map.keySet()) {
ret.put(map.get(key), key);
}
return ret;
}
public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
- if(topology.get_spouts().containsKey(id)) {
+ if (topology.get_spouts().containsKey(id)) {
return topology.get_spouts().get(id).get_common();
}
- if(topology.get_bolts().containsKey(id)) {
+ if (topology.get_bolts().containsKey(id)) {
return topology.get_bolts().get(id).get_common();
}
- if(topology.get_state_spouts().containsKey(id)) {
+ if (topology.get_state_spouts().containsKey(id)) {
return topology.get_state_spouts().get(id).get_common();
}
throw new IllegalArgumentException("Could not find component with id " + id);
}
public static Integer getInt(Object o) {
- Integer result = getInt(o, null);
- if (null == result) {
- throw new IllegalArgumentException("Don't know how to convert null to int");
- }
- return result;
+ Integer result = getInt(o, null);
+ if (null == result) {
+ throw new IllegalArgumentException("Don't know how to convert null to int");
+ }
+ return result;
}
public static Integer getInt(Object o, Integer defaultValue) {
- if (null == o) {
- return defaultValue;
- }
-
- if (o instanceof Integer ||
- o instanceof Short ||
- o instanceof Byte) {
- return ((Number) o).intValue();
- } else if (o instanceof Long) {
- final long l = (Long) o;
- if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
- return (int) l;
- }
- } else if (o instanceof String) {
- return Integer.parseInt((String) o);
- }
-
- throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
+ if (null == o) {
+ return defaultValue;
+ }
+
+ if (o instanceof Integer ||
+ o instanceof Short ||
+ o instanceof Byte) {
+ return ((Number) o).intValue();
+ } else if (o instanceof Long) {
+ final long l = (Long) o;
+ if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
+ return (int) l;
+ }
+ } else if (o instanceof String) {
+ return Integer.parseInt((String) o);
+ }
+
+ throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
public static Double getDouble(Object o) {
@@ -472,15 +490,14 @@ public class Utils {
}
public static boolean getBoolean(Object o, boolean defaultValue) {
- if (null == o) {
- return defaultValue;
- }
-
- if(o instanceof Boolean) {
- return (Boolean) o;
- } else {
- throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean");
- }
+ if (null == o) {
+ return defaultValue;
+ }
+ if (o instanceof Boolean) {
+ return (Boolean) o;
+ } else {
+ throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean");
+ }
}
public static long secureRandomLong() {
@@ -493,7 +510,7 @@ public class Utils {
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
List<String> serverPorts = new ArrayList<String>();
- for(String zkServer: (List<String>) servers) {
+ for (String zkServer : (List<String>) servers) {
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts, ",") + root;
@@ -507,14 +524,14 @@ public class Utils {
protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
{
builder.connectString(zkStr)
- .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
- .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
- .retryPolicy(new StormBoundedExponentialBackoffRetry(
+ .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+ .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+ .retryPolicy(new StormBoundedExponentialBackoffRetry(
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
- if(auth!=null && auth.scheme!=null && auth.payload!=null) {
+ if (auth != null && auth.scheme != null && auth.payload != null) {
builder = builder.authorization(auth.scheme, auth.payload);
}
}
@@ -535,28 +552,13 @@ public class Utils {
return ret;
}
- /**
- *
-(defn integer-divided [sum num-pieces]
- (let [base (int (/ sum num-pieces))
- num-inc (mod sum num-pieces)
- num-bases (- num-pieces num-inc)]
- (if (= num-inc 0)
- {base num-bases}
- {base num-bases (inc base) num-inc}
- )))
- * @param sum
- * @param numPieces
- * @return
- */
-
public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {
int base = sum / numPieces;
int numInc = sum % numPieces;
int numBases = numPieces - numInc;
TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();
ret.put(base, numBases);
- if(numInc!=0) {
+ if (numInc != 0) {
ret.put(base+1, numInc);
}
return ret;
@@ -572,7 +574,7 @@ public class Utils {
try {
BufferedReader r = new BufferedReader(new InputStreamReader(in));
String line = null;
- while ((line = r.readLine())!= null) {
+ while ((line = r.readLine()) != null) {
LOG.info("{}:{}", prefix, line);
}
} catch (IOException e) {
@@ -582,8 +584,8 @@ public class Utils {
public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
Throwable t = throwable;
- while(t != null) {
- if(klass.isInstance(t)) {
+ while (t != null) {
+ if (klass.isInstance(t)) {
return true;
}
t = t.getCause();
@@ -599,9 +601,9 @@ public class Utils {
*/
public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
return null != System.getProperty("java.security.auth.login.config")
- || (conf != null
+ || (conf != null
&& conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null
- && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
+ && !((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
}
/**
@@ -612,7 +614,7 @@ public class Utils {
public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
return (conf != null
&& conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null
- && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
+ && !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
}
public static List<ACL> getWorkerACL(Map conf) {
@@ -622,37 +624,37 @@ public class Utils {
}
String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
if (stormZKUser == null) {
- throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+ throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
}
String[] split = stormZKUser.split(":",2);
if (split.length != 2) {
- throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+ throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
}
ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
return ret;
}
- public static String threadDump() {
- final StringBuilder dump = new StringBuilder();
- final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
- final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
- for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
- dump.append('"');
- dump.append(threadInfo.getThreadName());
- dump.append("\" ");
- final Thread.State state = threadInfo.getThreadState();
- dump.append("\n java.lang.Thread.State: ");
- dump.append(state);
- final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
- for (final StackTraceElement stackTraceElement : stackTraceElements) {
- dump.append("\n at ");
- dump.append(stackTraceElement);
- }
- dump.append("\n\n");
- }
- return dump.toString();
- }
+ public static String threadDump() {
+ final StringBuilder dump = new StringBuilder();
+ final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
+ final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+ for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
+ dump.append('"');
+ dump.append(threadInfo.getThreadName());
+ dump.append("\" ");
+ final Thread.State state = threadInfo.getThreadState();
+ dump.append("\n java.lang.Thread.State: ");
+ dump.append(state);
+ final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+ for (final StackTraceElement stackTraceElement : stackTraceElements) {
+ dump.append("\n at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append("\n\n");
+ }
+ return dump.toString();
+ }
// Assumes caller is synchronizing
private static SerializationDelegate getSerializationDelegate(Map stormConf) {
@@ -675,20 +677,20 @@ public class Utils {
return delegate;
}
- public static void handleUncaughtException(Throwable t) {
- if (t != null && t instanceof Error) {
- if (t instanceof OutOfMemoryError) {
- try {
- System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
- } catch (Throwable err) {
- //Again we don't want to exit because of logging issues.
+ public static void handleUncaughtException(Throwable t) {
+ if (t != null && t instanceof Error) {
+ if (t instanceof OutOfMemoryError) {
+ try {
+ System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
+ } catch (Throwable err) {
+ //Again we don't want to exit because of logging issues.
+ }
+ Runtime.getRuntime().halt(-1);
+ } else {
+ //Running in daemon mode, we would pass Error to calling thread.
+ throw (Error) t;
+ }
}
- Runtime.getRuntime().halt(-1);
- } else {
- //Running in daemon mode, we would pass Error to calling thread.
- throw (Error) t;
- }
}
- }
}
[2/3] storm git commit: Merge branch '1122' of
https://github.com/zhuoliu/storm into STORM-1122
Posted by ka...@apache.org.
Merge branch '1122' of https://github.com/zhuoliu/storm into STORM-1122
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6177ae4f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6177ae4f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6177ae4f
Branch: refs/heads/master
Commit: 6177ae4f8cb3f6b71cf7457ace249f11946bcacb
Parents: ff3b8af 8161bea
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Oct 24 10:39:56 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 10:39:56 2015 +0900
----------------------------------------------------------------------
.../src/jvm/backtype/storm/utils/Utils.java | 262 ++++++++++---------
1 file changed, 132 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-1122 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1122 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e93015c6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e93015c6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e93015c6
Branch: refs/heads/master
Commit: e93015c642267484cc65fa7c0a3c205f90673f14
Parents: 6177ae4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Oct 24 11:55:42 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 11:55:42 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e93015c6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8adfa0e..431148a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1122: Fix the format issue in Utils.java
* STORM-1111: Fix Validation for lots of different configs
* STORM-1125: Adding separate ZK client for read in Nimbus ZK State
* STORM-1121: Remove method call to avoid overhead during topology submission time