You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:56 UTC

[18/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java
deleted file mode 100755
index 63a725a..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.window;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.callback.Callback;
-import com.alibaba.jstorm.common.metric.operator.Sampling;
-import com.alibaba.jstorm.common.metric.operator.convert.Convertor;
-import com.alibaba.jstorm.common.metric.operator.merger.Merger;
-import com.alibaba.jstorm.common.metric.operator.updater.Updater;
-import com.alibaba.jstorm.utils.IntervalCheck;
-
-public class Metric<T, V> implements Sampling<Map<Integer, T>> {
-    private static final long serialVersionUID = -1362345159511508074L;
-    private static final Logger LOG = LoggerFactory.getLogger(Metric.class);
-
-    protected static boolean enable;
-
-    public static void setEnable(boolean e) {
-        enable = e;
-    }
-
-    protected List<RollingWindow<V>> rollingWindows;
-    protected AllWindow<V> allWindow;
-
-    protected int[] windowSeconds = { StatBuckets.MINUTE_WINDOW,
-            StatBuckets.HOUR_WINDOW, StatBuckets.DAY_WINDOW };
-    protected int bucketSize = StatBuckets.NUM_STAT_BUCKETS;
-    protected V defaultValue;
-    protected Updater<V> updater;
-    protected Merger<V> merger;
-    protected Convertor<V, T> convertor;
-    protected Callback callback;
-
-    protected int interval; // unit is second
-    protected IntervalCheck intervalCheck;
-    protected V unflushed;
-
-    public Metric() {
-    }
-
-    public int getInterval() {
-        if (windowSeconds == null || windowSeconds.length == 0) {
-            return StatBuckets.NUM_STAT_BUCKETS;
-        }
-
-        int intervals[] = new int[windowSeconds.length];
-        int smallest = Integer.MAX_VALUE;
-        for (int i = 0; i < windowSeconds.length; i++) {
-            int interval = windowSeconds[i] / bucketSize;
-            intervals[i] = interval;
-            if (interval < smallest) {
-                smallest = interval;
-            }
-        }
-
-        for (int goodInterval = smallest; goodInterval > 1; goodInterval--) {
-            boolean good = true;
-            for (int interval : intervals) {
-                if (interval % goodInterval != 0) {
-                    good = false;
-                    break;
-                }
-            }
-
-            if (good == true) {
-                return goodInterval;
-            }
-        }
-
-        return 1;
-    }
-
-    public void init() {
-        if (defaultValue == null || updater == null || merger == null
-                || convertor == null) {
-            throw new IllegalArgumentException("Invalid argements");
-        }
-
-        rollingWindows = new ArrayList<RollingWindow<V>>();
-        if (windowSeconds != null) {
-            rollingWindows.clear();
-            for (int windowSize : windowSeconds) {
-                RollingWindow<V> rollingWindow =
-                        new RollingWindow<V>(defaultValue, windowSize
-                                / bucketSize, windowSize, updater, merger);
-
-                rollingWindows.add(rollingWindow);
-            }
-
-        }
-        allWindow = new AllWindow<V>(defaultValue, updater, merger);
-
-        this.interval = getInterval();
-        this.intervalCheck = new IntervalCheck();
-        this.intervalCheck.setInterval(interval);
-    }
-
-    /**
-     * In order to improve performance 
-     * Do 
-     */
-    @Override
-    public void update(Number obj) {
-        if (enable == false) {
-            return;
-        }
-
-        if (intervalCheck.check()) {
-            flush();
-        }
-        synchronized (this) {
-            unflushed = updater.update(obj, unflushed);
-        }
-    }
-
-    public synchronized void flush() {
-        if (unflushed == null) {
-            return;
-        }
-        for (RollingWindow<V> rollingWindow : rollingWindows) {
-            rollingWindow.updateBatch(unflushed);
-        }
-        allWindow.updateBatch(unflushed);
-        unflushed = null;
-    }
-
-    @Override
-    public Map<Integer, T> getSnapshot() {
-        // TODO Auto-generated method stub
-        flush();
-
-        Map<Integer, T> ret = new TreeMap<Integer, T>();
-        for (RollingWindow<V> rollingWindow : rollingWindows) {
-            V value = rollingWindow.getSnapshot();
-
-            ret.put(rollingWindow.getWindowSecond(), convertor.convert(value));
-        }
-
-        ret.put(StatBuckets.ALL_TIME_WINDOW,
-                convertor.convert(allWindow.getSnapshot()));
-
-        if (callback != null) {
-            callback.execute(this);
-        }
-        return ret;
-    }
-
-    public T getAllTimeValue() {
-        return convertor.convert(allWindow.getSnapshot());
-    }
-
-    public int[] getWindowSeconds() {
-        return windowSeconds;
-    }
-
-    public void setWindowSeconds(int[] windowSeconds) {
-        this.windowSeconds = windowSeconds;
-    }
-
-    public int getBucketSize() {
-        return bucketSize;
-    }
-
-    public void setBucketSize(int bucketSize) {
-        this.bucketSize = bucketSize;
-    }
-
-    public V getDefaultValue() {
-        return defaultValue;
-    }
-
-    public void setDefaultValue(V defaultValue) {
-        this.defaultValue = defaultValue;
-    }
-
-    public Updater<V> getUpdater() {
-        return updater;
-    }
-
-    public void setUpdater(Updater<V> updater) {
-        this.updater = updater;
-    }
-
-    public Merger<V> getMerger() {
-        return merger;
-    }
-
-    public void setMerger(Merger<V> merger) {
-        this.merger = merger;
-    }
-
-    public Convertor<V, T> getConvertor() {
-        return convertor;
-    }
-
-    public void setConvertor(Convertor<V, T> convertor) {
-        this.convertor = convertor;
-    }
-
-    public Callback getCallback() {
-        return callback;
-    }
-
-    public void setCallback(Callback callback) {
-        this.callback = callback;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java
deleted file mode 100755
index 54047a6..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.window;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.common.metric.operator.Sampling;
-import com.alibaba.jstorm.common.metric.operator.StartTime;
-import com.alibaba.jstorm.common.metric.operator.merger.Merger;
-import com.alibaba.jstorm.common.metric.operator.updater.Updater;
-import com.alibaba.jstorm.utils.IntervalCheck;
-import com.alibaba.jstorm.utils.TimeUtils;
-
-public class RollingWindow<V> implements Sampling<V>, StartTime {
-    private static final long serialVersionUID = 3794478417380003279L;
-    private static final Logger LOG = LoggerFactory
-            .getLogger(RollingWindow.class);
-
-    protected long startTime;
-    protected Integer currBucketTime;
-    protected int interval; // unit is second
-    protected int windowSecond;
-    protected IntervalCheck intervalCheck;
-
-    protected TreeMap<Integer, V> buckets;
-    protected Integer bucketNum;
-    protected V unflushed;
-    protected V defaultValue;
-
-    protected Updater<V> updater;
-    protected Merger<V> merger;
-
-    RollingWindow(V defaultValue, int interval, int windowSecond,
-            Updater<V> updater, Merger<V> merger) {
-        this.startTime = System.currentTimeMillis();
-        this.interval = interval;
-        this.intervalCheck = new IntervalCheck();
-        this.intervalCheck.setInterval(interval);
-        this.currBucketTime = getCurrBucketTime();
-
-        this.bucketNum = windowSecond / interval;
-        this.windowSecond = (bucketNum) * interval;
-
-        this.buckets = new TreeMap<Integer, V>();
-
-        this.updater = updater;
-        this.merger = merger;
-
-        this.defaultValue = defaultValue;
-
-    }
-
-    
-    @Override
-    public void update(Number obj) {
-        // TODO Auto-generated method stub
-
-        if (intervalCheck.check()) {
-            rolling();
-        }
-        synchronized (this) {
-            unflushed = updater.update(obj, unflushed);
-
-        }
-
-    }
-    
-    /**
-     * In order to improve performance 
-     * Flush one batch to rollingWindow
-     * 
-     */
-    public void updateBatch(V batch) {
-
-        if (intervalCheck.check()) {
-            rolling();
-        }
-        synchronized (this) {
-            unflushed = updater.updateBatch(batch, unflushed);
-        }
-            
-    }
-
-    @Override
-    public V getSnapshot() {
-        // TODO Auto-generated method stub
-        if (intervalCheck.check()) {
-            rolling();
-        }
-
-        cleanExpiredBuckets();
-        // @@@ Testing
-        //LOG.info("Raw Data:" + buckets + ",unflushed:" + unflushed);
-
-        Collection<V> values = buckets.values();
-
-        V ret = merger.merge(values, unflushed, this);
-        if (ret == null) {
-
-            // @@@ testing
-            //LOG.warn("!!!!Exist null data !!!!!");
-            return defaultValue;
-        }
-        return ret;
-    }
-
-    /*
-     * Move the "current bucket time" index and clean the expired buckets
-     */
-    protected void rolling() {
-        synchronized (this) {
-            if (unflushed != null) {
-                buckets.put(currBucketTime, unflushed);
-                unflushed = null;
-            }
-        	
-        	currBucketTime = getCurrBucketTime();
-        	
-        	return ;
-        }
-    }
-
-    protected void cleanExpiredBuckets() {
-        int nowSec = TimeUtils.current_time_secs();
-        int startRemove = nowSec - (interval - 1) - windowSecond;
-
-        List<Integer> removeList = new ArrayList<Integer>();
-
-        for (Integer keyTime : buckets.keySet()) {
-            if (keyTime < startRemove) {
-                removeList.add(keyTime);
-            } else if (keyTime >= startRemove) {
-                break;
-            }
-        }
-
-        for (Integer removeKey : removeList) {
-            buckets.remove(removeKey);
-            // @@@ Testing
-            //LOG.info("Remove key:" + removeKey + ", diff:" + (nowSec - removeKey));
-
-        }
-
-        if (buckets.isEmpty() == false) {
-            Integer first = buckets.firstKey();
-            startTime = first.longValue() * 1000;
-        }
-    }
-
-    public int getWindowSecond() {
-        return windowSecond;
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public int getInterval() {
-        return interval;
-    }
-
-    public Integer getBucketNum() {
-        return bucketNum;
-    }
-
-    public V getDefaultValue() {
-        return defaultValue;
-    }
-
-    private Integer getCurrBucketTime() {
-        return (TimeUtils.current_time_secs() / interval) * interval;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java
deleted file mode 100755
index 3e9b021..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.window;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-public class StatBuckets {
-
-    public static final Integer NUM_STAT_BUCKETS = 20;
-
-    public static final Integer MINUTE_WINDOW = 600;
-    public static final Integer HOUR_WINDOW = 10800;
-    public static final Integer DAY_WINDOW = 86400;
-    public static final Integer ALL_TIME_WINDOW = 0;
-    public static Set<Integer> TIME_WINDOWS = new TreeSet<Integer>();
-    static {
-        TIME_WINDOWS.add(ALL_TIME_WINDOW);
-        TIME_WINDOWS.add(MINUTE_WINDOW);
-        TIME_WINDOWS.add(HOUR_WINDOW);
-        TIME_WINDOWS.add(DAY_WINDOW);
-    }
-
-    public static final String MINUTE_WINDOW_STR = "0d0h10m0s";
-    public static final String HOUR_WINDOW_STR = "0d3h0m0s";
-    public static final String DAY_WINDOW_STR = "1d0h0m0s";
-    public static final String ALL_WINDOW_STR = "All-time";
-
-    public static Integer[] STAT_BUCKETS = { MINUTE_WINDOW / NUM_STAT_BUCKETS,
-            HOUR_WINDOW / NUM_STAT_BUCKETS, DAY_WINDOW / NUM_STAT_BUCKETS };
-
-    private static final String[][] PRETTYSECDIVIDERS = {
-            new String[] { "s", "60" }, new String[] { "m", "60" },
-            new String[] { "h", "24" }, new String[] { "d", null } };
-
-    /**
-     * Service b
-     * 
-     * @param key
-     * @return
-     */
-    public static String parseTimeKey(Integer key) {
-        if (key == 0) {
-            return ALL_WINDOW_STR;
-        } else {
-            return String.valueOf(key);
-        }
-    }
-
-    /**
-     * 
-     * Default is the latest result
-     * 
-     * @param showKey
-     * @return
-     */
-    public static Integer getTimeKey(String showKey) {
-        Integer window = null;
-        if (showKey == null) {
-            window = (MINUTE_WINDOW);
-        } else if (showKey.equals(MINUTE_WINDOW_STR)) {
-            window = (MINUTE_WINDOW);
-        } else if (showKey.equals(HOUR_WINDOW_STR)) {
-            window = (HOUR_WINDOW);
-        } else if (showKey.equals(DAY_WINDOW_STR)) {
-            window = (DAY_WINDOW);
-        } else if (showKey.equals(ALL_WINDOW_STR)) {
-            window = ALL_TIME_WINDOW;
-        } else {
-            window = MINUTE_WINDOW;
-        }
-
-        return window;
-    }
-
-    /**
-     * Default is the latest result
-     * 
-     * @param showStr
-     * @return
-     */
-    public static String getShowTimeStr(Integer time) {
-        if (time == null) {
-            return MINUTE_WINDOW_STR;
-        } else if (time.equals(MINUTE_WINDOW)) {
-            return MINUTE_WINDOW_STR;
-        } else if (time.equals(HOUR_WINDOW)) {
-            return HOUR_WINDOW_STR;
-        } else if (time.equals(DAY_WINDOW)) {
-            return DAY_WINDOW_STR;
-        } else if (time.equals(ALL_TIME_WINDOW)) {
-            return ALL_WINDOW_STR;
-        } else {
-            return MINUTE_WINDOW_STR;
-        }
-
-    }
-
-    /**
-     * seconds to string like 1d20h30m40s
-     * 
-     * @param secs
-     * @return
-     */
-    public static String prettyUptimeStr(int secs) {
-        int diversize = PRETTYSECDIVIDERS.length;
-
-        List<String> tmp = new ArrayList<String>();
-        int div = secs;
-        for (int i = 0; i < diversize; i++) {
-            if (PRETTYSECDIVIDERS[i][1] != null) {
-                Integer d = Integer.parseInt(PRETTYSECDIVIDERS[i][1]);
-                tmp.add(div % d + PRETTYSECDIVIDERS[i][0]);
-                div = div / d;
-            } else {
-                tmp.add(div + PRETTYSECDIVIDERS[i][0]);
-            }
-        }
-
-        String rtn = "";
-        int tmpSzie = tmp.size();
-        for (int j = tmpSzie - 1; j > -1; j--) {
-            rtn += tmp.get(j);
-        }
-        return rtn;
-    }
-
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
deleted file mode 100755
index 2dbab6f..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.stats;
-
-public enum StaticsType {
-    emitted, send_tps, recv_tps, acked, failed, transferred, process_latencies;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java
index d9148db..15b1cfa 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java
@@ -100,9 +100,8 @@ public class CgroupCenter implements CgroupOperation {
                 SubSystemType type = SubSystemType.getSubSystem(split[0]);
                 if (type == null)
                     continue;
-                subSystems.add(new SubSystem(type, Integer.valueOf(split[1]),
-                        Integer.valueOf(split[2]), Integer.valueOf(split[3])
-                                .intValue() == 1 ? true : false));
+                subSystems.add(new SubSystem(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]), Integer.valueOf(split[3]).intValue() == 1 ? true
+                        : false));
             }
             return subSystems;
         } catch (Exception e) {
@@ -168,8 +167,7 @@ public class CgroupCenter implements CgroupOperation {
         if (!CgroupUtils.dirExists(hierarchy.getDir()))
             new File(hierarchy.getDir()).mkdirs();
         String subSystems = CgroupUtils.reAnalyse(subsystems);
-        SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup",
-                subSystems);
+        SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems);
 
     }
 
@@ -217,8 +215,7 @@ public class CgroupCenter implements CgroupOperation {
     }
 
     public static void main(String args[]) {
-        System.out.println(CgroupCenter.getInstance().getHierarchies().get(0)
-                .getRootCgroups().getChildren().size());
+        System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java
index 4de2d5a..0cc45cc 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java
@@ -82,8 +82,7 @@ public class CgroupUtils {
         return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE);
     }
 
-    public static List<String> readFileByLine(String fileDir)
-            throws IOException {
+    public static List<String> readFileByLine(String fileDir) throws IOException {
         List<String> result = new ArrayList<String>();
         FileReader fileReader = null;
         BufferedReader reader = null;
@@ -101,8 +100,7 @@ public class CgroupUtils {
         return result;
     }
 
-    public static void writeFileByLine(String fileDir, List<String> strings)
-            throws IOException {
+    public static void writeFileByLine(String fileDir, List<String> strings) throws IOException {
         FileWriter writer = null;
         BufferedWriter bw = null;
         try {
@@ -123,8 +121,7 @@ public class CgroupUtils {
         }
     }
 
-    public static void writeFileByLine(String fileDir, String string)
-            throws IOException {
+    public static void writeFileByLine(String fileDir, String string) throws IOException {
         FileWriter writer = null;
         BufferedWriter bw = null;
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java
index 1655e49..20d4ec0 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java
@@ -27,8 +27,7 @@ public class SubSystem {
 
     private boolean enable;
 
-    public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum,
-            boolean enable) {
+    public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum, boolean enable) {
         this.type = type;
         this.hierarchyID = hierarchyID;
         this.cgroupsNum = cgroupNum;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java
index 0a772f6..224d05d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java
@@ -59,9 +59,7 @@ public class CgroupCommon implements CgroupCommonOperation {
         this.parent = parent;
         this.dir = parent.getDir() + "/" + name;
         this.init();
-        cores =
-                CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(),
-                        this.dir);
+        cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir);
         this.isRoot = false;
     }
 
@@ -74,23 +72,19 @@ public class CgroupCommon implements CgroupCommonOperation {
         this.parent = null;
         this.dir = dir;
         this.init();
-        cores =
-                CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(),
-                        this.dir);
+        cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir);
         this.isRoot = true;
     }
 
     @Override
     public void addTask(int taskId) throws IOException {
         // TODO Auto-generated method stub
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS),
-                String.valueOf(taskId));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId));
     }
 
     @Override
     public Set<Integer> getTasks() throws IOException {
-        List<String> stringTasks =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
+        List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
         Set<Integer> tasks = new HashSet<Integer>();
         for (String task : stringTasks) {
             tasks.add(Integer.valueOf(task));
@@ -101,16 +95,13 @@ public class CgroupCommon implements CgroupCommonOperation {
     @Override
     public void addProcs(int pid) throws IOException {
         // TODO Auto-generated method stub
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS),
-                String.valueOf(pid));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid));
     }
 
     @Override
     public Set<Integer> getPids() throws IOException {
         // TODO Auto-generated method stub
-        List<String> stringPids =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        CGROUP_PROCS));
+        List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS));
         Set<Integer> pids = new HashSet<Integer>();
         for (String task : stringPids) {
             pids.add(Integer.valueOf(task));
@@ -121,16 +112,12 @@ public class CgroupCommon implements CgroupCommonOperation {
     @Override
     public void setNotifyOnRelease(boolean flag) throws IOException {
         // TODO Auto-generated method stub
-        CgroupUtils
-                .writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE),
-                        flag ? "1" : "0");
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0");
     }
 
     @Override
     public boolean getNotifyOnRelease() throws IOException {
-        return CgroupUtils
-                .readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE))
-                .get(0).equals("1") ? true : false;
+        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false;
     }
 
     @Override
