You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:40 UTC

[02/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java
index 6000688..59e14d9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java
@@ -43,12 +43,10 @@ import com.alibaba.jstorm.cluster.StormConfig;
  */
 public class JStormServerUtils {
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(JStormServerUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JStormServerUtils.class);
 
-    public static void downloadCodeFromMaster(Map conf, String localRoot,
-            String masterCodeDir, String topologyId, boolean isSupervisor)
-            throws IOException, TException {
+    public static void downloadCodeFromMaster(Map conf, String localRoot, String masterCodeDir, String topologyId, boolean isSupervisor) throws IOException,
+            TException {
         FileUtils.forceMkdir(new File(localRoot));
         FileUtils.forceMkdir(new File(StormConfig.stormlib_path(localRoot)));
 
@@ -64,25 +62,18 @@ public class JStormServerUtils {
         String masterStormConfPath = StormConfig.stormconf_path(masterCodeDir);
         Utils.downloadFromMaster(conf, masterStormConfPath, localStormConfPath);
 
-        Map stormConf =
-                (Map) StormConfig.readLocalObject(topologyId,
-                        localStormConfPath);
+        Map stormConf = (Map) StormConfig.readLocalObject(topologyId, localStormConfPath);
 
         if (stormConf == null)
             throw new IOException("Get topology conf error: " + topologyId);
 
-        List<String> libs =
-                (List<String>) stormConf
-                        .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+        List<String> libs = (List<String>) stormConf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
         if (libs == null)
             return;
         for (String libName : libs) {
-            String localStromLibPath =
-                    StormConfig.stormlib_path(localRoot, libName);
-            String masterStormLibPath =
-                    StormConfig.stormlib_path(masterCodeDir, libName);
-            Utils.downloadFromMaster(conf, masterStormLibPath,
-                    localStromLibPath);
+            String localStromLibPath = StormConfig.stormlib_path(localRoot, libName);
+            String masterStormLibPath = StormConfig.stormlib_path(masterCodeDir, libName);
+            Utils.downloadFromMaster(conf, masterStormLibPath, localStromLibPath);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
index 983f579..ad56815 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
@@ -53,6 +53,7 @@ import org.apache.commons.exec.ExecuteException;
 import org.apache.commons.exec.ExecuteResultHandler;
 import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.TFieldIdEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,11 +68,9 @@ import com.alibaba.jstorm.client.ConfigExtension;
  * JStorm utility
  * 
  * @author yannian/Longda/Xin.Zhou/Xin.Li
- * 
  */
 public class JStormUtils {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(JStormUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JStormUtils.class);
 
     public static long SIZE_1_K = 1024;
     public static long SIZE_1_M = SIZE_1_K * 1024;
@@ -223,8 +222,7 @@ public class JStormUtils {
     }
 
     /**
-     * Gets the pid of this JVM, because Java doesn't provide a real way to do
-     * this.
+     * Gets the pid of this JVM, because Java doesn't provide a real way to do this.
      * 
      * @return
      */
@@ -238,8 +236,7 @@ public class JStormUtils {
         return split[0];
     }
 
-    public static void exec_command(String command) throws ExecuteException,
-            IOException {
+    public static void exec_command(String command) throws ExecuteException, IOException {
         String[] cmdlist = command.split(" ");
         CommandLine cmd = new CommandLine(cmdlist[0]);
         for (int i = 1; i < cmdlist.length; i++) {
@@ -257,14 +254,12 @@ public class JStormUtils {
      * @param dir
      * @param destdir
      */
-    public static void extract_dir_from_jar(String jarpath, String dir,
-            String destdir) {
+    public static void extract_dir_from_jar(String jarpath, String dir, String destdir) {
         String cmd = "unzip -qq " + jarpath + " " + dir + "/** -d " + destdir;
         try {
             exec_command(cmd);
         } catch (Exception e) {
-            LOG.warn("No " + dir + " from " + jarpath + " by cmd:" + cmd
-                    + "!\n" + e.getMessage());
+            LOG.warn("No " + dir + " from " + jarpath + " by cmd:" + cmd + "!\n" + e.getMessage());
         }
 
     }
@@ -278,8 +273,7 @@ public class JStormUtils {
                 LOG.info("kill -9 process " + pid);
                 sleepMs(100);
             } catch (ExecuteException e) {
-                LOG.info("Error when trying to kill " + pid
-                        + ". Process has been killed");
+                LOG.info("Error when trying to kill " + pid + ". Process has been killed");
             } catch (Exception e) {
                 LOG.info("Error when trying to kill " + pid + ".Exception ", e);
             }
@@ -291,8 +285,7 @@ public class JStormUtils {
             exec_command("kill " + pid);
             LOG.info("kill process " + pid);
         } catch (ExecuteException e) {
-            LOG.info("Error when trying to kill " + pid
-                    + ". Process has been killed. ");
+            LOG.info("Error when trying to kill " + pid + ". Process has been killed. ");
         } catch (Exception e) {
             LOG.info("Error when trying to kill " + pid + ".Exception ", e);
         }
@@ -349,7 +342,8 @@ public class JStormUtils {
         String output = null;
         try {
             String pid = JStormUtils.process_pid();
-            output = SystemOperation.exec("top -b -n 1 | grep " + pid);
+            String command = String.format("top -b -n 1 -p %s | grep -w %s", pid, pid);
+            output = SystemOperation.exec(command);
             String subStr = output.substring(output.indexOf("S") + 1);
             for (int i = 0; i < subStr.length(); i++) {
                 char ch = subStr.charAt(i);
@@ -369,64 +363,89 @@ public class JStormUtils {
 
         return value;
     }
-    
+
+    public static Double getDiskUsage() {
+        if (!OSInfo.isLinux() && !OSInfo.isMac()) {
+            return 0.0;
+        }
+        try {
+            String output = SystemOperation.exec("df -h /");
+            if (output != null) {
+                String[] lines = output.split("[\\r\\n]+");
+                if (lines.length >= 2) {
+                    String[] parts = lines[1].split("\\s+");
+                    if (parts.length >= 5) {
+                        String pct = parts[4];
+                        if (pct.endsWith("%")) {
+                            return Integer.valueOf(pct.substring(0, pct.length() - 1)) / 100.0;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to get disk usage:", e);
+        }
+        return 0.0;
+    }
+
     public static Double getMemUsage() {
-    	if (OSInfo.isLinux() == true) {
-    		try {
-    			Double value = 0.0;
+        if (OSInfo.isLinux() == true) {
+            try {
+                Double value = 0.0;
                 String pid = JStormUtils.process_pid();
-                String output = SystemOperation.exec("top -b -n 1 | grep " + pid);
-                
-                int m = 0;  
-                String[] strArray = output.split(" ");  
-                for (int i = 0; i < strArray.length; i++) {  
-                    String info = strArray[i];  
-                    if (info.trim().length() == 0){  
-                        continue;  
-                    }  
-                    if(m == 5) {
-                    	// memory
-                        String unit = info.substring(info.length() - 1); 
-                        
-                        if(unit.equalsIgnoreCase("g")) {  
-                            value =  Double.parseDouble(info.substring(0, info.length() - 1));
+                String command = String.format("top -b -n 1 -p %s | grep -w %s", pid, pid);
+                String output = SystemOperation.exec(command);
+
+                int m = 0;
+                String[] strArray = output.split(" ");
+                for (int i = 0; i < strArray.length; i++) {
+                    String info = strArray[i];
+                    if (info.trim().length() == 0) {
+                        continue;
+                    }
+                    if (m == 5) {
+                        // memory
+                        String unit = info.substring(info.length() - 1);
+
+                        if (unit.equalsIgnoreCase("g")) {
+                            value = Double.parseDouble(info.substring(0, info.length() - 1));
                             value *= 1000000000;
-                        } else if(unit.equalsIgnoreCase("m")) {  
-                        	value =  Double.parseDouble(info.substring(0, info.length() - 1)); 
-                        	value *= 1000000;
-                        } else {  
-                        	value =  Double.parseDouble(info);  
-                        } 
+                        } else if (unit.equalsIgnoreCase("m")) {
+                            value = Double.parseDouble(info.substring(0, info.length() - 1));
+                            value *= 1000000;
+                        } else {
+                            value = Double.parseDouble(info);
+                        }
+                        
+                        //LOG.info("!!!! Get Memory Size:{}, info:{}", value, info);
                         return value;
-                    }  
-                    if(m == 8) {
-                    	// cpu usage
-                          
-                    }  
-                    if(m == 9) {
-                    	// memory ratio
-                         
-                    }  
-                    m++;  
-                }  
+                    }
+                    if (m == 8) {
+                        // cpu usage
+
+                    }
+                    if (m == 9) {
+                        // memory ratio
+
+                    }
+                    m++;
+                }
             } catch (Exception e) {
                 LOG.warn("Failed to get memory usage .");
 
             }
-    	}
-    	
-    	// this will be incorrect
-		MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();    
+        }
+
+        // this will be incorrect
+        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
         MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
 
         return Double.valueOf(memoryUsage.getUsed());
     }
 
     /**
-     * If it is backend, please set resultHandler, such as
-     * DefaultExecuteResultHandler If it is frontend,
-     * ByteArrayOutputStream.toString get the result
-     * 
+     * If it is backend, please set resultHandler, such as DefaultExecuteResultHandler If it is frontend, ByteArrayOutputStream.toString get the result
+     * <p/>
      * This function don't care whether the command is successfully or not
      * 
      * @param command
@@ -436,9 +455,8 @@ public class JStormUtils {
      * @return
      * @throws IOException
      */
-    public static ByteArrayOutputStream launchProcess(String command,
-            final Map environment, final String workDir,
-            ExecuteResultHandler resultHandler) throws IOException {
+    public static ByteArrayOutputStream launchProcess(String command, final Map environment, final String workDir, ExecuteResultHandler resultHandler)
+            throws IOException {
 
         String[] cmdlist = command.split(" ");
 
@@ -479,8 +497,7 @@ public class JStormUtils {
 
     }
 
-    protected static java.lang.Process launchProcess(final String[] cmdlist,
-            final Map<String, String> environment) throws IOException {
+    protected static Process launchProcess(final String[] cmdlist, final Map<String, String> environment) throws IOException {
         ArrayList<String> buff = new ArrayList<String>();
         for (String tok : cmdlist) {
             if (!tok.isEmpty()) {
@@ -499,33 +516,26 @@ public class JStormUtils {
     }
 
     /**
-     * @@@ it should use DefaultExecutor to start a process, but some little
-     *     problem have been found, such as exitCode/output string so still use
-     *     the old method to start process
-     * 
      * @param command
      * @param environment
      * @param backend
      * @return
      * @throws IOException
+     * @@@ it should use DefaultExecutor to start a process, but some little problem have been found, such as exitCode/output string so still use the old method
+     *     to start process
      */
-    public static java.lang.Process launch_process(final String command,
-            final Map<String, String> environment, boolean backend)
-            throws IOException {
+    public static Process launch_process(final String command, final Map<String, String> environment, boolean backend) throws IOException {
 
         if (backend == true) {
             new Thread(new Runnable() {
 
                 @Override
                 public void run() {
-                    String[] cmdlist =
-                            (new String("nohup " + command + " &")).split(" ");
+                    String[] cmdlist = (new String("nohup " + command + " &")).split(" ");
                     try {
                         launchProcess(cmdlist, environment);
                     } catch (IOException e) {
-                        LOG.error(
-                                "Failed to run " + command + ":" + e.getCause(),
-                                e);
+                        LOG.error("Failed to run " + command + ":" + e.getCause(), e);
                     }
                 }
             }).start();
@@ -568,9 +578,8 @@ public class JStormUtils {
     }
 
     /**
-     * 
      * if the list exist repeat string, return the repeated string
-     * 
+     * <p/>
      * this function will be used to check wheter bolt or spout exist same id
      * 
      * @param sets
@@ -629,7 +638,7 @@ public class JStormUtils {
         return rtn;
     }
 
-    public static <T> Long bit_xor_vals(java.util.List<T> vals) {
+    public static <T> Long bit_xor_vals(List<T> vals) {
         Long rtn = 0l;
         for (T n : vals) {
             rtn = bit_xor(rtn, n);
@@ -638,7 +647,7 @@ public class JStormUtils {
         return rtn;
     }
 
-    public static <T> Long bit_xor_vals_sets(java.util.Set<T> vals) {
+    public static <T> Long bit_xor_vals_sets(Set<T> vals) {
         Long rtn = 0l;
         for (T n : vals) {
             rtn = bit_xor(rtn, n);
@@ -675,7 +684,7 @@ public class JStormUtils {
         return rtn;
     }
 
-    public static <V> List<V> mk_list(java.util.Set<V> args) {
+    public static <V> List<V> mk_list(Set<V> args) {
         ArrayList<V> rtn = new ArrayList<V>();
         if (args != null) {
             for (V o : args) {
@@ -712,8 +721,7 @@ public class JStormUtils {
         } else if (o instanceof Long) {
             return (Long) o;
         } else {
-            throw new RuntimeException("Invalid value "
-                    + o.getClass().getName() + " " + o);
+            throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
         }
     }
 
@@ -733,8 +741,18 @@ public class JStormUtils {
         } else if (o instanceof Double) {
             return (Double) o;
         } else {
-            throw new RuntimeException("Invalid value "
-                    + o.getClass().getName() + " " + o);
+            throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
+        }
+    }
+
+    public static Double parseDouble(Object o, double defaultValue) {
+        if (o == null) {
+            return defaultValue;
+        }
+        try {
+            return parseDouble(o);
+        } catch (Exception ignored) {
+            return defaultValue;
         }
     }
 
@@ -769,8 +787,7 @@ public class JStormUtils {
         } else if (o instanceof Integer) {
             return (Integer) o;
         } else {
-            throw new RuntimeException("Invalid value "
-                    + o.getClass().getName() + " " + o);
+            throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
         }
     }
 
@@ -791,6 +808,20 @@ public class JStormUtils {
         }
     }
 
+    public static Boolean parseBoolean(Object o) {
+        if (o == null) {
+            return null;
+        }
+
+        if (o instanceof String) {
+            return Boolean.valueOf((String) o);
+        } else if (o instanceof Boolean) {
+            return (Boolean) o;
+        } else {
+            throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
+        }
+    }
+
     public static boolean parseBoolean(Object o, boolean defaultValue) {
         if (o == null) {
             return defaultValue;
@@ -863,8 +894,7 @@ public class JStormUtils {
         } else if (oldValue instanceof BigInteger) {
             return ((BigInteger) oldValue).add((BigInteger) newValue);
         } else if (oldValue instanceof Number) {
-            return ((Number) oldValue).doubleValue()
-                    + ((Number) newValue).doubleValue();
+            return ((Number) oldValue).doubleValue() + ((Number) newValue).doubleValue();
         } else {
             return null;
         }
@@ -933,8 +963,7 @@ public class JStormUtils {
 
     public static String formatSimpleDouble(Double value) {
         try {
-            java.text.DecimalFormat form =
-                    new java.text.DecimalFormat("##0.000");
+            java.text.DecimalFormat form = new java.text.DecimalFormat("##0.000");
             String s = form.format(value);
             return s;
         } catch (Exception e) {
@@ -955,8 +984,7 @@ public class JStormUtils {
 
     public static double formatDoubleDecPoint4(Double value) {
         try {
-            java.text.DecimalFormat form =
-                    new java.text.DecimalFormat("###.0000");
+            java.text.DecimalFormat form = new java.text.DecimalFormat("###.0000");
             String s = form.format(value);
             return Double.valueOf(s);
         } catch (Exception e) {
@@ -1041,18 +1069,13 @@ public class JStormUtils {
     }
 
     /**
-     * @@@ Todo
-     * 
      * @return
+     * @@@ Todo
      */
     public static Long getPhysicMemorySize() {
         Object object;
         try {
-            object =
-                    ManagementFactory.getPlatformMBeanServer().getAttribute(
-                            new ObjectName("java.lang", "type",
-                                    "OperatingSystem"),
-                            "TotalPhysicalMemorySize");
+            object = ManagementFactory.getPlatformMBeanServer().getAttribute(new ObjectName("java.lang", "type", "OperatingSystem"), "TotalPhysicalMemorySize");
         } catch (Exception e) {
             LOG.warn("Failed to get system physical memory size,", e);
             return null;
@@ -1089,19 +1112,15 @@ public class JStormUtils {
 
     public static String getLogFileName() {
         try {
-            Logger rootLogger =
-                    LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
+            Logger rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
             if (rootLogger instanceof ch.qos.logback.classic.Logger) {
-                ch.qos.logback.classic.Logger logbackLogger =
-                        (ch.qos.logback.classic.Logger) rootLogger;
+                ch.qos.logback.classic.Logger logbackLogger = (ch.qos.logback.classic.Logger) rootLogger;
                 // Logger framework is Logback
-                for (Iterator<ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent>> index =
-                        logbackLogger.iteratorForAppenders(); index.hasNext();) {
-                    ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent> appender =
-                            index.next();
+                for (Iterator<ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent>> index = logbackLogger.iteratorForAppenders(); index
+                        .hasNext();) {
+                    ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent> appender = index.next();
                     if (appender instanceof ch.qos.logback.core.FileAppender) {
-                        ch.qos.logback.core.FileAppender fileAppender =
-                                (ch.qos.logback.core.FileAppender) appender;
+                        ch.qos.logback.core.FileAppender fileAppender = (ch.qos.logback.core.FileAppender) appender;
                         return fileAppender.getFile();
                     }
                 }
@@ -1156,8 +1175,7 @@ public class JStormUtils {
 
         FileOutputStream workerOut = new FileOutputStream(new File(file));
 
-        PrintStream ps =
-                new PrintStream(new BufferedOutputStream(workerOut), true);
+        PrintStream ps = new PrintStream(new BufferedOutputStream(workerOut), true);
         System.setOut(ps);
         System.setErr(ps);
 
@@ -1170,13 +1188,11 @@ public class JStormUtils {
         return new AsyncLoopDefaultKill();
     }
 
-    public static TreeMap<Integer, Integer> integer_divided(int sum,
-            int num_pieces) {
+    public static TreeMap<Integer, Integer> integer_divided(int sum, int num_pieces) {
         return Utils.integerDivided(sum, num_pieces);
     }
 
-    public static <K, V> HashMap<K, V> filter_val(RunnableCallback fn,
-            Map<K, V> amap) {
+    public static <K, V> HashMap<K, V> filter_val(RunnableCallback fn, Map<K, V> amap) {
         HashMap<K, V> rtn = new HashMap<K, V>();
 
         for (Entry<K, V> entry : amap.entrySet()) {
@@ -1191,16 +1207,14 @@ public class JStormUtils {
     }
 
     public static List<Integer> getSupervisorPortList(Map conf) {
-        List<Integer> portList =
-                (List<Integer>) conf.get(Config.SUPERVISOR_SLOTS_PORTS);
+        List<Integer> portList = (List<Integer>) conf.get(Config.SUPERVISOR_SLOTS_PORTS);
         if (portList != null && portList.size() > 0) {
             return portList;
         }
 
         LOG.info("Generate port list through CPU cores and system memory size");
 
-        double cpuWeight =
-                ConfigExtension.getSupervisorSlotsPortCpuWeight(conf);
+        double cpuWeight = ConfigExtension.getSupervisorSlotsPortCpuWeight(conf);
         int sysCpuNum = 4;
         try {
             sysCpuNum = Runtime.getRuntime().availableProcessors();
@@ -1211,11 +1225,11 @@ public class JStormUtils {
         int cpuPortNum = (int) (sysCpuNum / cpuWeight);
         if (cpuPortNum < 1) {
 
-            LOG.info("Invalid supervisor.slots.port.cpu.weight setting :"
-                    + cpuWeight + ", cpu cores:" + sysCpuNum);
+            LOG.info("Invalid supervisor.slots.port.cpu.weight setting :" + cpuWeight + ", cpu cores:" + sysCpuNum);
             cpuPortNum = 1;
         }
 
+        Double memWeight = ConfigExtension.getSupervisorSlotsPortMemWeight(conf);
         int memPortNum = Integer.MAX_VALUE;
         Long physicalMemSize = JStormUtils.getPhysicMemorySize();
         if (physicalMemSize == null) {
@@ -1223,7 +1237,7 @@ public class JStormUtils {
         } else {
             LOG.info("Get system memory size :" + physicalMemSize);
             long workerMemSize = ConfigExtension.getMemSizePerWorker(conf);
-            memPortNum = (int) (physicalMemSize / workerMemSize);
+            memPortNum = (int) (physicalMemSize / (workerMemSize * memWeight));
             if (memPortNum < 1) {
                 LOG.info("Invalide worker.memory.size setting:" + workerMemSize);
                 memPortNum = 4;
@@ -1261,14 +1275,10 @@ public class JStormUtils {
     }
 
     public static Object createDisruptorWaitStrategy(Map conf) {
-        String waitStrategy =
-                (String) conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
+        String waitStrategy = (String) conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
         Object ret;
-
-        if (waitStrategy.indexOf("TimeoutBlockingWaitStrategy") != -1) {
-            long timeout =
-                    parseLong(conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT),
-                            10);
+        if (waitStrategy.contains("TimeoutBlockingWaitStrategy")) {
+            long timeout = parseLong(conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10);
             ret = Utils.newInstance(waitStrategy, timeout, TimeUnit.MILLISECONDS);
         } else {
             ret = Utils.newInstance(waitStrategy);
@@ -1276,4 +1286,64 @@ public class JStormUtils {
 
         return ret;
     }
+    
+    public static Object thriftToObject(Object obj) {
+    	Object ret = null;
+    	if (obj instanceof org.apache.thrift.TBase) {
+			ret = thriftToMap((org.apache.thrift.TBase)obj);
+		}else if (obj instanceof List) {
+			ret = new ArrayList<>();
+			for (Object item : (List)obj) {
+				((List)ret).add(thriftToObject(item));
+			}
+		}else if (obj instanceof Map) {
+			ret = new HashMap<String, Object>();
+			Set<Entry> entrySet = ((Map)obj).entrySet();
+			for (Entry entry : entrySet) {
+				((Map)ret).put(String.valueOf(entry.getKey()), thriftToObject(entry.getValue()));
+			}
+		}else {
+
+			ret = String.valueOf(obj);
+		}
+    	
+    	return ret;
+    }
+    
+    public static  Map<String, Object> thriftToMap(
+    		org.apache.thrift.TBase thriftObj) {
+    	Map<String, Object> ret = new HashMap<String, Object>();
+    	
+    	int i = 1;
+    	TFieldIdEnum field = thriftObj.fieldForId(i);
+    	while(field != null) {
+    		if (thriftObj.isSet(field)) {
+    			Object obj = thriftObj.getFieldValue(field);
+    			ret.put(field.getFieldName(), thriftToObject(obj));
+    			
+    		}
+    		field = thriftObj.fieldForId(++i);
+    	}
+    	
+    	return ret;
+    }
+    
+    public static List<Map<String, Object>> thriftToMap(List thriftObjs) {
+    	List<Map<String, Object> > ret = new ArrayList<Map<String, Object> > () ;
+    	
+    	for (Object thriftObj : thriftObjs) {
+    		ret.add(thriftToMap((org.apache.thrift.TBase)thriftObj));
+    	}
+    	
+    	return ret;
+    }
+
+
+    public static long halfValueOfSum(long v1, long v2, boolean increment) {
+        long ret = (v1 + v2) / 2;
+        if (increment) {
+            ret += (v1 + v2) % 2;
+        }
+        return ret;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java
index d082fcc..91be977 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java
@@ -21,121 +21,121 @@ import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.SafeConstructor;
 
 public class LoadConf {
-	private static final Logger LOG = LoggerFactory.getLogger(LoadConf.class);
-
-	public static List<URL> findResources(String name) {
-		try {
-			Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
-			List<URL> ret = new ArrayList<URL>();
-			while (resources.hasMoreElements()) {
-				ret.add(resources.nextElement());
-			}
-			return ret;
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * 
-	 * @param name
-	 * @param mustExist   -- if this is true, the file must exist, otherwise throw exception 
-	 * @param canMultiple -- if this is false and there is multiple conf,  it will throw exception
-	 * @return
-	 */
-	public static Map findAndReadYaml(String name, boolean mustExist, boolean canMultiple) {
-		InputStream in = null;
-		boolean confFileEmpty = false;
-		try {
-			in = getConfigFileInputStream(name, canMultiple);
-			if (null != in) {
-				Yaml yaml = new Yaml(new SafeConstructor());
-				Map ret = (Map) yaml.load(new InputStreamReader(in));
-				if (null != ret) {
-					return new HashMap(ret);
-				} else {
-					confFileEmpty = true;
-				}
-			}
-
-			if (mustExist) {
-				if (confFileEmpty)
-					throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs");
-				else
-					throw new RuntimeException("Could not find config file on classpath " + name);
-			} else {
-				return new HashMap();
-			}
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		} finally {
-			if (null != in) {
-				try {
-					in.close();
-				} catch (IOException e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-	}
-
-	public static InputStream getConfigFileInputStream(String configFilePath, boolean canMultiple) throws IOException {
-		if (null == configFilePath) {
-			throw new IOException("Could not find config file, name not specified");
-		}
-
-		HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath));
-		if (resources.isEmpty()) {
-			File configFile = new File(configFilePath);
-			if (configFile.exists()) {
-				return new FileInputStream(configFile);
-			}
-		} else if (resources.size() > 1 && canMultiple == false) {
-			throw new IOException("Found multiple " + configFilePath
-					+ " resources. You're probably bundling the Storm jars with your topology jar. " + resources);
-		} else {
-			LOG.info("Using " + configFilePath + " from resources");
-			URL resource = resources.iterator().next();
-			return resource.openStream();
-		}
-		return null;
-	}
-
-	public static InputStream getConfigFileInputStream(String configFilePath) throws IOException {
-		return getConfigFileInputStream(configFilePath, true);
-	}
-
-	public static Map LoadYaml(String confPath) {
-
-		return findAndReadYaml(confPath, true, true);
-
-	}
-
-	public static Map LoadProperty(String prop) {
-
-		InputStream in = null;
-		Properties properties = new Properties();
-
-		try {
-			in = getConfigFileInputStream(prop);
-			properties.load(in);
-		} catch (FileNotFoundException e) {
-			throw new RuntimeException("No such file " + prop);
-		} catch (Exception e1) {
-			throw new RuntimeException("Failed to read config file");
-		} finally {
-			if (null != in) {
-				try {
-					in.close();
-				} catch (IOException e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-
-		Map ret = new HashMap();
-		ret.putAll(properties);
-		return ret;
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(LoadConf.class);
+
+    public static List<URL> findResources(String name) {
+        try {
+            Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
+            List<URL> ret = new ArrayList<URL>();
+            while (resources.hasMoreElements()) {
+                ret.add(resources.nextElement());
+            }
+            return ret;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * 
+     * @param name
+     * @param mustExist -- if this is true, the file must exist, otherwise throw exception
+     * @param canMultiple -- if this is false and there is multiple conf, it will throw exception
+     * @return
+     */
+    public static Map findAndReadYaml(String name, boolean mustExist, boolean canMultiple) {
+        InputStream in = null;
+        boolean confFileEmpty = false;
+        try {
+            in = getConfigFileInputStream(name, canMultiple);
+            if (null != in) {
+                Yaml yaml = new Yaml(new SafeConstructor());
+                Map ret = (Map) yaml.load(new InputStreamReader(in));
+                if (null != ret) {
+                    return new HashMap(ret);
+                } else {
+                    confFileEmpty = true;
+                }
+            }
+
+            if (mustExist) {
+                if (confFileEmpty)
+                    throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs");
+                else
+                    throw new RuntimeException("Could not find config file on classpath " + name);
+            } else {
+                return new HashMap();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (null != in) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    public static InputStream getConfigFileInputStream(String configFilePath, boolean canMultiple) throws IOException {
+        if (null == configFilePath) {
+            throw new IOException("Could not find config file, name not specified");
+        }
+
+        HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath));
+        if (resources.isEmpty()) {
+            File configFile = new File(configFilePath);
+            if (configFile.exists()) {
+                return new FileInputStream(configFile);
+            }
+        } else if (resources.size() > 1 && canMultiple == false) {
+            throw new IOException("Found multiple " + configFilePath + " resources. You're probably bundling the Storm jars with your topology jar. "
+                    + resources);
+        } else {
+            LOG.info("Using " + configFilePath + " from resources");
+            URL resource = resources.iterator().next();
+            return resource.openStream();
+        }
+        return null;
+    }
+
+    public static InputStream getConfigFileInputStream(String configFilePath) throws IOException {
+        return getConfigFileInputStream(configFilePath, true);
+    }
+
+    public static Map LoadYaml(String confPath) {
+
+        return findAndReadYaml(confPath, true, true);
+
+    }
+
+    public static Map LoadProperty(String prop) {
+
+        InputStream in = null;
+        Properties properties = new Properties();
+
+        try {
+            in = getConfigFileInputStream(prop);
+            properties.load(in);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException("No such file " + prop);
+        } catch (Exception e1) {
+            throw new RuntimeException("Failed to read config file");
+        } finally {
+            if (null != in) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        Map ret = new HashMap();
+        ret.putAll(properties);
+        return ret;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
index 8bca599..e005f38 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
@@ -103,41 +103,39 @@ public class NetWorkUtils {
         try {
             address = InetAddress.getByName(host);
         } catch (UnknownHostException e) {
-            LOG.warn("NetWorkUtil can't transfer hostname(" + host
-                    + ") to ip, return hostname", e);
+            LOG.warn("NetWorkUtil can't transfer hostname(" + host + ") to ip, return hostname", e);
             return host;
         }
         return address.getHostAddress();
     }
-    
-    public static List<String>  host2Ip(List<String> servers) {
-    	if (servers == null || servers.size() == 0) {
-    		return new ArrayList<String>();
-    	}
-    	
-    	Set<String> ret = new HashSet<String>();
-    	for (String server : servers) {
-    		if (StringUtils.isBlank(server)) {
-    			continue;
-    		}
-    		
-    		InetAddress ia;
-			try {
-				ia = InetAddress.getByName(server);
-			} catch (UnknownHostException e) {
-				// TODO Auto-generated catch block
-				LOG.info("Fail to get address of ", server);
-				continue;
-			}
-    		if (ia.isLoopbackAddress() || ia.isAnyLocalAddress()) {
-    			ret.add(NetWorkUtils.ip());
-    		}else {
-    			ret.add(ia.getHostAddress());
-    		}
-    	}
-    	
-    	
-    	return JStormUtils.mk_list(ret);
+
+    public static List<String> host2Ip(List<String> servers) {
+        if (servers == null || servers.size() == 0) {
+            return new ArrayList<String>();
+        }
+
+        Set<String> ret = new HashSet<String>();
+        for (String server : servers) {
+            if (StringUtils.isBlank(server)) {
+                continue;
+            }
+
+            InetAddress ia;
+            try {
+                ia = InetAddress.getByName(server);
+            } catch (UnknownHostException e) {
+                // TODO Auto-generated catch block
+                LOG.info("Fail to get address of ", server);
+                continue;
+            }
+            if (ia.isLoopbackAddress() || ia.isAnyLocalAddress()) {
+                ret.add(NetWorkUtils.ip());
+            } else {
+                ret.add(ia.getHostAddress());
+            }
+        }
+
+        return JStormUtils.mk_list(ret);
     }
 
     public static String ip2Host(String ip) {
@@ -145,8 +143,7 @@ public class NetWorkUtils {
         try {
             address = InetAddress.getByName(ip);
         } catch (UnknownHostException e) {
-            LOG.warn("NetWorkUtil can't transfer ip(" + ip
-                    + ") to hostname, return ip", e);
+            LOG.warn("NetWorkUtil can't transfer ip(" + ip + ") to hostname, return ip", e);
             return ip;
         }
         return address.getHostName();
@@ -168,13 +165,13 @@ public class NetWorkUtils {
         return StringUtils.equalsIgnoreCase(ip1, ip2);
 
     }
-    
+
     public static void main(String[] args) {
-    	List<String> servers = new ArrayList<String>();
-    	servers.add("localhost");
-    	
-    	System.out.println(host2Ip(servers));
-    	
+        List<String> servers = new ArrayList<String>();
+        servers.add("localhost");
+
+        System.out.println(host2Ip(servers));
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java
index d4f6e0f..f5acda7 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java
@@ -17,141 +17,144 @@
  */
 package com.alibaba.jstorm.utils;
 
-public class OSInfo {  
-    
-    private static String OS = System.getProperty("os.name").toLowerCase();  
-      
-    private static OSInfo _instance = new OSInfo();  
-      
-    private EPlatform platform;  
-      
-    private OSInfo(){}  
-      
-    public static boolean isLinux(){  
-        return OS.indexOf("linux")>=0;  
-    }  
-      
-    public static boolean isMacOS(){  
-        return OS.indexOf("mac")>=0&&OS.indexOf("os")>0&&OS.indexOf("x")<0;  
-    }  
-      
-    public static boolean isMacOSX(){  
-        return OS.indexOf("mac")>=0&&OS.indexOf("os")>0&&OS.indexOf("x")>0;  
-    }  
-    
+public class OSInfo {
+
+    private static String OS = System.getProperty("os.name").toLowerCase();
+
+    private static OSInfo _instance = new OSInfo();
+
+    private EPlatform platform;
+
+    private OSInfo() {
+    }
+
+    public static boolean isLinux() {
+        return OS.indexOf("linux") >= 0;
+    }
+
+    public static boolean isMacOS() {
+        return OS.indexOf("mac") >= 0 && OS.indexOf("os") > 0 && OS.indexOf("x") < 0;
+    }
+
+    public static boolean isMacOSX() {
+        return OS.indexOf("mac") >= 0 && OS.indexOf("os") > 0 && OS.indexOf("x") > 0;
+    }
+
     public static boolean isMac() {
-        return OS.indexOf("mac")>=0&&OS.indexOf("os")>0;
-    }
-      
-    public static boolean isWindows(){  
-        return OS.indexOf("windows")>=0;  
-    }  
-      
-    public static boolean isOS2(){  
-        return OS.indexOf("os/2")>=0;  
-    }  
-      
-    public static boolean isSolaris(){  
-        return OS.indexOf("solaris")>=0;  
-    }  
-      
-    public static boolean isSunOS(){  
-        return OS.indexOf("sunos")>=0;  
-    }  
-      
-    public static boolean isMPEiX(){  
-        return OS.indexOf("mpe/ix")>=0;  
-    }  
-      
-    public static boolean isHPUX(){  
-        return OS.indexOf("hp-ux")>=0;  
-    }  
-      
-    public static boolean isAix(){  
-        return OS.indexOf("aix")>=0;  
-    }  
-      
-    public static boolean isOS390(){  
-        return OS.indexOf("os/390")>=0;  
-    }  
-      
-    public static boolean isFreeBSD(){  
-        return OS.indexOf("freebsd")>=0;  
-    }  
-      
-    public static boolean isIrix(){  
-        return OS.indexOf("irix")>=0;  
-    }  
-      
-    public static boolean isDigitalUnix(){  
-        return OS.indexOf("digital")>=0&&OS.indexOf("unix")>0;  
-    }  
-      
-    public static boolean isNetWare(){  
-        return OS.indexOf("netware")>=0;  
-    }  
-      
-    public static boolean isOSF1(){  
-        return OS.indexOf("osf1")>=0;  
-    }  
-      
-    public static boolean isOpenVMS(){  
-        return OS.indexOf("openvms")>=0;  
-    }  
-      
-    /** 
-     * Get OS name 
-     * @return OS name 
-     */  
-    public static EPlatform getOSname(){  
-        if(isAix()){  
-            _instance.platform = EPlatform.AIX;  
-        }else if (isDigitalUnix()) {  
-            _instance.platform = EPlatform.Digital_Unix;  
-        }else if (isFreeBSD()) {  
-            _instance.platform = EPlatform.FreeBSD;  
-        }else if (isHPUX()) {  
-            _instance.platform = EPlatform.HP_UX;  
-        }else if (isIrix()) {  
-            _instance.platform = EPlatform.Irix;  
-        }else if (isLinux()) {  
-            _instance.platform = EPlatform.Linux;  
-        }else if (isMacOS()) {  
-            _instance.platform = EPlatform.Mac_OS;  
-        }else if (isMacOSX()) {  
-            _instance.platform = EPlatform.Mac_OS_X;  
-        }else if (isMPEiX()) {  
-            _instance.platform = EPlatform.MPEiX;  
-        }else if (isNetWare()) {  
-            _instance.platform = EPlatform.NetWare_411;  
-        }else if (isOpenVMS()) {  
-            _instance.platform = EPlatform.OpenVMS;  
-        }else if (isOS2()) {  
-            _instance.platform = EPlatform.OS2;  
-        }else if (isOS390()) {  
-            _instance.platform = EPlatform.OS390;  
-        }else if (isOSF1()) {  
-            _instance.platform = EPlatform.OSF1;  
-        }else if (isSolaris()) {  
-            _instance.platform = EPlatform.Solaris;  
-        }else if (isSunOS()) {  
-            _instance.platform = EPlatform.SunOS;  
-        }else if (isWindows()) {  
-            _instance.platform = EPlatform.Windows;  
-        }else{  
-            _instance.platform = EPlatform.Others;  
-        }  
-        return _instance.platform;  
-    }  
-    /** 
-     * @param args 
-     */  
-    public static void main(String[] args) {  
-        System.out.println( System.getProperty("os.name") );  
-        System.out.println( System.getProperty("os.version") );  
-        System.out.println( System.getProperty("os.arch") );  
-        
-        System.out.println(OSInfo.getOSname());  
-    }  
-  
-}  
+        return OS.indexOf("mac") >= 0 && OS.indexOf("os") > 0;
+    }
+
+    public static boolean isWindows() {
+        return OS.indexOf("windows") >= 0;
+    }
+
+    public static boolean isOS2() {
+        return OS.indexOf("os/2") >= 0;
+    }
+
+    public static boolean isSolaris() {
+        return OS.indexOf("solaris") >= 0;
+    }
+
+    public static boolean isSunOS() {
+        return OS.indexOf("sunos") >= 0;
+    }
+
+    public static boolean isMPEiX() {
+        return OS.indexOf("mpe/ix") >= 0;
+    }
+
+    public static boolean isHPUX() {
+        return OS.indexOf("hp-ux") >= 0;
+    }
+
+    public static boolean isAix() {
+        return OS.indexOf("aix") >= 0;
+    }
+
+    public static boolean isOS390() {
+        return OS.indexOf("os/390") >= 0;
+    }
+
+    public static boolean isFreeBSD() {
+        return OS.indexOf("freebsd") >= 0;
+    }
+
+    public static boolean isIrix() {
+        return OS.indexOf("irix") >= 0;
+    }
+
+    public static boolean isDigitalUnix() {
+        return OS.indexOf("digital") >= 0 && OS.indexOf("unix") > 0;
+    }
+
+    public static boolean isNetWare() {
+        return OS.indexOf("netware") >= 0;
+    }
+
+    public static boolean isOSF1() {
+        return OS.indexOf("osf1") >= 0;
+    }
+
+    public static boolean isOpenVMS() {
+        return OS.indexOf("openvms") >= 0;
+    }
+
+    /**
+     * Get OS name
+     * 
+     * @return OS name
+     */
+    public static EPlatform getOSname() {
+        if (isAix()) {
+            _instance.platform = EPlatform.AIX;
+        } else if (isDigitalUnix()) {
+            _instance.platform = EPlatform.Digital_Unix;
+        } else if (isFreeBSD()) {
+            _instance.platform = EPlatform.FreeBSD;
+        } else if (isHPUX()) {
+            _instance.platform = EPlatform.HP_UX;
+        } else if (isIrix()) {
+            _instance.platform = EPlatform.Irix;
+        } else if (isLinux()) {
+            _instance.platform = EPlatform.Linux;
+        } else if (isMacOS()) {
+            _instance.platform = EPlatform.Mac_OS;
+        } else if (isMacOSX()) {
+            _instance.platform = EPlatform.Mac_OS_X;
+        } else if (isMPEiX()) {
+            _instance.platform = EPlatform.MPEiX;
+        } else if (isNetWare()) {
+            _instance.platform = EPlatform.NetWare_411;
+        } else if (isOpenVMS()) {
+            _instance.platform = EPlatform.OpenVMS;
+        } else if (isOS2()) {
+            _instance.platform = EPlatform.OS2;
+        } else if (isOS390()) {
+            _instance.platform = EPlatform.OS390;
+        } else if (isOSF1()) {
+            _instance.platform = EPlatform.OSF1;
+        } else if (isSolaris()) {
+            _instance.platform = EPlatform.Solaris;
+        } else if (isSunOS()) {
+            _instance.platform = EPlatform.SunOS;
+        } else if (isWindows()) {
+            _instance.platform = EPlatform.Windows;
+        } else {
+            _instance.platform = EPlatform.Others;
+        }
+        return _instance.platform;
+    }
+
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+        System.out.println(System.getProperty("os.name"));
+        System.out.println(System.getProperty("os.version"));
+        System.out.println(System.getProperty("os.arch"));
+
+        System.out.println(OSInfo.getOSname());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java
index 13b1d98..30c2726 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java
@@ -39,8 +39,25 @@ public class OlderFileFilter implements FileFilter {
 
         long current_time = System.currentTimeMillis();
 
-        return (pathname.isFile() && (pathname.lastModified() + seconds * 1000 <= current_time))
-                || pathname.isDirectory();
+        return (pathname.lastModified() + seconds * 1000 <= current_time) ;
+    }
+    
+    
+    public static void main(String[] args) {
+    	long current_time = System.currentTimeMillis();
+    	String test = "test";
+    	
+    	
+    	File file = new File(test);
+    	file.delete();
+    	file.mkdir();
+    	file.setLastModified(current_time);
+    	
+    	JStormUtils.sleepMs(10 * 1000);
+
+    	File newFile = new File(test);
+    	System.out.println("modify time: " + newFile.lastModified() + ", raw:" + current_time);
+    	
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java
index 49d35d6..1bc8b56 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java
@@ -43,11 +43,11 @@ public class Pair<F, S> {
     }
 
     @Override
-    public String toString(){
+    public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("first:"+ first);
+        sb.append("first:" + first);
         sb.append(":");
-        sb.append("sencond:"+ second);
+        sb.append("sencond:" + second);
         return sb.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
index 939b81b..b3732dc 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
@@ -44,7 +44,7 @@ public class PathUtils {
      */
     public static List<String> tokenize_path(String path) {
         String[] toks = path.split(SEPERATOR);
-        java.util.ArrayList<String> rtn = new ArrayList<String>();
+        ArrayList<String> rtn = new ArrayList<String>();
         for (String str : toks) {
             if (!str.isEmpty()) {
                 rtn.add(str);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java
index e3be73f..20b9535 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java
@@ -20,8 +20,7 @@ package com.alibaba.jstorm.utils;
 import java.util.ArrayList;
 
 /**
- * Shuffle the Range, This class is used in shuffle grouping, it is better than
- * random, which can't make sure balance.
+ * Shuffle the Range, This class is used in shuffle grouping, it is better than random, which can't make sure balance.
  * 
  * @author yannian
  * 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java
index 454e987..877e1d6 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java
@@ -28,9 +28,8 @@ import java.util.concurrent.LinkedBlockingDeque;
 /**
  * RotatingMap must be used under thread-safe environment
  * 
- * Expires keys that have not been updated in the configured number of seconds.
- * The algorithm used will take between expirationSecs and expirationSecs * (1 +
- * 1 / (numBuckets-1)) to actually expire the message.
+ * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1
+ * / (numBuckets-1)) to actually expire the message.
  * 
  * get, put, remove, containsKey, and size take O(numBuckets) time to run.
  * 
@@ -45,8 +44,7 @@ public class RotatingMap<K, V> implements TimeOutMap<K, V> {
 
     private final Object lock = new Object();
 
-    public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback,
-            boolean isSingleThread) {
+    public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback, boolean isSingleThread) {
         if (numBuckets < 2) {
             throw new IllegalArgumentException("numBuckets must be >= 2");
         }
@@ -121,8 +119,7 @@ public class RotatingMap<K, V> implements TimeOutMap<K, V> {
     /**
      * Remove item from Rotate
      * 
-     * On the side of performance, scanning from header is faster On the side of
-     * logic, it should scan from the end to first.
+     * On the side of performance, scanning from header is faster On the side of logic, it should scan from the end to first.
      * 
      * @param key
      * @return

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java
index ba7547b..5bc2252 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java
@@ -25,20 +25,16 @@ import org.slf4j.LoggerFactory;
 
 public class SystemOperation {
 
-    public static final Logger LOG = LoggerFactory
-            .getLogger(SystemOperation.class);
+    public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
 
     public static boolean isRoot() throws IOException {
         String result = SystemOperation.exec("echo $EUID").substring(0, 1);
-        return Integer.valueOf(result.substring(0, result.length())).intValue() == 0 ? true
-                : false;
+        return Integer.valueOf(result.substring(0, result.length())).intValue() == 0 ? true : false;
     };
 
-    public static void mount(String name, String target, String type,
-            String data) throws IOException {
+    public static void mount(String name, String target, String type, String data) throws IOException {
         StringBuilder sb = new StringBuilder();
-        sb.append("mount -t ").append(type).append(" -o ").append(data)
-                .append(" ").append(name).append(" ").append(target);
+        sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target);
         SystemOperation.exec(sb.toString());
     }
 
@@ -50,9 +46,7 @@ public class SystemOperation {
 
     public static String exec(String cmd) throws IOException {
         LOG.debug("Shell cmd: " + cmd);
-        Process process =
-                new ProcessBuilder(new String[] { "/bin/bash", "-c", cmd })
-                        .start();
+        Process process = new ProcessBuilder(new String[] { "/bin/bash", "-c", cmd }).start();
         try {
             process.waitFor();
             String output = IOUtils.toString(process.getInputStream());

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java
index c55751c..5116c4f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java
@@ -17,34 +17,22 @@
  */
 package com.alibaba.jstorm.utils;
 
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.JavaObject;
-import backtype.storm.generated.JavaObjectArg;
-import backtype.storm.generated.NullStruct;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
 import backtype.storm.generated.StormTopology._Fields;
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.generated.TopologyInitialStatus;
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.task.IBolt;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.cluster.StormStatus;
 import com.alibaba.jstorm.daemon.nimbus.StatusType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Thrift utils
@@ -57,8 +45,7 @@ import com.alibaba.jstorm.daemon.nimbus.StatusType;
 public class Thrift {
     private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
 
-    public static StormStatus topologyInitialStatusToStormStatus(
-            TopologyInitialStatus tStatus) {
+    public static StormStatus topologyInitialStatusToStormStatus(TopologyInitialStatus tStatus) {
         if (tStatus.equals(TopologyInitialStatus.ACTIVE)) {
             return new StormStatus(StatusType.active);
         } else {
@@ -79,16 +66,13 @@ public class Thrift {
                 paraTypes[i] = Integer.class;
             } else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) {
                 paraTypes[i] = Long.class;
-            } else if (arg.getSetField().equals(
-                    JavaObjectArg._Fields.STRING_ARG)) {
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) {
                 paraTypes[i] = String.class;
             } else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) {
                 paraTypes[i] = Boolean.class;
-            } else if (arg.getSetField().equals(
-                    JavaObjectArg._Fields.BINARY_ARG)) {
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) {
                 paraTypes[i] = ByteBuffer.class;
-            } else if (arg.getSetField().equals(
-                    JavaObjectArg._Fields.DOUBLE_ARG)) {
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) {
                 paraTypes[i] = Double.class;
             } else {
                 paraTypes[i] = Object.class;
@@ -113,8 +97,7 @@ public class Thrift {
 
     public static List<String> fieldGrouping(Grouping grouping) {
         if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
-            throw new IllegalArgumentException(
-                    "Tried to get grouping fields from non fields grouping");
+            throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping");
         }
 
         return grouping.get_fields();
@@ -152,9 +135,11 @@ public class Thrift {
         return Grouping.direct(new NullStruct());
     }
 
-    private static ComponentCommon mkComponentcommon(
-            Map<GlobalStreamId, Grouping> inputs,
-            HashMap<String, StreamInfo> output_spec, Integer parallelism_hint) {
+    public static Grouping mkAllGrouping() {
+        return Grouping.all(new NullStruct());
+    }
+
+    private static ComponentCommon mkComponentcommon(Map<GlobalStreamId, Grouping> inputs, HashMap<String, StreamInfo> output_spec, Integer parallelism_hint) {
         ComponentCommon ret = new ComponentCommon(inputs, output_spec);
         if (parallelism_hint != null) {
             ret.set_parallelism_hint(parallelism_hint);
@@ -162,8 +147,7 @@ public class Thrift {
         return ret;
     }
 
-    public static Bolt mkBolt(Map<GlobalStreamId, Grouping> inputs, IBolt bolt,
-            HashMap<String, StreamInfo> output, Integer p) {
+    public static Bolt mkBolt(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, HashMap<String, StreamInfo> output, Integer p) {
         ComponentCommon common = mkComponentcommon(inputs, output, p);
         byte[] boltSer = Utils.serialize(bolt);
         ComponentObject component = ComponentObject.serialized_java(boltSer);
@@ -171,8 +155,7 @@ public class Thrift {
     }
 
     public static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null;
-    public static StormTopology._Fields[] SPOUT_FIELDS = {
-            StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
+    public static StormTopology._Fields[] SPOUT_FIELDS = { StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
     static {
         Set<_Fields> keys = StormTopology.metaDataMap.keySet();
         STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()];

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java
index c56e307..4d2ea0f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java
@@ -24,9 +24,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 /**
- * Expires keys that have not been updated in the configured number of seconds.
- * The algorithm used will take between expirationSecs and expirationSecs * (1 +
- * 1 / (numBuckets-1)) to actually expire the message.
+ * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1
+ * / (numBuckets-1)) to actually expire the message.
  * 
  * get, put, remove, containsKey, and size take O(numBuckets) time to run.
  * 
@@ -42,8 +41,7 @@ public class TimeCacheMap<K, V> implements TimeOutMap<K, V> {
     private Thread _cleaner;
     private ExpiredCallback _callback;
 
-    public TimeCacheMap(int expirationSecs, int numBuckets,
-            ExpiredCallback<K, V> callback) {
+    public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
         if (numBuckets < 2) {
             throw new IllegalArgumentException("numBuckets must be >= 2");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java
index 8468310..00e5cf3 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java
@@ -25,14 +25,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Expires keys that have not been updated in the configured number of seconds.
- * The algorithm used will take between expirationSecs and expirationSecs * (1 +
- * 1 / (numBuckets-1)) to actually expire the message.
+ * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1
+ * / (numBuckets-1)) to actually expire the message.
  * 
  * get, put, remove, containsKey, and size take O(numBuckets) time to run.
  * 
- * The advantage of this design is that the expiration thread only locks the
- * object for O(1) time, meaning the object is essentially always available for
+ * The advantage of this design is that the expiration thread only locks the object for O(1) time, meaning the object is essentially always available for
  * poll/offer
  */
 public class TimeCacheQueue<K> {
@@ -44,8 +42,7 @@ public class TimeCacheQueue<K> {
     }
 
     public static class DefaultExpiredCallback<K> implements ExpiredCallback<K> {
-        protected static final Logger LOG = LoggerFactory
-                .getLogger(TimeCacheQueue.DefaultExpiredCallback.class);
+        protected static final Logger LOG = LoggerFactory.getLogger(DefaultExpiredCallback.class);
 
         protected String queueName;
 
@@ -54,8 +51,7 @@ public class TimeCacheQueue<K> {
         }
 
         public void expire(K entry) {
-            LOG.info("TimeCacheQueue " + queueName + " entry:" + entry
-                    + ", timeout");
+            LOG.info("TimeCacheQueue " + queueName + " entry:" + entry + ", timeout");
         }
     }
 
@@ -65,8 +61,7 @@ public class TimeCacheQueue<K> {
     protected Thread _cleaner;
     protected ExpiredCallback _callback;
 
-    public TimeCacheQueue(int expirationSecs, int numBuckets,
-            ExpiredCallback<K> callback) {
+    public TimeCacheQueue(int expirationSecs, int numBuckets, ExpiredCallback<K> callback) {
         if (numBuckets < 2) {
             throw new IllegalArgumentException("numBuckets must be >= 2");
         }
@@ -130,8 +125,7 @@ public class TimeCacheQueue<K> {
 
     public K poll() {
         synchronized (_lock) {
-            Iterator<LinkedBlockingDeque<K>> itor =
-                    _buckets.descendingIterator();
+            Iterator<LinkedBlockingDeque<K>> itor = _buckets.descendingIterator();
             while (itor.hasNext()) {
                 LinkedBlockingDeque<K> bucket = itor.next();
                 K entry = bucket.poll();

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java
index a5c189f..fbae631 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java
@@ -40,29 +40,24 @@ public class TimeFormat {
 
     public static final long ONE_DAY_HOURS = 24;
 
-    public static final long ONE_MINUTE_MILLISECONDS = ONE_MINUTE_SECONDS
-            * ONE_SECOND_MILLISECONDS;
+    public static final long ONE_MINUTE_MILLISECONDS = ONE_MINUTE_SECONDS * ONE_SECOND_MILLISECONDS;
 
-    public static final long ONE_HOUR_MILLISECONDS = ONE_HOUR_MINUTES
-            * ONE_MINUTE_MILLISECONDS;
+    public static final long ONE_HOUR_MILLISECONDS = ONE_HOUR_MINUTES * ONE_MINUTE_MILLISECONDS;
 
-    public static final long ONE_DAY_MILLISECONDS = ONE_DAY_HOURS
-            * ONE_HOUR_MILLISECONDS;
+    public static final long ONE_DAY_MILLISECONDS = ONE_DAY_HOURS * ONE_HOUR_MILLISECONDS;
 
     public static Date convertDate(String dateStr, String format) {
         Date date = null;
         try {
             if (format != null) {
-                SimpleDateFormat simpleDateFormat =
-                        new SimpleDateFormat(format);
+                SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);
                 date = simpleDateFormat.parse(dateStr);
             } else {
                 date = new Date(dateStr);
             }
 
         } catch (Exception ex) {
-            log.error("Failed to convert " + dateStr + " to Date, format:"
-                    + format);
+            log.error("Failed to convert " + dateStr + " to Date, format:" + format);
             return null;
         }
         return date;
@@ -77,8 +72,7 @@ public class TimeFormat {
             ret = sdf.format(date);
 
         } catch (Exception e) {
-            log.error("Failed to convert " + date + " to String, format:"
-                    + format);
+            log.error("Failed to convert " + date + " to String, format:" + format);
             return null;
         }
         return ret;
@@ -207,12 +201,9 @@ public class TimeFormat {
         tomorrow.set(Calendar.MINUTE, 0);
         Date startTime = tomorrow.getTime();
 
-        long hourdiff =
-                (startTime.getTime() - current.getTime())
-                        / ONE_HOUR_MILLISECONDS;
+        long hourdiff = (startTime.getTime() - current.getTime()) / ONE_HOUR_MILLISECONDS;
 
-        System.out.println("Current:" + current + ", tomorrow" + startTime
-                + ", diff hour" + hourdiff);
+        System.out.println("Current:" + current + ", tomorrow" + startTime + ", diff hour" + hourdiff);
 
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java
index 8c9bd3d..9068731 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,19 +18,23 @@
 package com.alibaba.jstorm.utils;
 
 import backtype.storm.utils.Time;
+import com.alibaba.jstorm.metric.AsmWindow;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
 
 /**
  * Time utils
- * 
+ *
  * @author yannian
- * 
  */
 public class TimeUtils {
 
+    public static final long NS_PER_MS = 1000000L;
+
     /**
      * Take care of int overflow
-     * 
-     * @return
      */
     public static int current_time_secs() {
         return (int) (Time.currentTimeMillis() / 1000);
@@ -38,8 +42,6 @@ public class TimeUtils {
 
     /**
      * Take care of int overflow
-     * 
-     * @return
      */
     public static int time_delta(int time_secs) {
         return current_time_secs() - time_secs;
@@ -48,4 +50,91 @@ public class TimeUtils {
     public static long time_delta_ms(long time_ms) {
         return System.currentTimeMillis() - time_ms;
     }
+
+    public static final long NS_PER_US = 1000l;
+
+    public static final int ONE_SEC = 1;
+    public static final int SEC_PER_MIN = 60;
+    public static final int SEC_PER_DAY = 86400;
+
+    public static boolean isTimeAligned() {
+        return current_time_secs() % SEC_PER_DAY % SEC_PER_MIN == 0;
+    }
+
+    public static int secOffset() {
+        return current_time_secs() % SEC_PER_DAY % SEC_PER_MIN;
+    }
+
+    public static int secOffset(long ts) {
+        return (int) (ts % SEC_PER_DAY % SEC_PER_MIN);
+    }
+
+    public static int winSecOffset(long ts, int window) {
+        return (int) (ts / 1000 % SEC_PER_DAY % window);
+    }
+
+    public static long alignTimeToWin(long ts, int win) {
+        if (win != AsmWindow.D1_WINDOW) {
+            long curTimeSec = ts / 1000;
+            return (curTimeSec - curTimeSec % win) * 1000;
+        } else {
+            Calendar cal = Calendar.getInstance();
+            cal.setTimeInMillis(ts);
+            int year = cal.get(Calendar.YEAR);
+            int month = cal.get(Calendar.MONTH);
+            int day = cal.get(Calendar.DAY_OF_MONTH);
+            int hour = cal.get(Calendar.HOUR);
+            int min = cal.get(Calendar.MINUTE);
+            int sec = cal.get(Calendar.SECOND);
+            if (sec + min + hour > 0) {
+                cal.set(year, month, day + 1, 0, 0, 0);
+            }
+            return cal.getTimeInMillis();
+        }
+    }
+
+    public static long alignTimeToMin(long ts) {
+        return alignTimeToWin(ts, AsmWindow.M1_WINDOW);
+    }
+
+    public static String toTimeStr(Date time) {
+        int hour = time.getHours();
+        int min = time.getMinutes();
+        StringBuilder sb = new StringBuilder();
+        if (hour < 10) {
+            sb.append(0).append(hour);
+        } else {
+            sb.append(hour);
+        }
+        sb.append(":");
+        if (min < 10) {
+            sb.append(0).append(min);
+        } else {
+            sb.append(min);
+        }
+        return sb.toString();
+    }
+
+    public static String format(int curTimeSec) {
+        return format(new Date(curTimeSec * 1000L), "yyyy-MM-dd HH:mm:ss");
+    }
+
+    public static String format(Date time, String fmt) {
+        SimpleDateFormat df = new SimpleDateFormat(fmt);
+        return df.format(time);
+    }
+
+
+    public static void main(String[] args) throws Exception {
+        System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), AsmWindow.M1_WINDOW)));
+        System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), AsmWindow.M10_WINDOW)));
+        System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), AsmWindow.H2_WINDOW)));
+        System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), AsmWindow.D1_WINDOW)));
+
+        Calendar cal = Calendar.getInstance();
+        cal.set(2015, 6, 23, 0, 0, 0);
+        System.out.println(new Date(alignTimeToWin(cal.getTimeInMillis(), AsmWindow.D1_WINDOW)));
+
+        System.out.println(format(TimeUtils.current_time_secs()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java
index 09c25a5..eab0212 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java
@@ -32,8 +32,7 @@ public class ZkEventTypes {
         map.put(Watcher.Event.EventType.NodeCreated, ":node-created");
         map.put(Watcher.Event.EventType.NodeDeleted, ":node-deleted");
         map.put(Watcher.Event.EventType.NodeDataChanged, ":node-data-changed");
-        map.put(Watcher.Event.EventType.NodeChildrenChanged,
-                ":node-children-changed");
+        map.put(Watcher.Event.EventType.NodeChildrenChanged, ":node-children-changed");
 
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
index b726781..a098730 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
@@ -20,34 +20,44 @@ package com.alibaba.jstorm.zk;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.alibaba.jstorm.cluster.Cluster;
+import org.apache.log4j.Logger;
 
 import backtype.storm.Config;
 import backtype.storm.utils.Utils;
 
-import com.alibaba.jstorm.cluster.Cluster;
 import com.alibaba.jstorm.cluster.ClusterState;
 import com.alibaba.jstorm.cluster.DistributedClusterState;
 import com.google.common.collect.Maps;
 
 public class ZkTool {
-    private static Logger LOG = LoggerFactory.getLogger(ZkTool.class);
+    private static Logger LOG = Logger.getLogger(ZkTool.class);
 
     public static final String READ_CMD = "read";
 
     public static final String RM_CMD = "rm";
 
+    public static final String LIST_CMD = "list";
+
+    public static final String CLEAN_CMD = "clean";
+
     public static void usage() {
         LOG.info("Read ZK node's data, please do as following:");
         LOG.info(ZkTool.class.getName() + " read zkpath");
 
         LOG.info("\nDelete topology backup assignment, please do as following:");
         LOG.info(ZkTool.class.getName() + " rm topologyname");
+
+        LOG.info("\nlist subdirectory of zkPath , please do as following:");
+        LOG.info(ZkTool.class.getName() + " list zkpath");
+
+        LOG.info("\nDelete all nodes about a topologyId of zk , please do as following:");
+        LOG.info(ZkTool.class.getName() + " clean topologyId");
+
     }
 
     public static String getData(DistributedClusterState zkClusterState,
-            String path) throws Exception {
+                                 String path) throws Exception {
         byte[] data = zkClusterState.get_data(path, false);
         if (data == null || data.length == 0) {
             return null;
@@ -58,6 +68,135 @@ public class ZkTool {
         return obj.toString();
     }
 
+
+    public static void list(String path) {
+        DistributedClusterState zkClusterState = null;
+
+        try {
+            conf.put(Config.STORM_ZOOKEEPER_ROOT, "/");
+
+            zkClusterState = new DistributedClusterState(conf);
+
+            List<String>  children = zkClusterState.get_children(path, false);
+            if (children == null || children.isEmpty() ) {
+                LOG.info("No children of " + path);
+            }
+            else
+            {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Zk node children of " + path + "\n");
+                for (String str : children){
+                    sb.append(" " + str + ",");
+                }
+                sb.append("\n");
+                LOG.info(sb.toString());
+            }
+        } catch (Exception e) {
+            if (zkClusterState == null) {
+                LOG.error("Failed to connect ZK ", e);
+            } else {
+                LOG.error("Failed to list children of  " + path + "\n", e);
+            }
+        } finally {
+            if (zkClusterState != null) {
+                zkClusterState.close();
+            }
+        }
+    }
+    /**
+     * warnning! use this method cann't delete zkCache right now because of
+     *  new DistributedClusterState(conf)
+     */
+    public static void cleanTopology( String topologyId){
+        DistributedClusterState zkClusterState = null;
+        try {
+            zkClusterState = new DistributedClusterState(conf);
+            String rootDir = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
+            String assignmentPath = "/assignments/"+ topologyId;
+            String stormBase = "/topology/"+ topologyId;
+            String taskbeats = "/taskbeats/"+ topologyId;
+            String tasks = "/tasks/"+ topologyId;
+            String taskerrors = "/taskerrors/"+ topologyId;
+            String monitor = "/monitor/"+ topologyId;
+            if (zkClusterState.node_existed(assignmentPath, false)){
+                try {
+                    zkClusterState.delete_node(assignmentPath);
+                } catch (Exception e) {
+                    LOG.error("Could not remove assignments for " + topologyId, e);
+                }
+            }else {
+                LOG.info(" node of " + rootDir + assignmentPath + " isn't existed ");
+
+            }
+
+            if (zkClusterState.node_existed(stormBase, false)){
+                try {
+                    zkClusterState.delete_node(stormBase);
+                } catch (Exception e) {
+                    LOG.error("Failed to remove storm base for " + topologyId, e);
+                }
+            }else {
+                LOG.info(" node of " + rootDir + stormBase + " isn't existed ");
+
+            }
+
+            if (zkClusterState.node_existed(taskbeats, false)){
+                try {
+                    zkClusterState.delete_node(taskbeats);
+                } catch (Exception e) {
+                    LOG.error("Failed to remove taskbeats for " + topologyId, e);
+                }
+            }else {
+                LOG.info(" node of " + rootDir + taskbeats + " isn't existed ");
+
+            }
+
+            if (zkClusterState.node_existed(tasks, false)){
+                try {
+                    zkClusterState.delete_node(tasks);
+                } catch (Exception e) {
+                    LOG.error("Failed to remove tasks for " + topologyId, e);
+                }
+            }else {
+                LOG.info(" node of " + rootDir + tasks + " isn't existed ");
+
+            }
+
+            if (zkClusterState.node_existed(taskerrors, false)){
+                try {
+                    zkClusterState.delete_node(taskerrors);
+                } catch (Exception e) {
+                    LOG.error("Failed to remove taskerrors for " + topologyId, e);
+                }
+            }else {
+                LOG.info(" node of " + rootDir + taskerrors + " isn't existed ");
+
+            }
+
+            if (zkClusterState.node_existed(monitor, false)){
+                try {
+                    zkClusterState.delete_node(monitor);
+                } catch (Exception e) {
+                    LOG.error("Failed to remove monitor for " + topologyId, e);
+                }
+            }else {
+                LOG.info(" node of " + rootDir + monitor + " isn't existed ");
+
+            }
+        } catch (Exception e) {
+            if (zkClusterState == null) {
+                LOG.error("Failed to connect ZK ", e);
+            } else {
+                LOG.error("Failed to clean  topolodyId: " + topologyId + "\n", e);
+            }
+        } finally {
+            if (zkClusterState != null) {
+                zkClusterState.close();
+            }
+        }
+
+    }
+
     public static void readData(String path) {
 
         DistributedClusterState zkClusterState = null;
@@ -110,8 +249,7 @@ public class ZkTool {
                 if (tid.equals(topologyName)) {
                     LOG.info("Find backup " + topologyName);
 
-                    String topologyPath =
-                            Cluster.assignment_bak_path(topologyName);
+                    String topologyPath = assignment_bak_path(topologyName);
                     zkClusterState.delete_node(topologyPath);
 
                     LOG.info("Successfully delete topology " + topologyName
@@ -161,12 +299,21 @@ public class ZkTool {
 
         } else if (args[0].equalsIgnoreCase(RM_CMD)) {
             rmBakTopology(args[1]);
+        } else if (args[0].equalsIgnoreCase(LIST_CMD)) {
+            list(args[1]);
+        } else if (args[0].equalsIgnoreCase(CLEAN_CMD)) {
+            cleanTopology(args[1]);
         }
 
     }
 
     /*******************************************************************/
 
+    public static String assignment_bak_path(String id) {
+        return Cluster.ASSIGNMENTS_BAK_SUBTREE + Cluster.ZK_SEPERATOR
+                + id;
+    }
+
     @SuppressWarnings("rawtypes")
     public static ClusterState mk_distributed_cluster_state(Map _conf)
             throws Exception {
@@ -177,7 +324,8 @@ public class ZkTool {
             throws Exception {
         Map<String, String> ret = Maps.newHashMap();
         List<String> followers =
-                cluster_state.get_children(Cluster.NIMBUS_SLAVE_SUBTREE, false);
+                cluster_state.get_children(Cluster.NIMBUS_SLAVE_SUBTREE,
+                        false);
         if (followers == null || followers.size() == 0) {
             return ret;
         }