You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/24 05:30:59 UTC

[1/3] storm git commit: [STORM-1122] Fix the format issue in Utils.java

Repository: storm
Updated Branches:
  refs/heads/master ff3b8affa -> e93015c64


[STORM-1122] Fix the format issue in Utils.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8161beab
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8161beab
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8161beab

Branch: refs/heads/master
Commit: 8161beab46b5d489aa37db524b980c1676dd08fe
Parents: b0f3be0
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Oct 22 12:43:37 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Oct 22 12:49:36 2015 -0500

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/utils/Utils.java     | 262 ++++++++++---------
 1 file changed, 132 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8161beab/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 7f02b56..c852306 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.SafeConstructor;
 
-import java.io.*;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.ByteBuffer;
@@ -48,7 +47,26 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.io.File;
 import java.io.FileInputStream;
-import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.OutputStreamWriter;
+import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.FileOutputStream;
+import java.io.BufferedReader;
+import java.io.Serializable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.TreeMap;
+import java.util.UUID;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -99,9 +117,9 @@ public class Utils {
             Object ret = ois.readObject();
             ois.close();
             return (T)ret;
-        } catch(IOException ioe) {
+        } catch (IOException ioe) {
             throw new RuntimeException(ioe);
-        } catch(ClassNotFoundException e) {
+        } catch (ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
     }
@@ -155,9 +173,9 @@ public class Utils {
             Object ret = JSONValue.parseWithException(in);
             in.close();
             return (Map<String,Object>)ret;
-        } catch(IOException ioe) {
+        } catch (IOException ioe) {
             throw new RuntimeException(ioe);
-        } catch(ParseException e) {
+        } catch (ParseException e) {
             throw new RuntimeException(e);
         }
     }