@@ -138,16 +125,14 @@ public class CgroupCommon implements CgroupCommonOperation {
         // TODO Auto-generated method stub
         if (!this.isRoot)
             return;
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT),
-                command);
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command);
     }
 
     @Override
     public String getReleaseAgent() throws IOException {
         if (!this.isRoot)
             return null;
-        return CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, RELEASE_AGENT)).get(0);
+        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0);
     }
 
     @Override
@@ -155,21 +140,16 @@ public class CgroupCommon implements CgroupCommonOperation {
         // TODO Auto-generated method stub
         if (!this.cores.keySet().contains(SubSystemType.cpuset))
             return;
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir,
-                CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
     }
 
     @Override
     public boolean getCgroupCloneChildren() throws IOException {
-        return CgroupUtils
-                .readFileByLine(
-                        Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN))
-                .get(0).equals("1") ? true : false;
+        return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false;
     }
 
     @Override
-    public void setEventControl(String eventFd, String controlFd,
-            String... args) throws IOException {
+    public void setEventControl(String eventFd, String controlFd, String... args) throws IOException {
         // TODO Auto-generated method stub
         StringBuilder sb = new StringBuilder();
         sb.append(eventFd);
@@ -179,10 +159,7 @@ public class CgroupCommon implements CgroupCommonOperation {
             sb.append(' ');
             sb.append(arg);
         }
-        CgroupUtils
-                .writeFileByLine(
-                        Constants.getDir(this.dir, CGROUP_EVENT_CONTROL),
-                        sb.toString());
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString());
     }
 
     public Hierarchy getHierarchy() {
@@ -240,8 +217,7 @@ public class CgroupCommon implements CgroupCommonOperation {
             return;
         for (File child : files) {
             if (child.isDirectory()) {
-                this.children.add(new CgroupCommon(child.getName(),
-                        this.hierarchy, this));
+                this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java
index 3f9090f..a76b09c 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java
@@ -42,7 +42,6 @@ public interface CgroupCommonOperation {
 
     public boolean getCgroupCloneChildren() throws IOException;
 
-    public void setEventControl(String eventFd, String controlFd,
-            String... args) throws IOException;
+    public void setEventControl(String eventFd, String controlFd, String... args) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java
index 279366a..2b3f3a8 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java
@@ -35,10 +35,8 @@ import com.alibaba.jstorm.container.cgroup.core.NetPrioCore;
 
 public class CgroupCoreFactory {
 
-    public static Map<SubSystemType, CgroupCore> getInstance(
-            Set<SubSystemType> types, String dir) {
-        Map<SubSystemType, CgroupCore> result =
-                new HashMap<SubSystemType, CgroupCore>();
+    public static Map<SubSystemType, CgroupCore> getInstance(Set<SubSystemType> types, String dir) {
+        Map<SubSystemType, CgroupCore> result = new HashMap<SubSystemType, CgroupCore>();
         for (SubSystemType type : types) {
             switch (type) {
             case blkio:

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java
index 5d487ec..9958114 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java
@@ -33,25 +33,18 @@ public class BlkioCore implements CgroupCore {
     public static final String BLKIO_WEIGHT_DEVICE = "/blkio.weight_device";
     public static final String BLKIO_RESET_STATS = "/blkio.reset_stats";
 
-    public static final String BLKIO_THROTTLE_READ_BPS_DEVICE =
-            "/blkio.throttle.read_bps_device";
-    public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE =
-            "/blkio.throttle.write_bps_device";
-    public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE =
-            "/blkio.throttle.read_iops_device";
-    public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE =
-            "/blkio.throttle.write_iops_device";
-
-    public static final String BLKIO_THROTTLE_IO_SERVICED =
-            "/blkio.throttle.io_serviced";
-    public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES =
-            "/blkio.throttle.io_service_bytes";
+    public static final String BLKIO_THROTTLE_READ_BPS_DEVICE = "/blkio.throttle.read_bps_device";
+    public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE = "/blkio.throttle.write_bps_device";
+    public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE = "/blkio.throttle.read_iops_device";
+    public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE = "/blkio.throttle.write_iops_device";
+
+    public static final String BLKIO_THROTTLE_IO_SERVICED = "/blkio.throttle.io_serviced";
+    public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES = "/blkio.throttle.io_service_bytes";
 
     public static final String BLKIO_TIME = "/blkio.time";
     public static final String BLKIO_SECTORS = "/blkio.sectors";
     public static final String BLKIO_IO_SERVICED = "/blkio.io_serviced";
-    public static final String BLKIO_IO_SERVICE_BYTES =
-            "/blkio.io_service_bytes";
+    public static final String BLKIO_IO_SERVICE_BYTES = "/blkio.io_service_bytes";
     public static final String BLKIO_IO_SERVICE_TIME = "/blkio.io_service_time";
     public static final String BLKIO_IO_WAIT_TIME = "/blkio.io_wait_time";
     public static final String BLKIO_IO_MERGED = "/blkio.io_merged";
@@ -71,28 +64,19 @@ public class BlkioCore implements CgroupCore {
 
     /* weight: 100-1000 */
     public void setBlkioWeight(int weight) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT),
-                String.valueOf(weight));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT), String.valueOf(weight));
     }
 
     public int getBlkioWeight() throws IOException {
-        return Integer.valueOf(
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, BLKIO_WEIGHT)).get(0))
-                .intValue();
+        return Integer.valueOf(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT)).get(0)).intValue();
     }
 
-    public void setBlkioWeightDevice(Device device, int weight)
-            throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE),
-                makeContext(device, weight));
+    public void setBlkioWeightDevice(Device device, int weight) throws IOException {
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE), makeContext(device, weight));
     }
 
     public Map<Device, Integer> getBlkioWeightDevice() throws IOException {
-        List<String> strings =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        BLKIO_WEIGHT_DEVICE));
+        List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE));
         Map<Device, Integer> result = new HashMap<Device, Integer>();
         for (String string : strings) {
             String[] strArgs = string.split(" ");
@@ -104,15 +88,11 @@ public class BlkioCore implements CgroupCore {
     }
 
     public void setReadBps(Device device, long bps) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE),
-                makeContext(device, bps));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps));
     }
 
     public Map<Device, Long> getReadBps() throws IOException {
-        List<String> strings =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        BLKIO_THROTTLE_READ_BPS_DEVICE));
+        List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE));
         Map<Device, Long> result = new HashMap<Device, Long>();
         for (String string : strings) {
             String[] strArgs = string.split(" ");
@@ -124,15 +104,11 @@ public class BlkioCore implements CgroupCore {
     }
 
     public void setWriteBps(Device device, long bps) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE),
-                makeContext(device, bps));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps));
     }
 
     public Map<Device, Long> getWriteBps() throws IOException {
-        List<String> strings =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        BLKIO_THROTTLE_WRITE_BPS_DEVICE));
+        List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE));
         Map<Device, Long> result = new HashMap<Device, Long>();
         for (String string : strings) {
             String[] strArgs = string.split(" ");
@@ -144,15 +120,11 @@ public class BlkioCore implements CgroupCore {
     }
 
     public void setReadIOps(Device device, long iops) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE),
-                makeContext(device, iops));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops));
     }
 
     public Map<Device, Long> getReadIOps() throws IOException {
-        List<String> strings =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        BLKIO_THROTTLE_READ_IOPS_DEVICE));
+        List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE));
         Map<Device, Long> result = new HashMap<Device, Long>();
         for (String string : strings) {
             String[] strArgs = string.split(" ");
@@ -164,15 +136,11 @@ public class BlkioCore implements CgroupCore {
     }
 
     public void setWriteIOps(Device device, long iops) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE),