@@ -186,7 +204,7 @@ public class Utils {
         try {
             Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
             List<URL> ret = new ArrayList<URL>();
-            while(resources.hasMoreElements()) {
+            while (resources.hasMoreElements()) {
                 ret.add(resources.nextElement());
             }
             return ret;
@@ -259,7 +277,7 @@ public class Utils {
 
 
     public static Map findAndReadConfigFile(String name) {
-       return findAndReadConfigFile(name, true);
+        return findAndReadConfigFile(name, true);
     }
 
     public static Map readDefaultConfig() {
@@ -269,7 +287,7 @@ public class Utils {
     public static Map readCommandLineOpts() {
         Map ret = new HashMap();
         String commandOptions = System.getProperty("storm.options");
-        if(commandOptions != null) {
+        if (commandOptions != null) {
             String[] configs = commandOptions.split(",");
             for (String config : configs) {
                 config = URLDecoder.decode(config);
@@ -292,7 +310,7 @@ public class Utils {
         Map ret = readDefaultConfig();
         String confFile = System.getProperty("storm.conf.file");
         Map storm;
-        if (confFile==null || confFile.equals("")) {
+        if (confFile == null || confFile.equals("")) {
             storm = findAndReadConfigFile("storm.yaml", false);
         } else {
             storm = findAndReadConfigFile(confFile, true);
@@ -303,24 +321,24 @@ public class Utils {
     }
 
     private static Object normalizeConf(Object conf) {
-        if(conf==null) return new HashMap();
-        if(conf instanceof Map) {
+        if (conf == null) return new HashMap();
+        if (conf instanceof Map) {
             Map confMap = new HashMap((Map) conf);
-            for(Object key: confMap.keySet()) {
+            for (Object key : confMap.keySet()) {
                 Object val = confMap.get(key);
                 confMap.put(key, normalizeConf(val));
             }
             return confMap;
-        } else if(conf instanceof List) {
+        } else if (conf instanceof List) {
             List confList =  new ArrayList((List) conf);
-            for(int i=0; i<confList.size(); i++) {
+            for (int i = 0; i < confList.size(); i++) {
                 Object val = confList.get(i);
                 confList.set(i, normalizeConf(val));
             }
             return confList;
         } else if (conf instanceof Integer) {
             return ((Integer) conf).longValue();
-        } else if(conf instanceof Float) {
+        } else if (conf instanceof Float) {
             return ((Float) conf).doubleValue();
         } else {
             return conf;
@@ -332,9 +350,9 @@ public class Utils {
     }
 
     public static Object getSetComponentObject(ComponentObject obj) {
-        if(obj.getSetField()==ComponentObject._Fields.SERIALIZED_JAVA) {
+        if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) {
             return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
-        } else if(obj.getSetField()==ComponentObject._Fields.JAVA_OBJECT) {
+        } else if (obj.getSetField() == ComponentObject._Fields.JAVA_OBJECT) {
             return obj.get_java_object();
         } else {
             return obj.get_shell();
@@ -343,7 +361,7 @@ public class Utils {
 
     public static <S, T> T get(Map<S, T> m, S key, T def) {
         T ret = m.get(key);
-        if(ret==null) {
+        if (ret == null) {
             ret = def;
         }
         return ret;
@@ -351,7 +369,7 @@ public class Utils {
 
     public static List<Object> tuple(Object... values) {
         List<Object> ret = new ArrayList<Object>();
-        for(Object v: values) {
+        for (Object v : values) {
             ret.add(v);
         }
         return ret;
@@ -360,18 +378,18 @@ public class Utils {
     public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException {
         NimbusClient client = NimbusClient.getConfiguredClient(conf);
         try {
-        	download(client, file, localFile);
+            download(client, file, localFile);
         } finally {
-        	client.close();
+            client.close();
         }
     }
 
     public static void downloadFromHost(Map conf, String file, String localFile, String host, int port) throws IOException, TException, AuthorizationException {
         NimbusClient client = new NimbusClient (conf, host, port, null);
         try {
-        	download(client, file, localFile);
+            download(client, file, localFile);
         } finally {
-        	client.close();
+            client.close();
         }
     }
 
@@ -379,21 +397,21 @@ public class Utils {
         WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile));
         try {
             String id = client.getClient().beginFileDownload(file);
-	        while(true) {
-	            ByteBuffer chunk = client.getClient().downloadChunk(id);
-	            int written = out.write(chunk);
-	            if(written==0) break;
-	        }
+            while (true) {
+                ByteBuffer chunk = client.getClient().downloadChunk(id);
+                int written = out.write(chunk);
+                if (written == 0) break;
+            }
         } finally {
-        	out.close();
+            out.close();
         }
     }
 
     public static IFn loadClojureFn(String namespace, String name) {
         try {
-          clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
+            clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
         } catch (Exception e) {
-          //if playing from the repl and defining functions, file won't exist
+            //if playing from the repl and defining functions, file won't exist
         }
         return (IFn) RT.var(namespace, name).deref();
     }
@@ -404,52 +422,52 @@ public class Utils {
 
     public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
         Map<V, K> ret = new HashMap<V, K>();
-        for(K key: map.keySet()) {
+        for (K key : map.keySet()) {
             ret.put(map.get(key), key);
         }
         return ret;
     }
 
     public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
-        if(topology.get_spouts().containsKey(id)) {
+        if (topology.get_spouts().containsKey(id)) {
             return topology.get_spouts().get(id).get_common();
         }
-        if(topology.get_bolts().containsKey(id)) {
+        if (topology.get_bolts().containsKey(id)) {
             return topology.get_bolts().get(id).get_common();
         }
-        if(topology.get_state_spouts().containsKey(id)) {
+        if (topology.get_state_spouts().containsKey(id)) {
             return topology.get_state_spouts().get(id).get_common();
         }
         throw new IllegalArgumentException("Could not find component with id " + id);
     }
 
     public static Integer getInt(Object o) {
-      Integer result = getInt(o, null);
-      if (null == result) {
-        throw new IllegalArgumentException("Don't know how to convert null to int");
-      }
-      return result;
+        Integer result = getInt(o, null);
+        if (null == result) {
+            throw new IllegalArgumentException("Don't know how to convert null to int");
+        }
+        return result;
     }
 
     public static Integer getInt(Object o, Integer defaultValue) {
-      if (null == o) {
-        return defaultValue;
-      }
-
-      if (o instanceof Integer ||
-          o instanceof Short ||
-          o instanceof Byte) {
-          return ((Number) o).intValue();
-      } else if (o instanceof Long) {
-          final long l = (Long) o;
-          if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
-              return (int) l;
-          }
-      } else if (o instanceof String) {
-          return Integer.parseInt((String) o);
-      }
-
-      throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
+        if (null == o) {
+            return defaultValue;
+        }
+
+        if (o instanceof Integer ||
+                o instanceof Short ||
+                o instanceof Byte) {
+            return ((Number) o).intValue();
+        } else if (o instanceof Long) {
+            final long l = (Long) o;
+            if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
+                return (int) l;
+            }
+        } else if (o instanceof String) {
+            return Integer.parseInt((String) o);
+        }
+
+        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
     }
 
     public static Double getDouble(Object o) {
@@ -472,15 +490,14 @@ public class Utils {
     }
 
     public static boolean getBoolean(Object o, boolean defaultValue) {
-      if (null == o) {
-        return defaultValue;
-      }
-
-      if(o instanceof Boolean) {
-          return (Boolean) o;
-      } else {
-          throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean");
-      }
+        if (null == o) {
+            return defaultValue;
+        }
+        if (o instanceof Boolean) {
+            return (Boolean) o;
+        } else {
+            throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean");
+        }
     }
 
     public static long secureRandomLong() {
@@ -493,7 +510,7 @@ public class Utils {
 
     public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
         List<String> serverPorts = new ArrayList<String>();
-        for(String zkServer: (List<String>) servers) {
+        for (String zkServer : (List<String>) servers) {
             serverPorts.add(zkServer + ":" + Utils.getInt(port));
         }
         String zkStr = StringUtils.join(serverPorts, ",") + root;
@@ -507,14 +524,14 @@ public class Utils {
     protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
     {
         builder.connectString(zkStr)
-            .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
-            .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
-            .retryPolicy(new StormBoundedExponentialBackoffRetry(
+                .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+                .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+                .retryPolicy(new StormBoundedExponentialBackoffRetry(
                         Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
                         Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
                         Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
 
-        if(auth!=null && auth.scheme!=null && auth.payload!=null) {
+        if (auth != null && auth.scheme != null && auth.payload != null) {
             builder = builder.authorization(auth.scheme, auth.payload);
         }
     }
@@ -535,28 +552,13 @@ public class Utils {
         return ret;
     }
 
-    /**
-     *
-(defn integer-divided [sum num-pieces]
-  (let [base (int (/ sum num-pieces))
-        num-inc (mod sum num-pieces)
-        num-bases (- num-pieces num-inc)]
-    (if (= num-inc 0)
-      {base num-bases}
-      {base num-bases (inc base) num-inc}
-      )))
-     * @param sum
-     * @param numPieces
-     * @return
-     */
-
     public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {
         int base = sum / numPieces;
         int numInc = sum % numPieces;
         int numBases = numPieces - numInc;
         TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();
         ret.put(base, numBases);
-        if(numInc!=0) {
+        if (numInc != 0) {
             ret.put(base+1, numInc);
         }
         return ret;
@@ -572,7 +574,7 @@ public class Utils {
         try {
             BufferedReader r = new BufferedReader(new InputStreamReader(in));
             String line = null;
-            while ((line = r.readLine())!= null) {
+            while ((line = r.readLine()) != null) {
                 LOG.info("{}:{}", prefix, line);
             }
         } catch (IOException e) {
@@ -582,8 +584,8 @@ public class Utils {
 
     public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
         Throwable t = throwable;
-        while(t != null) {
-            if(klass.isInstance(t)) {
+        while (t != null) {
+            if (klass.isInstance(t)) {
                 return true;
             }
             t = t.getCause();
@@ -599,9 +601,9 @@ public class Utils {
      */
     public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
         return null != System.getProperty("java.security.auth.login.config")
-            || (conf != null
+                || (conf != null
                 && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null
-                && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
+                && !((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
     }
 
     /**
@@ -612,7 +614,7 @@ public class Utils {
     public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
         return (conf != null
                 && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null
-                && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
+                && !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
     }
 
     public static List<ACL> getWorkerACL(Map conf) {
@@ -622,37 +624,37 @@ public class Utils {
         }
         String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
         if (stormZKUser == null) {
-           throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+            throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
         }
         String[] split = stormZKUser.split(":",2);
         if (split.length != 2) {
-          throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
         }
         ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
         ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
         return ret;
     }
 
-   public static String threadDump() {
-       final StringBuilder dump = new StringBuilder();
-       final java.lang.management.ThreadMXBean threadMXBean =  java.lang.management.ManagementFactory.getThreadMXBean();
-       final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
-       for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
-           dump.append('"');
-           dump.append(threadInfo.getThreadName());
-           dump.append("\" ");
-           final Thread.State state = threadInfo.getThreadState();
-           dump.append("\n   java.lang.Thread.State: ");
-           dump.append(state);
-           final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
-           for (final StackTraceElement stackTraceElement : stackTraceElements) {
-               dump.append("\n        at ");
-               dump.append(stackTraceElement);
-           }
-           dump.append("\n\n");
-       }
-       return dump.toString();
-   }
+    public static String threadDump() {
+        final StringBuilder dump = new StringBuilder();
+        final java.lang.management.ThreadMXBean threadMXBean =  java.lang.management.ManagementFactory.getThreadMXBean();
+        final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+        for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
+            dump.append('"');
+            dump.append(threadInfo.getThreadName());
+            dump.append("\" ");
+            final Thread.State state = threadInfo.getThreadState();
+            dump.append("\n   java.lang.Thread.State: ");
+            dump.append(state);
+            final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+            for (final StackTraceElement stackTraceElement : stackTraceElements) {
+                dump.append("\n        at ");
+                dump.append(stackTraceElement);
+            }
+            dump.append("\n\n");
+        }
+        return dump.toString();
+    }
 
     // Assumes caller is synchronizing
     private static SerializationDelegate getSerializationDelegate(Map stormConf) {
@@ -675,20 +677,20 @@ public class Utils {
         return delegate;
     }
 
-  public static void handleUncaughtException(Throwable t) {
-    if (t != null && t instanceof Error) {
-      if (t instanceof OutOfMemoryError) {
-        try {
-          System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
-        } catch (Throwable err) {
-          //Again we don't want to exit because of logging issues.
+    public static void handleUncaughtException(Throwable t) {
+        if (t != null && t instanceof Error) {
+            if (t instanceof OutOfMemoryError) {
+                try {
+                    System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
+                } catch (Throwable err) {
+                    //Again we don't want to exit because of logging issues.
+                }
+                Runtime.getRuntime().halt(-1);
+            } else {
+                //Running in daemon mode, we would pass Error to calling thread.
+                throw (Error) t;
+            }
         }
-        Runtime.getRuntime().halt(-1);
-      } else {
-        //Running in daemon mode, we would pass Error to calling thread.
-        throw (Error) t;
-      }
     }
-  }
 }
 


[2/3] storm git commit: Merge branch '1122' of https://github.com/zhuoliu/storm into STORM-1122

Posted by ka...@apache.org.
Merge branch '1122' of https://github.com/zhuoliu/storm into STORM-1122


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6177ae4f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6177ae4f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6177ae4f

Branch: refs/heads/master
Commit: 6177ae4f8cb3f6b71cf7457ace249f11946bcacb
Parents: ff3b8af 8161bea
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Oct 24 10:39:56 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 10:39:56 2015 +0900

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/utils/Utils.java     | 262 ++++++++++---------
 1 file changed, 132 insertions(+), 130 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: add STORM-1122 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1122 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e93015c6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e93015c6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e93015c6

Branch: refs/heads/master
Commit: e93015c642267484cc65fa7c0a3c205f90673f14
Parents: 6177ae4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Oct 24 11:55:42 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 11:55:42 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e93015c6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8adfa0e..431148a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1122: Fix the format issue in Utils.java
  * STORM-1111: Fix Validation for lots of different configs
  * STORM-1125: Adding separate ZK client for read in Nimbus ZK State
  * STORM-1121: Remove method call to avoid overhead during topology submission time