-                makeContext(device, iops));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops));
     }
 
     public Map<Device, Long> getWriteIOps() throws IOException {
-        List<String> strings =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        BLKIO_THROTTLE_WRITE_IOPS_DEVICE));
+        List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE));
         Map<Device, Long> result = new HashMap<Device, Long>();
         for (String string : strings) {
             String[] strArgs = string.split(" ");
@@ -183,23 +151,17 @@ public class BlkioCore implements CgroupCore {
         return result;
     }
 
-    public Map<Device, Map<RecordType, Long>> getThrottleIOServiced()
-            throws IOException {
-        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
-                this.dir, BLKIO_THROTTLE_IO_SERVICED)));
+    public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws IOException {
+        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICED)));
     }
 
-    public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte()
-            throws IOException {
-        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
-                this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES)));
+    public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() throws IOException {
+        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES)));
     }
 
     public Map<Device, Long> getBlkioTime() throws IOException {
         Map<Device, Long> result = new HashMap<Device, Long>();
-        List<String> strs =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        BLKIO_TIME));
+        List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_TIME));
         for (String str : strs) {
             String[] strArgs = str.split(" ");
             result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1]));
@@ -209,9 +171,7 @@ public class BlkioCore implements CgroupCore {
 
     public Map<Device, Long> getBlkioSectors() throws IOException {
         Map<Device, Long> result = new HashMap<Device, Long>();
-        List<String> strs =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        BLKIO_SECTORS));
+        List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_SECTORS));
         for (String str : strs) {
             String[] strArgs = str.split(" ");
             result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1]));
@@ -219,43 +179,32 @@ public class BlkioCore implements CgroupCore {
         return result;
     }
 
-    public Map<Device, Map<RecordType, Long>> getIOServiced()
-            throws IOException {
-        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
-                this.dir, BLKIO_IO_SERVICED)));
+    public Map<Device, Map<RecordType, Long>> getIOServiced() throws IOException {
+        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICED)));
     }
 
-    public Map<Device, Map<RecordType, Long>> getIOServiceBytes()
-            throws IOException {
-        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
-                this.dir, BLKIO_IO_SERVICE_BYTES)));
+    public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws IOException {
+        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_BYTES)));
     }
 
-    public Map<Device, Map<RecordType, Long>> getIOServiceTime()
-            throws IOException {
-        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
-                this.dir, BLKIO_IO_SERVICE_TIME)));
+    public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws IOException {
+        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_TIME)));
     }
 
-    public Map<Device, Map<RecordType, Long>> getIOWaitTime()
-            throws IOException {
-        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
-                this.dir, BLKIO_IO_WAIT_TIME)));
+    public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws IOException {
+        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_WAIT_TIME)));
     }
 
     public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException {
-        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
-                this.dir, BLKIO_IO_MERGED)));
+        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_MERGED)));
     }
 
     public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException {
-        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
-                this.dir, BLKIO_IO_QUEUED)));
+        return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_QUEUED)));
     }
 
     public void resetStats() throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, BLKIO_RESET_STATS), "1");
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_RESET_STATS), "1");
     }
 
     private String makeContext(Device device, Object data) {
@@ -265,8 +214,7 @@ public class BlkioCore implements CgroupCore {
     }
 
     private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> strs) {
-        Map<Device, Map<RecordType, Long>> result =
-                new HashMap<Device, Map<RecordType, Long>>();
+        Map<Device, Map<RecordType, Long>> result = new HashMap<Device, Map<RecordType, Long>>();
         for (String str : strs) {
             String[] strArgs = str.split(" ");
             if (strArgs.length != 3)

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java
index 609898e..6a723a0 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java
@@ -46,62 +46,47 @@ public class CpuCore implements CgroupCore {
     }
 
     public void setCpuShares(int weight) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES),
-                String.valueOf(weight));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight));
     }
 
     public int getCpuShares() throws IOException {
-        return Integer.parseInt(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, CPU_SHARES)).get(0));
+        return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0));
     }
 
     public void setCpuRtRuntimeUs(long us) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPU_RT_RUNTIME_US),
-                String.valueOf(us));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US), String.valueOf(us));
     }
 
     public long getCpuRtRuntimeUs() throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0));
     }
 
     public void setCpuRtPeriodUs(long us) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPU_RT_PERIOD_US),
-                String.valueOf(us));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US), String.valueOf(us));
     }
 
     public Long getCpuRtPeriodUs() throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, CPU_RT_PERIOD_US)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US)).get(0));
     }
 
     public void setCpuCfsPeriodUs(long us) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPU_CFS_PERIOD_US),
-                String.valueOf(us));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us));
     }
 
     public Long getCpuCfsPeriodUs(long us) throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0));
     }
 
     public void setCpuCfsQuotaUs(long us) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPU_CFS_QUOTA_US),
-                String.valueOf(us));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us));
     }
 
     public Long getCpuCfsQuotaUs(long us) throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0));
     }
 
     public Stat getCpuStat() throws IOException {
-        return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                CPU_STAT)));
+        return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_STAT)));
     }
 
     public static class Stat {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java
index c54421b..8bec196 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java
@@ -45,14 +45,11 @@ public class CpuacctCore implements CgroupCore {
     }
 
     public Long getCpuUsage() throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, CPUACCT_USAGE)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE)).get(0));
     }
 
     public Map<StatType, Long> getCpuStat() throws IOException {
-        List<String> strs =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        CPUACCT_STAT));
+        List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_STAT));
         Map<StatType, Long> result = new HashMap<StatType, Long>();
         result.put(StatType.user, Long.parseLong(strs.get(0).split(" ")[1]));
         result.put(StatType.system, Long.parseLong(strs.get(1).split(" ")[1]));
@@ -60,10 +57,7 @@ public class CpuacctCore implements CgroupCore {
     }
 
     public Long[] getPerCpuUsage() throws IOException {
-        String str =
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU))
-                        .get(0);
+        String str = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU)).get(0);
         String[] strArgs = str.split(" ");
         Long[] result = new Long[strArgs.length];
         for (int i = 0; i < result.length; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java
index d693b6c..02bcace 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java
@@ -32,18 +32,12 @@ public class CpusetCore implements CgroupCore {
     public static final String CPUSET_CPU_EXCLUSIVE = "/cpuset.cpu_exclusive";
     public static final String CPUSET_MEM_EXCLUSIVE = "/cpuset.mem_exclusive";
     public static final String CPUSET_MEM_HARDWALL = "/cpuset.mem_hardwall";
-    public static final String CPUSET_MEMORY_PRESSURE =
-            "/cpuset.memory_pressure";
-    public static final String CPUSET_MEMORY_PRESSURE_ENABLED =
-            "/cpuset.memory_pressure_enabled";
-    public static final String CPUSET_MEMORY_SPREAD_PAGE =
-            "/cpuset.memory_spread_page";
-    public static final String CPUSET_MEMORY_SPREAD_SLAB =
-            "/cpuset.memory_spread_slab";
-    public static final String CPUSET_SCHED_LOAD_BALANCE =
-            "/cpuset.sched_load_balance";
-    public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL =
-            "/cpuset.sched_relax_domain_level";
+    public static final String CPUSET_MEMORY_PRESSURE = "/cpuset.memory_pressure";
+    public static final String CPUSET_MEMORY_PRESSURE_ENABLED = "/cpuset.memory_pressure_enabled";
+    public static final String CPUSET_MEMORY_SPREAD_PAGE = "/cpuset.memory_spread_page";
+    public static final String CPUSET_MEMORY_SPREAD_SLAB = "/cpuset.memory_spread_slab";
+    public static final String CPUSET_SCHED_LOAD_BALANCE = "/cpuset.sched_load_balance";
+    public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL = "/cpuset.sched_relax_domain_level";
 
     private final String dir;
 
@@ -64,14 +58,11 @@ public class CpusetCore implements CgroupCore {
             sb.append(',');
         }
         sb.deleteCharAt(sb.length() - 1);
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPUS),
-                sb.toString());
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPUS), sb.toString());
     }
 
     public int[] getCpus() throws IOException {
-        String output =
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_CPUS)).get(0);
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPUS)).get(0);
         return parseNums(output);
     }
 
@@ -82,147 +73,97 @@ public class CpusetCore implements CgroupCore {
             sb.append(',');
         }
         sb.deleteCharAt(sb.length() - 1);
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMS),
-                sb.toString());
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMS), sb.toString());
     }
 
     public int[] getMems() throws IOException {
-        String output =
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_MEMS)).get(0);
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMS)).get(0);
         return parseNums(output);
     }
 
     public void setMemMigrate(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isMemMigrate() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get(
-                        0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get(0));
         return output > 0;
     }
 
     public void setCpuExclusive(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isCpuExclusive() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE))
-                        .get(0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE)).get(0));
         return output > 0;
     }
 
     public void setMemExclusive(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isMemExclusive() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE))
-                        .get(0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE)).get(0));
         return output > 0;
     }
 
     public void setMemHardwall(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_MEM_HARDWALL),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isMemHardwall() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0));
         return output > 0;
     }
 
     public int getMemPressure() throws IOException {
-        String output =
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE))
-                        .get(0);
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE)).get(0);
         return Integer.parseInt(output);
     }
 
     public void setMemPressureEnabled(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isMemPressureEnabled() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir,
-                                CPUSET_MEMORY_PRESSURE_ENABLED)).get(0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED)).get(0));
         return output > 0;
     }
 
     public void setMemSpreadPage(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isMemSpreadPage() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE))
-                        .get(0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE)).get(0));
         return output > 0;
     }
 
     public void setMemSpreadSlab(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isMemSpreadSlab() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB))
-                        .get(0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB)).get(0));
         return output > 0;
     }
 
     public void setSchedLoadBlance(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isSchedLoadBlance() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE))
-                        .get(0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE)).get(0));
         return output > 0;
     }
 
     public void setSchedRelaxDomainLevel(int value) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL),
-                String.valueOf(value));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL), String.valueOf(value));
     }
 
     public int getSchedRelaxDomainLevel() throws IOException {
-        String output =
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir,
-                                CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0);
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0);
         return Integer.parseInt(output);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java
index 1832668..491fc8f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java
@@ -110,8 +110,7 @@ public class DevicesCore implements CgroupCore {
             final int prime = 31;
             int result = 1;
             result = prime * result + accesses;
-            result =
-                    prime * result + ((device == null) ? 0 : device.hashCode());
+            result = prime * result + ((device == null) ? 0 : device.hashCode());
             result = prime * result + type;
             return result;
         }
@@ -161,27 +160,21 @@ public class DevicesCore implements CgroupCore {
         }
     }
 
-    private void setPermission(String prop, char type, Device device,
-            int accesses) throws IOException {
+    private void setPermission(String prop, char type, Device device, int accesses) throws IOException {
         Record record = new Record(type, device, accesses);
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop),
-                record.toString());
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop), record.toString());
     }
 
-    public void setAllow(char type, Device device, int accesses)
-            throws IOException {
+    public void setAllow(char type, Device device, int accesses) throws IOException {
         setPermission(DEVICES_ALLOW, type, device, accesses);
     }
 
-    public void setDeny(char type, Device device, int accesses)
-            throws IOException {
+    public void setDeny(char type, Device device, int accesses) throws IOException {
         setPermission(DEVICES_DENY, type, device, accesses);
     }
 
     public Record[] getList() throws IOException {
-        List<String> output =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        DEVICES_LIST));
+        List<String> output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, DEVICES_LIST));
         return Record.parseRecordList(output);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java
index c601c3e..e0ad3da 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java
@@ -40,13 +40,11 @@ public class FreezerCore implements CgroupCore {
     }
 
     public void setState(State state) throws IOException {
-        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, FREEZER_STATE),
-                state.name().toUpperCase());
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, FREEZER_STATE), state.name().toUpperCase());
     }
 
     public State getState() throws IOException {
-        return State.getStateValue(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, FREEZER_STATE)).get(0));
+        return State.getStateValue(CgroupUtils.readFileByLine(Constants.getDir(this.dir, FREEZER_STATE)).get(0));
     }
 
     public enum State {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java
index a2db78c..1b37bd3 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java
@@ -27,15 +27,11 @@ public class MemoryCore implements CgroupCore {
 
     public static final String MEMORY_STAT = "/memory.stat";
     public static final String MEMORY_USAGE_IN_BYTES = "/memory.usage_in_bytes";
-    public static final String MEMORY_MEMSW_USAGE_IN_BYTES =
-            "/memory.memsw.usage_in_bytes";
-    public static final String MEMORY_MAX_USAGE_IN_BYTES =
-            "/memory.max_usage_in_bytes";
-    public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES =
-            "/memory.memsw.max_usage_in_bytes";
+    public static final String MEMORY_MEMSW_USAGE_IN_BYTES = "/memory.memsw.usage_in_bytes";
+    public static final String MEMORY_MAX_USAGE_IN_BYTES = "/memory.max_usage_in_bytes";
+    public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES = "/memory.memsw.max_usage_in_bytes";
     public static final String MEMORY_LIMIT_IN_BYTES = "/memory.limit_in_bytes";
-    public static final String MEMORY_MEMSW_LIMIT_IN_BYTES =
-            "/memory.memsw.limit_in_bytes";
+    public static final String MEMORY_MEMSW_LIMIT_IN_BYTES = "/memory.memsw.limit_in_bytes";
     public static final String MEMORY_FAILCNT = "/memory.failcnt";
     public static final String MEMORY_MEMSW_FAILCNT = "/memory.memsw.failcnt";
     public static final String MEMORY_FORCE_EMPTY = "/memory.force_empty";
@@ -115,113 +111,78 @@ public class MemoryCore implements CgroupCore {
     }
 
     public Stat getStat() throws IOException {
-        String output =
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, MEMORY_STAT)).get(0);
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_STAT)).get(0);
         Stat stat = new Stat(output);
         return stat;
     }
 
     public long getPhysicalUsage() throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0));
     }
 
     public long getWithSwapUsage() throws IOException {
-        return Long
-                .parseLong(CgroupUtils
-                        .readFileByLine(
-                                Constants.getDir(this.dir,
-                                        MEMORY_MEMSW_USAGE_IN_BYTES)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_USAGE_IN_BYTES)).get(0));
     }
 
     public long getMaxPhysicalUsage() throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0));
     }
 
     public long getMaxWithSwapUsage() throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES))
-                .get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES)).get(0));
     }
 
     public void setPhysicalUsageLimit(long value) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES),
-                String.valueOf(value));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES), String.valueOf(value));
     }
 
     public long getPhysicalUsageLimit() throws IOException {
-        return Long.parseLong(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0));
     }
 
     public void setWithSwapUsageLimit(long value) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES),
-                String.valueOf(value));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES), String.valueOf(value));
     }
 
     public long getWithSwapUsageLimit() throws IOException {
-        return Long
-                .parseLong(CgroupUtils
-                        .readFileByLine(
-                                Constants.getDir(this.dir,
-                                        MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0));
+        return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0));
     }
 
     public int getPhysicalFailCount() throws IOException {
-        return Integer.parseInt(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, MEMORY_FAILCNT)).get(0));
+        return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_FAILCNT)).get(0));
     }
 
     public int getWithSwapFailCount() throws IOException {
-        return Integer.parseInt(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0));
+        return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0));
     }
 
     public void clearForceEmpty() throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, MEMORY_FORCE_EMPTY),
-                String.valueOf(0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_FORCE_EMPTY), String.valueOf(0));
     }
 
     public void setSwappiness(int value) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, MEMORY_SWAPPINESS),
-                String.valueOf(value));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS), String.valueOf(value));
     }
 
     public int getSwappiness() throws IOException {
-        return Integer.parseInt(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, MEMORY_SWAPPINESS)).get(0));
+        return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS)).get(0));
     }
 
     public void setUseHierarchy(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, MEMORY_USE_HIERARCHY),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isUseHierarchy() throws IOException {
-        int output =
-                Integer.parseInt(CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, MEMORY_USE_HIERARCHY))
-                        .get(0));
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY)).get(0));
         return output > 0;
     }
 
     public void setOomControl(boolean flag) throws IOException {
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, MEMORY_OOM_CONTROL),
-                String.valueOf(flag ? 1 : 0));
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL), String.valueOf(flag ? 1 : 0));
     }
 
     public boolean isOomControl() throws IOException {
-        String output =
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0);
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0);
         output = output.split("\n")[0].split("[\\s]")[1];
         int value = Integer.parseInt(output);
         return value > 0;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java
index dd80c0a..e7c376d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java
@@ -58,14 +58,11 @@ public class NetClsCore implements CgroupCore {
         StringBuilder sb = new StringBuilder("0x");
         sb.append(toHex(major));
         sb.append(toHex(minor));
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
     }
 
     public Device getClassId() throws IOException {
-        String output =
-                CgroupUtils.readFileByLine(
-                        Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0);
+        String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0);
         output = Integer.toHexString(Integer.parseInt(output));
         int major = Integer.parseInt(output.substring(0, output.length() - 4));
         int minor = Integer.parseInt(output.substring(output.length() - 4));

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java
index fd7e899..6b9b344 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java
@@ -44,8 +44,7 @@ public class NetPrioCore implements CgroupCore {
     }
 
     public int getPrioId() throws IOException {
-        return Integer.parseInt(CgroupUtils.readFileByLine(
-                Constants.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0));
+        return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0));
     }
 
     public void setIfPrioMap(String iface, int priority) throws IOException {
@@ -53,15 +52,12 @@ public class NetPrioCore implements CgroupCore {
         sb.append(iface);
         sb.append(' ');
         sb.append(priority);
-        CgroupUtils.writeFileByLine(
-                Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), sb.toString());
+        CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), sb.toString());
     }
 
     public Map<String, Integer> getIfPrioMap() throws IOException {
         Map<String, Integer> result = new HashMap<String, Integer>();
-        List<String> strs =
-                CgroupUtils.readFileByLine(Constants.getDir(this.dir,
-                        NET_PRIO_IFPRIOMAP));
+        List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP));
         for (String str : strs) {
             String[] strArgs = str.split(" ");
             result.put(strArgs[0], Integer.valueOf(strArgs[1]));

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java
index 6c2bd21..f788996 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java
@@ -37,9 +37,8 @@ public class DefaultInimbus implements INimbus {
     }
 
     @Override
-    public Collection<WorkerSlot> allSlotsAvailableForScheduling(
-            Collection<SupervisorDetails> existingSupervisors,
-            Topologies topologies, Set<String> topologiesMissingAssignments) {
+    public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies,
+            Set<String> topologiesMissingAssignments) {
         // TODO Auto-generated method stub
         Collection<WorkerSlot> result = new HashSet<WorkerSlot>();
         for (SupervisorDetails detail : existingSupervisors) {
@@ -50,15 +49,13 @@ public class DefaultInimbus implements INimbus {
     }
 
     @Override
-    public void assignSlots(Topologies topologies,
-            Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
+    public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
         // TODO Auto-generated method stub
 
     }
 
     @Override
-    public String getHostName(
-            Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
+    public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
         // TODO Auto-generated method stub
         return null;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java
index 3858595..97dd079 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java
@@ -17,104 +17,87 @@
  */
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.cache.JStormCache;
 import com.alibaba.jstorm.cache.RocksDBCache;
 import com.alibaba.jstorm.cache.TimeoutMemCache;
-import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.StormBase;
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.task.TaskInfo;
-import com.alibaba.jstorm.task.error.TaskError;
 import com.alibaba.jstorm.utils.OSInfo;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
-public class NimbusCache{
-    private static final long serialVersionUID = 1685576554130463610L;
-    
+public class NimbusCache {
     private static final Logger LOG = LoggerFactory.getLogger(NimbusCache.class);
-    
-    
+
     public static final String TIMEOUT_MEM_CACHE_CLASS = TimeoutMemCache.class.getName();
     public static final String ROCKS_DB_CACHE_CLASS = RocksDBCache.class.getName();
-    
+
     protected JStormCache memCache;
     protected JStormCache dbCache;
     protected StormClusterState zkCluster;
-    
+
     public String getNimbusCacheClass(Map conf) {
         boolean isLinux = OSInfo.isLinux();
         boolean isMac = OSInfo.isMac();
         boolean isLocal = StormConfig.local_mode(conf);
-        
+
         if (isLocal == true) {
             return TIMEOUT_MEM_CACHE_CLASS;
         }
-        
+
         if (isLinux == false && isMac == false) {
             return TIMEOUT_MEM_CACHE_CLASS;
         }
-        
+
         String nimbusCacheClass = ConfigExtension.getNimbusCacheClass(conf);
         if (StringUtils.isBlank(nimbusCacheClass) == false) {
             return nimbusCacheClass;
         }
-        
+
         return ROCKS_DB_CACHE_CLASS;
-        
+
     }
-    
+
     public NimbusCache(Map conf, StormClusterState zkCluster) {
         super();
-        
+
         String dbCacheClass = getNimbusCacheClass(conf);
         LOG.info("NimbusCache db Cache will use {}", dbCacheClass);
-        
+
         try {
-            dbCache = (JStormCache)Utils.newInstance(dbCacheClass);
-            
+            dbCache = (JStormCache) Utils.newInstance(dbCacheClass);
+
             String dbDir = StormConfig.masterDbDir(conf);
             conf.put(RocksDBCache.ROCKSDB_ROOT_DIR, dbDir);
-            
+
             conf.put(RocksDBCache.ROCKSDB_RESET, ConfigExtension.getNimbusCacheReset(conf));
-            
+
             dbCache.init(conf);
-            
+
             if (dbCache instanceof TimeoutMemCache) {
                 memCache = dbCache;
-            }else {
+            } else {
                 memCache = new TimeoutMemCache();
                 memCache.init(conf);
             }
-        }catch(java.lang.UnsupportedClassVersionError e) {
-        	
-        	if (e.getMessage().indexOf("Unsupported major.minor version") >= 0) {
-        		LOG.error("!!!Please update jdk version to 7 or higher!!!");
-        		
-        	}
-        	LOG.error("Failed to create NimbusCache!", e);
+        } catch (UnsupportedClassVersionError e) {
+
+            if (e.getMessage().indexOf("Unsupported major.minor version") >= 0) {
+                LOG.error("!!!Please update jdk version to 7 or higher!!!");
+
+            }
+            LOG.error("Failed to create NimbusCache!", e);
             throw new RuntimeException(e);
         } catch (Exception e) {
             LOG.error("Failed to create NimbusCache!", e);
             throw new RuntimeException(e);
         }
-        
+
         this.zkCluster = zkCluster;
     }
 
@@ -128,19 +111,15 @@ public class NimbusCache{
 
     public void cleanup() {
         dbCache.cleanup();
-        
+
     }
-    
-    
+
     /**
      * 
-     * In the old design, 
-     * DBCache will cache all taskInfo/taskErrors, this will be useful for huge topology
+     * In the old design, DBCache will cache all taskInfo/taskErrors, this will be useful for huge topology
      * 
-     * But the latest zk design, taskInfo is only one znode, taskErros has few znode
-     * So remove them from DBCache
-     * Skip timely refresh taskInfo/taskErrors
+     * But the latest zk design, taskInfo is only one znode, taskErros has few znode So remove them from DBCache Skip timely refresh taskInfo/taskErrors
      * 
      */
-    
+
 }