You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:51 UTC

[13/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java
index 8d2ba24..78cfe73 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java
@@ -36,10 +36,12 @@ import com.alibaba.jstorm.container.cgroup.core.CgroupCore;
 import com.alibaba.jstorm.container.cgroup.core.CpuCore;
 import com.alibaba.jstorm.utils.JStormUtils;
 
+/**
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
+ */
 public class CgroupManager {
 
-    public static final Logger LOG = LoggerFactory
-            .getLogger(CgroupManager.class);
+    public static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class);
 
     public static final String JSTORM_HIERARCHY_NAME = "jstorm_cpu";
 
@@ -61,20 +63,16 @@ public class CgroupManager {
         // "/cgroup/cpu"
         rootDir = ConfigExtension.getCgroupRootDir(conf);
         if (rootDir == null)
-            throw new RuntimeException(
-                    "Check configuration file. The supervisor.cgroup.rootdir is missing.");
+            throw new RuntimeException("Check configuration file. The supervisor.cgroup.rootdir is missing.");
 
         File file = new File(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir);
         if (!file.exists()) {
-            LOG.error(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir
-                    + " is not existing.");
-            throw new RuntimeException(
-                    "Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file.");
+            LOG.error(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir + " is not existing.");
+            throw new RuntimeException("Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file.");
         }
         center = CgroupCenter.getInstance();
         if (center == null)
-            throw new RuntimeException(
-                    "Cgroup error, please check /proc/cgroups");
+            throw new RuntimeException("Cgroup error, please check /proc/cgroups");
         this.prepareSubSystem();
     }
 
@@ -90,13 +88,10 @@ public class CgroupManager {
         return value;
     }
 
-    private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit)
-            throws IOException {
+    private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) throws IOException {
         /*
-         * User cfs_period & cfs_quota to control the upper limit use of cpu
-         * core e.g. If making a process to fully use two cpu cores, set
-         * cfs_period_us to 100000 and set cfs_quota_us to 200000 The highest
-         * value of "cpu core upper limit" is 10
+         * Use cfs_period & cfs_quota to control the upper limit of cpu core usage. E.g. to let a process fully use two cpu cores, set cfs_period_us
+         * to 100000 and cfs_quota_us to 200000. The highest value of "cpu core upper limit" is 10.
          */
         cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit);
 
@@ -109,16 +104,13 @@ public class CgroupManager {
         }
     }
 
-    public String startNewWorker(Map conf, int cpuNum, String workerId)
-            throws SecurityException, IOException {
-        CgroupCommon workerGroup =
-                new CgroupCommon(workerId, h, this.rootCgroup);
+    public String startNewWorker(Map conf, int cpuNum, String workerId) throws SecurityException, IOException {
+        CgroupCommon workerGroup = new CgroupCommon(workerId, h, this.rootCgroup);
         this.center.create(workerGroup);
         CgroupCore cpu = workerGroup.getCores().get(SubSystemType.cpu);
         CpuCore cpuCore = (CpuCore) cpu;
         cpuCore.setCpuShares(cpuNum * ONE_CPU_SLOT);
-        setCpuUsageUpperLimit(cpuCore,
-                ConfigExtension.getWorkerCpuCoreUpperLimit(conf));
+        setCpuUsageUpperLimit(cpuCore, ConfigExtension.getWorkerCpuCoreUpperLimit(conf));
 
         StringBuilder sb = new StringBuilder();
         sb.append("cgexec -g cpu:").append(workerGroup.getName()).append(" ");
@@ -126,8 +118,7 @@ public class CgroupManager {
     }
 
     public void shutDownWorker(String workerId, boolean isKilled) {
-        CgroupCommon workerGroup =
-                new CgroupCommon(workerId, h, this.rootCgroup);
+        CgroupCommon workerGroup = new CgroupCommon(workerId, h, this.rootCgroup);
         try {
             if (isKilled == false) {
                 for (Integer pid : workerGroup.getTasks()) {
@@ -151,9 +142,7 @@ public class CgroupManager {
         if (h == null) {
             Set<SubSystemType> types = new HashSet<SubSystemType>();
             types.add(SubSystemType.cpu);
-            h =
-                    new Hierarchy(JSTORM_HIERARCHY_NAME, types,
-                            JSTORM_CPU_HIERARCHY_DIR);
+            h = new Hierarchy(JSTORM_HIERARCHY_NAME, types, JSTORM_CPU_HIERARCHY_DIR);
         }
         rootCgroup = new CgroupCommon(rootDir, h, h.getRootCgroups());
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java
index e55aabe..d003a14 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java
@@ -36,6 +36,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
 
 /**
  * supervisor Heartbeat, just write SupervisorInfo to ZK
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
  */
 class Heartbeat extends RunnableCallback {
 
@@ -67,8 +68,7 @@ class Heartbeat extends RunnableCallback {
      * @param myHostName
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public Heartbeat(Map conf, StormClusterState stormClusterState,
-            String supervisorId) {
+    public Heartbeat(Map conf, StormClusterState stormClusterState, String supervisorId) {
 
         String myHostName = JStormServerUtils.getHostName(conf);
 
@@ -77,15 +77,12 @@ class Heartbeat extends RunnableCallback {
         this.conf = conf;
         this.myHostName = myHostName;
         this.startTime = TimeUtils.current_time_secs();
-        this.frequence =
-                JStormUtils.parseInt(conf
-                        .get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+        this.frequence = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
         this.hbUpdateTrigger = new AtomicBoolean(true);
 
         initSupervisorInfo(conf);
 
-        LOG.info("Successfully init supervisor heartbeat thread, "
-                + supervisorInfo);
+        LOG.info("Successfully init supervisor heartbeat thread, " + supervisorInfo);
     }
 
     private void initSupervisorInfo(Map conf) {
@@ -96,32 +93,28 @@ class Heartbeat extends RunnableCallback {
 
                 boolean isLocaliP = false;
                 isLocaliP = myHostName.equals("127.0.0.1");
-                if(isLocaliP){
+                if (isLocaliP) {
                     throw new Exception("the hostname which  supervisor get is localhost");
                 }
-            }catch(Exception e1){
+            } catch (Exception e1) {
                 LOG.error("get supervisor host error!", e1);
                 throw new RuntimeException(e1);
             }
             Set<Integer> ports = JStormUtils.listToSet(portList);
-            supervisorInfo =
-                    new SupervisorInfo(myHostName, supervisorId, ports);
+            supervisorInfo = new SupervisorInfo(myHostName, supervisorId, ports);
         } else {
-            Set<Integer> ports = JStormUtils.listToSet(portList.subList(0, 1));
-            supervisorInfo =
-                    new SupervisorInfo(myHostName, supervisorId, ports);
+            Set<Integer> ports = JStormUtils.listToSet(portList);
+            supervisorInfo = new SupervisorInfo(myHostName, supervisorId, ports);
         }
     }
 
     @SuppressWarnings("unchecked")
     public void update() {
         supervisorInfo.setTimeSecs(TimeUtils.current_time_secs());
-        supervisorInfo
-                .setUptimeSecs((int) (TimeUtils.current_time_secs() - startTime));
+        supervisorInfo.setUptimeSecs((int) (TimeUtils.current_time_secs() - startTime));
 
         try {
-            stormClusterState
-                    .supervisor_heartbeat(supervisorId, supervisorInfo);
+            stormClusterState.supervisor_heartbeat(supervisorId, supervisorInfo);
         } catch (Exception e) {
             LOG.error("Failed to update SupervisorInfo to ZK");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java
index fad1346..4ece066 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java
@@ -62,6 +62,9 @@ import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
 import com.sun.net.httpserver.HttpServer;
 
+/**
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
+ */
 public class Httpserver implements Shutdownable {
 
     private static Logger LOG = LoggerFactory.getLogger(Httpserver.class);
@@ -119,13 +122,11 @@ public class Httpserver implements Shutdownable {
 
         }
 
-        public void handlFailure(HttpExchange t, String errorMsg)
-                throws IOException {
+        public void handlFailure(HttpExchange t, String errorMsg) throws IOException {
             LOG.error(errorMsg);
 
             byte[] data = errorMsg.getBytes();
-            t.sendResponseHeaders(HttpURLConnection.HTTP_BAD_REQUEST,
-                    data.length);
+            t.sendResponseHeaders(HttpURLConnection.HTTP_BAD_REQUEST, data.length);
             OutputStream os = t.getResponseBody();
             os.write(data);
             os.close();
@@ -136,8 +137,7 @@ public class Httpserver implements Shutdownable {
             Map<String, String> paramMap = parseRawQuery(uri.getRawQuery());
             LOG.info("Receive command " + paramMap);
 
-            String cmd =
-                    paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD);
+            String cmd = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD);
             if (StringUtils.isBlank(cmd) == true) {
                 handlFailure(t, "Bad Request, Not set command type");
                 return;
@@ -146,16 +146,13 @@ public class Httpserver implements Shutdownable {
             if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW.equals(cmd)) {
                 handleShowLog(t, paramMap);
                 return;
-            } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_LIST
-                    .equals(cmd)) {
+            } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_LIST.equals(cmd)) {
                 handleListDir(t, paramMap);
                 return;
-            } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK
-                    .equals(cmd)) {
+            } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK.equals(cmd)) {
                 handleJstack(t, paramMap);
                 return;
-            } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF
-                    .equals(cmd)) {
+            } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF.equals(cmd)) {
                 handleShowConf(t, paramMap);
                 return;
             }
@@ -178,8 +175,7 @@ public class Httpserver implements Shutdownable {
 
             if (isChild == false) {
                 LOG.error("Access one disallowed path: " + canonicalPath);
-                throw new IOException(
-                        "Destination file/path is not accessible.");
+                throw new IOException("Destination file/path is not accessible.");
             }
         }
 
@@ -196,34 +192,27 @@ public class Httpserver implements Shutdownable {
             return paramMap;
         }
 
-        private void handleShowLog(HttpExchange t, Map<String, String> paramMap)
-                throws IOException {
+        private void handleShowLog(HttpExchange t, Map<String, String> paramMap) throws IOException {
             Pair<Long, byte[]> logPair = queryLog(t, paramMap);
             if (logPair == null) {
                 return;
             }
 
-            String size =
-                    String.format(
-                            HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT,
-                            logPair.getFirst());
+            String size = String.format(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT, logPair.getFirst());
             byte[] sizeByts = size.getBytes();
 
             byte[] logData = logPair.getSecond();
 
-            t.sendResponseHeaders(HttpURLConnection.HTTP_OK, sizeByts.length
-                    + logData.length);
+            t.sendResponseHeaders(HttpURLConnection.HTTP_OK, sizeByts.length + logData.length);
             OutputStream os = t.getResponseBody();
             os.write(sizeByts);
             os.write(logData);
             os.close();
         }
 
-        private Pair<Long, byte[]> queryLog(HttpExchange t,
-                Map<String, String> paramMap) throws IOException {
+        private Pair<Long, byte[]> queryLog(HttpExchange t, Map<String, String> paramMap) throws IOException {
 
-            String fileParam =
-                    paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_LOGFILE);
+            String fileParam = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_LOGFILE);
             if (StringUtils.isBlank(fileParam)) {
                 handlFailure(t, "Bad Request, Params Error, no log file name.");
                 return null;
@@ -242,8 +231,7 @@ public class Httpserver implements Shutdownable {
 
                 long position = fileSize - pageSize;
                 try {
-                    String posStr =
-                            paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_POS);
+                    String posStr = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_POS);
                     if (StringUtils.isBlank(posStr) == false) {
                         long pos = Long.valueOf(posStr);
 
@@ -258,15 +246,12 @@ public class Httpserver implements Shutdownable {
 
                 long size = Math.min(fileSize - position, pageSize);
 
-                LOG.info("logview " + logFile + ", position=" + position
-                        + ", size=" + size);
+                LOG.info("logview " + logFile + ", position=" + position + ", size=" + size);
                 fout = fc.map(FileChannel.MapMode.READ_ONLY, position, size);
 
                 ret = new byte[(int) size];
                 fout.get(ret);
-                String str =
-                        new String(ret,
-                                ConfigExtension.getLogViewEncoding(conf));
+                String str = new String(ret, ConfigExtension.getLogViewEncoding(conf));
                 return new Pair<Long, byte[]>(fileSize, str.getBytes());
 
             } catch (FileNotFoundException e) {
@@ -288,8 +273,7 @@ public class Httpserver implements Shutdownable {
         }
 
         byte[] getJSonFiles(String dir) throws Exception {
-            Map<String, FileAttribute> fileMap =
-                    new HashMap<String, FileAttribute>();
+            Map<String, FileAttribute> fileMap = new HashMap<String, FileAttribute>();
 
             String path = logDir;
             if (dir != null) {
@@ -332,13 +316,11 @@ public class Httpserver implements Shutdownable {
             return fileJsonStr.getBytes();
         }
 
-        void handleListDir(HttpExchange t, Map<String, String> paramMap)
-                throws IOException {
+        void handleListDir(HttpExchange t, Map<String, String> paramMap) throws IOException {
             byte[] filesJson = "Failed to get file list".getBytes();
 
             try {
-                String dir =
-                        paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_DIR);
+                String dir = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_DIR);
                 filesJson = getJSonFiles(dir);
             } catch (Exception e) {
                 LOG.error("Failed to list files", e);
@@ -358,15 +340,12 @@ public class Httpserver implements Shutdownable {
 
             try {
                 LOG.info("Begin to execute " + cmd);
-                Process process =
-                        JStormUtils.launch_process(cmd,
-                                new HashMap<String, String>(), false);
+                Process process = JStormUtils.launch_process(cmd, new HashMap<String, String>(), false);
 
                 // Process process = Runtime.getRuntime().exec(sb.toString());
 
                 InputStream stdin = process.getInputStream();
-                BufferedReader reader =
-                        new BufferedReader(new InputStreamReader(stdin));
+                BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
 
                 JStormUtils.sleepMs(1000);
 
@@ -398,10 +377,8 @@ public class Httpserver implements Shutdownable {
             }
         }
 
-        void handleJstack(HttpExchange t, Map<String, String> paramMap)
-                throws IOException {
-            String workerPort =
-                    paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT);
+        void handleJstack(HttpExchange t, Map<String, String> paramMap) throws IOException {
+            String workerPort = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT);
             if (workerPort == null) {
                 handlFailure(t, "Not set worker's port");
                 return;
@@ -425,8 +402,7 @@ public class Httpserver implements Shutdownable {
             os.close();
         }
 
-        void handleShowConf(HttpExchange t, Map<String, String> paramMap)
-                throws IOException {
+        void handleShowConf(HttpExchange t, Map<String, String> paramMap) throws IOException {
             byte[] json = "Failed to get configuration".getBytes();
 
             try {
@@ -452,8 +428,7 @@ public class Httpserver implements Shutdownable {
 
         try {
             hs = HttpServer.create(socketAddr, 0);
-            hs.createContext(HttpserverUtils.HTTPSERVER_CONTEXT_PATH_LOGVIEW,
-                    new LogHandler(conf));
+            hs.createContext(HttpserverUtils.HTTPSERVER_CONTEXT_PATH_LOGVIEW, new LogHandler(conf));
             hs.setExecutor(executor);
             hs.start();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java
index dfee522..8b52607 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java
@@ -50,8 +50,7 @@ import com.alibaba.jstorm.cluster.StormConfig;
  * @version
  */
 public class SandBoxMaker {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(SandBoxMaker.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SandBoxMaker.class);
 
     public static final String SANBOX_TEMPLATE_NAME = "sandbox.policy";
 
@@ -66,8 +65,7 @@ public class SandBoxMaker {
 
     private final boolean isEnable;
 
-    private final Map<String, String> replaceBaseMap =
-            new HashMap<String, String>();
+    private final Map<String, String> replaceBaseMap = new HashMap<String, String>();
 
     public SandBoxMaker(Map conf) {
         this.conf = conf;
@@ -83,8 +81,7 @@ public class SandBoxMaker {
 
         replaceBaseMap.put(JSTORM_HOME_KEY, jstormHome);
 
-        replaceBaseMap.put(LOCAL_DIR_KEY,
-                (String) conf.get(Config.STORM_LOCAL_DIR));
+        replaceBaseMap.put(LOCAL_DIR_KEY, (String) conf.get(Config.STORM_LOCAL_DIR));
 
         LOG.info("JSTORM_HOME is " + jstormHome);
     }
@@ -127,26 +124,19 @@ public class SandBoxMaker {
         return line;
     }
 
-    public String generatePolicyFile(Map<String, String> replaceMap)
-            throws IOException {
+    public String generatePolicyFile(Map<String, String> replaceMap) throws IOException {
         // dynamic generate policy file, no static file
-        String tmpPolicy =
-                StormConfig.supervisorTmpDir(conf) + File.separator
-                        + UUID.randomUUID().toString();
+        String tmpPolicy = StormConfig.supervisorTmpDir(conf) + File.separator + UUID.randomUUID().toString();
 
-        InputStream inputStream =
-                SandBoxMaker.class.getClassLoader().getResourceAsStream(
-                        SANBOX_TEMPLATE_NAME);
+        InputStream inputStream = SandBoxMaker.class.getClassLoader().getResourceAsStream(SANBOX_TEMPLATE_NAME);
 
-        PrintWriter writer =
-                new PrintWriter(new BufferedWriter(new FileWriter(tmpPolicy)));
+        PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(tmpPolicy)));
 
         try {
 
             InputStreamReader inputReader = new InputStreamReader(inputStream);
 
-            BufferedReader reader =
-                    new BufferedReader(new LineNumberReader(inputReader));
+            BufferedReader reader = new BufferedReader(new LineNumberReader(inputReader));
 
             String line = null;
             while ((line = reader.readLine()) != null) {
@@ -177,8 +167,7 @@ public class SandBoxMaker {
      * @return
      * @throws IOException
      */
-    public String sandboxPolicy(String workerId, Map<String, String> replaceMap)
-            throws IOException {
+    public String sandboxPolicy(String workerId, Map<String, String> replaceMap) throws IOException {
         if (isEnable == false) {
             return "";
         }
@@ -188,9 +177,7 @@ public class SandBoxMaker {
         String tmpPolicy = generatePolicyFile(replaceMap);
 
         File file = new File(tmpPolicy);
-        String policyPath =
-                StormConfig.worker_root(conf, workerId) + File.separator
-                        + SANBOX_TEMPLATE_NAME;
+        String policyPath = StormConfig.worker_root(conf, workerId) + File.separator + SANBOX_TEMPLATE_NAME;
         File dest = new File(policyPath);
         file.renameTo(dest);
 
@@ -210,9 +197,7 @@ public class SandBoxMaker {
         SandBoxMaker maker = new SandBoxMaker(conf);
 
         try {
-            System.out.println("sandboxPolicy:"
-                    + maker.sandboxPolicy("simple",
-                            new HashMap<String, String>()));
+            System.out.println("sandboxPolicy:" + maker.sandboxPolicy("simple", new HashMap<String, String>()));
         } catch (IOException e) {
             e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java
index 0b906e3..71859a1 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java
@@ -37,6 +37,9 @@ import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 import com.alibaba.jstorm.utils.TimeUtils;
 
+/**
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
+ */
 public class ShutdownWork extends RunnableCallback {
 
     private static Logger LOG = LoggerFactory.getLogger(ShutdownWork.class);
@@ -54,14 +57,9 @@ public class ShutdownWork extends RunnableCallback {
      * 
      * @return the topologys whose workers are shutdown successfully
      */
-    public void shutWorker(Map conf, String supervisorId,
-            Map<String, String> removed,
-            ConcurrentHashMap<String, String> workerThreadPids,
-            CgroupManager cgroupManager, boolean block,
-            Map<String, Integer> killingWorkers,
-            Map<String, Integer> taskCleanupTimeoutMap) {
-        Map<String, List<String>> workerId2Pids =
-                new HashMap<String, List<String>>();
+    public void shutWorker(Map conf, String supervisorId, Map<String, String> removed, ConcurrentHashMap<String, String> workerThreadPids,
+            CgroupManager cgroupManager, boolean block, Map<String, Integer> killingWorkers, Map<String, Integer> taskCleanupTimeoutMap) {
+        Map<String, List<String>> workerId2Pids = new HashMap<String, List<String>>();
 
         boolean localMode = false;
 
@@ -78,8 +76,7 @@ public class ShutdownWork extends RunnableCallback {
             try {
                 pids = getPid(conf, workerId);
             } catch (IOException e1) {
-                LOG.error("Failed to get pid for " + workerId + " of "
-                        + topologyId);
+                LOG.error("Failed to get pid for " + workerId + " of " + topologyId);
             }
             workerId2Pids.put(workerId, pids);
 
@@ -100,15 +97,10 @@ public class ShutdownWork extends RunnableCallback {
                         JStormUtils.process_killed(Integer.parseInt(pid));
                     }
 
-                    if (taskCleanupTimeoutMap != null
-                            && taskCleanupTimeoutMap.get(topologyId) != null) {
-                        maxWaitTime =
-                                Math.max(maxWaitTime,
-                                        taskCleanupTimeoutMap.get(topologyId));
+                    if (taskCleanupTimeoutMap != null && taskCleanupTimeoutMap.get(topologyId) != null) {
+                        maxWaitTime = Math.max(maxWaitTime, taskCleanupTimeoutMap.get(topologyId));
                     } else {
-                        maxWaitTime =
-                                Math.max(maxWaitTime, ConfigExtension
-                                        .getTaskCleanupTimeoutSec(conf));
+                        maxWaitTime = Math.max(maxWaitTime, ConfigExtension.getTaskCleanupTimeoutSec(conf));
                     }
                 } catch (Exception e) {
                     LOG.info("Failed to shutdown ", e);
@@ -126,8 +118,7 @@ public class ShutdownWork extends RunnableCallback {
             List<String> pids = workerId2Pids.get(workerId);
 
             int cleanupTimeout;
-            if (taskCleanupTimeoutMap != null
-                    && taskCleanupTimeoutMap.get(topologyId) != null) {
+            if (taskCleanupTimeoutMap != null && taskCleanupTimeoutMap.get(topologyId) != null) {
                 cleanupTimeout = taskCleanupTimeoutMap.get(topologyId);
             } else {
                 cleanupTimeout = ConfigExtension.getTaskCleanupTimeoutSec(conf);
@@ -137,8 +128,7 @@ public class ShutdownWork extends RunnableCallback {
             if (TimeUtils.current_time_secs() - initCleaupTime > cleanupTimeout) {
                 if (localMode == false) {
                     for (String pid : pids) {
-                        JStormUtils
-                                .ensure_process_killed(Integer.parseInt(pid));
+                        JStormUtils.ensure_process_killed(Integer.parseInt(pid));
                         if (cgroupManager != null) {
                             cgroupManager.shutDownWorker(workerId, true);
                         }
@@ -169,14 +159,12 @@ public class ShutdownWork extends RunnableCallback {
             // delete workerid dir, LOCAL_DIR/worker/workerid
             PathUtils.rmr(StormConfig.worker_root(conf, workerId));
         } catch (Exception e) {
-            LOG.warn(e + "Failed to cleanup worker " + workerId
-                    + ". Will retry later");
+            LOG.warn(e + "Failed to cleanup worker " + workerId + ". Will retry later");
         }
     }
 
     /**
-     * When worker has been started by manually and supervisor, it will return
-     * multiple pid
+     * When the worker has been started both manually and by the supervisor, it will return multiple pids
      * 
      * @param conf
      * @param workerId

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java
index c159f4b..c6bed45a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java
@@ -47,7 +47,6 @@ public class StateHeartbeat {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java
index abc2448..c6c2877 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java
@@ -24,6 +24,7 @@ import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.alibaba.jstorm.daemon.worker.WorkerReportError;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,33 +52,28 @@ import com.alibaba.jstorm.utils.JStormUtils;
  * 
  * Supevisor workflow 1. write SupervisorInfo to ZK
  * 
- * 2. Every 10 seconds run SynchronizeSupervisor 2.1 download new topology 2.2
- * release useless worker 2.3 assgin new task to
- * /local-dir/supervisor/localstate 2.4 add one syncProcesses event
+ * 2. Every 10 seconds run SynchronizeSupervisor 2.1 download new topology 2.2 release useless worker 2.3 assign new task to /local-dir/supervisor/localstate
+ * 2.4 add one syncProcesses event
  * 
- * 3. Every supervisor.monitor.frequency.secs run SyncProcesses 3.1 kill useless
- * worker 3.2 start new worker
+ * 3. Every supervisor.monitor.frequency.secs run SyncProcesses 3.1 kill useless worker 3.2 start new worker
  * 
- * 4. create heartbeat thread every supervisor.heartbeat.frequency.secs, write
- * SupervisorInfo to ZK
+ * 4. create heartbeat thread every supervisor.heartbeat.frequency.secs, write SupervisorInfo to ZK
+ *  @author Johnfang (xiaojian.fxj@alibaba-inc.com)
  */
 
 public class Supervisor {
 
     private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
 
-
     /**
      * create and start one supervisor
      * 
      * @param conf : configurationdefault.yaml storm.yaml
      * @param sharedContext : null (right now)
-     * @return SupervisorManger: which is used to shutdown all workers and
-     *         supervisor
+     * @return SupervisorManger: which is used to shutdown all workers and supervisor
      */
     @SuppressWarnings("rawtypes")
-    public SupervisorManger mkSupervisor(Map conf, IContext sharedContext)
-            throws Exception {
+    public SupervisorManger mkSupervisor(Map conf, IContext sharedContext) throws Exception {
 
         LOG.info("Starting Supervisor with conf " + conf);
 
@@ -91,13 +87,15 @@ public class Supervisor {
          * Step 2: create ZK operation instance StromClusterState
          */
 
-        StormClusterState stormClusterState =
-                Cluster.mk_storm_cluster_state(conf);
+        StormClusterState stormClusterState = Cluster.mk_storm_cluster_state(conf);
+
+        String hostName = JStormServerUtils.getHostName(conf);
+        WorkerReportError workerReportError =
+                new WorkerReportError(stormClusterState, hostName);
+
 
         /*
-         * Step 3, create LocalStat LocalStat is one KV database 4.1 create
-         * LocalState instance; 4.2 get supervisorId, if no supervisorId, create
-         * one
+         * Step 3: create LocalState, which is a KV database: 3.1 create the LocalState instance; 3.2 get supervisorId, creating one if none exists
          */
 
         LocalState localState = StormConfig.supervisorState(conf);
@@ -115,13 +113,11 @@ public class Supervisor {
         // sync hearbeat to nimbus
         Heartbeat hb = new Heartbeat(conf, stormClusterState, supervisorId);
         hb.update();
-        AsyncLoopThread heartbeat =
-                new AsyncLoopThread(hb, false, null, Thread.MIN_PRIORITY, true);
+        AsyncLoopThread heartbeat = new AsyncLoopThread(hb, false, null, Thread.MIN_PRIORITY, true);
         threads.add(heartbeat);
 
         // Sync heartbeat to Apsara Container
-        AsyncLoopThread syncContainerHbThread =
-                SyncContainerHb.mkSupervisorInstance(conf);
+        AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkSupervisorInstance(conf);
         if (syncContainerHbThread != null) {
             threads.add(syncContainerHbThread);
         }
@@ -129,34 +125,22 @@ public class Supervisor {
         // Step 6 create and start sync Supervisor thread
         // every supervisor.monitor.frequency.secs second run SyncSupervisor
         EventManagerImp processEventManager = new EventManagerImp();
-        AsyncLoopThread processEventThread =
-                new AsyncLoopThread(processEventManager);
+        AsyncLoopThread processEventThread = new AsyncLoopThread(processEventManager);
         threads.add(processEventThread);
 
-        ConcurrentHashMap<String, String> workerThreadPids =
-                new ConcurrentHashMap<String, String>();
-        SyncProcessEvent syncProcessEvent =
-                new SyncProcessEvent(supervisorId, conf, localState,
-                        workerThreadPids, sharedContext);
+        ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<String, String>();
+        SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorId, conf, localState, workerThreadPids, sharedContext, workerReportError);
 
         EventManagerImp syncSupEventManager = new EventManagerImp();
-        AsyncLoopThread syncSupEventThread =
-                new AsyncLoopThread(syncSupEventManager);
+        AsyncLoopThread syncSupEventThread = new AsyncLoopThread(syncSupEventManager);
         threads.add(syncSupEventThread);
 
         SyncSupervisorEvent syncSupervisorEvent =
-                new SyncSupervisorEvent(supervisorId, conf,
-                        processEventManager, syncSupEventManager,
-                        stormClusterState, localState, syncProcessEvent, hb);
-
-        int syncFrequence =
-                JStormUtils.parseInt(conf
-                        .get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS));
-        EventManagerPusher syncSupervisorPusher =
-                new EventManagerPusher(syncSupEventManager,
-                        syncSupervisorEvent, syncFrequence);
-        AsyncLoopThread syncSupervisorThread =
-                new AsyncLoopThread(syncSupervisorPusher);
+                new SyncSupervisorEvent(supervisorId, conf, processEventManager, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb);
+
+        int syncFrequence = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS));
+        EventManagerPusher syncSupervisorPusher = new EventManagerPusher(syncSupEventManager, syncSupervisorEvent, syncFrequence);
+        AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher);
         threads.add(syncSupervisorThread);
 
         Httpserver httpserver = null;
@@ -168,9 +152,7 @@ public class Supervisor {
         }
 
         // SupervisorManger which can shutdown all supervisor and workers
-        return new SupervisorManger(conf, supervisorId, threads,
-                syncSupEventManager, processEventManager, httpserver,
-                stormClusterState, workerThreadPids);
+        return new SupervisorManger(conf, supervisorId, threads, syncSupEventManager, processEventManager, httpserver, stormClusterState, workerThreadPids);
     }
 
     /**
@@ -210,7 +192,7 @@ public class Supervisor {
             JStormUtils.redirectOutput("/dev/null");
 
             initShutdownHook(supervisorManager);
-            
+
             while (supervisorManager.isFinishShutdown() == false) {
                 try {
                     Thread.sleep(1000);
@@ -222,11 +204,10 @@ public class Supervisor {
         } catch (Exception e) {
             LOG.error("Failed to start supervisor\n", e);
             System.exit(1);
-        }finally {
-        	LOG.info("Shutdown supervisor!!!");
+        } finally {
+            LOG.info("Shutdown supervisor!!!");
         }
 
-        
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java
index f53ef72..ae89607 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang.builder.ToStringStyle;
 
 /**
  * Object stored in ZK /ZK-DIR/supervisors
- *
+ * 
  * @author Xin.Zhou/Longda
  */
 public class SupervisorInfo implements Serializable {
@@ -46,8 +46,7 @@ public class SupervisorInfo implements Serializable {
 
     private transient Set<Integer> availableWorkerPorts;
 
-    public SupervisorInfo(String hostName, String supervisorId,
-                          Set<Integer> workerPorts) {
+    public SupervisorInfo(String hostName, String supervisorId, Set<Integer> workerPorts) {
         this.hostName = hostName;
         this.supervisorId = supervisorId;
         this.workerPorts = workerPorts;
@@ -80,16 +79,19 @@ public class SupervisorInfo implements Serializable {
     public Set<Integer> getWorkerPorts() {
         return workerPorts;
     }
-    public void setAvailableWorkerPorts(Set<Integer> workerPorts){
+
+    public void setAvailableWorkerPorts(Set<Integer> workerPorts) {
         if (availableWorkerPorts == null)
             availableWorkerPorts = new HashSet<Integer>();
         availableWorkerPorts.addAll(workerPorts);
     }
+
     public Set<Integer> getAvailableWorkerPorts() {
         if (availableWorkerPorts == null)
             availableWorkerPorts = new HashSet<Integer>();
         return availableWorkerPorts;
     }
+
     public void setWorkerPorts(Set<Integer> workerPorts) {
         this.workerPorts = workerPorts;
     }
@@ -98,20 +100,11 @@ public class SupervisorInfo implements Serializable {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result =
-                prime * result + ((hostName == null) ? 0 : hostName.hashCode());
-        result =
-                prime
-                        * result
-                        + ((supervisorId == null) ? 0 : supervisorId.hashCode());
-        result =
-                prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode());
-        result =
-                prime * result
-                        + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode());
-        result =
-                prime * result
-                        + ((workerPorts == null) ? 0 : workerPorts.hashCode());
+        result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
+        result = prime * result + ((supervisorId == null) ? 0 : supervisorId.hashCode());
+        result = prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode());
+        result = prime * result + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode());
+        result = prime * result + ((workerPorts == null) ? 0 : workerPorts.hashCode());
         return result;
     }
 
@@ -154,19 +147,17 @@ public class SupervisorInfo implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
     /**
      * get Map<supervisorId, hostname>
-     *
+     * 
      * @param stormClusterState
      * @param callback
      * @return
      */
-    public static Map<String, String> getNodeHost(
-            Map<String, SupervisorInfo> supInfos) {
+    public static Map<String, String> getNodeHost(Map<String, SupervisorInfo> supInfos) {
 
         Map<String, String> rtn = new HashMap<String, String>();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java
index a2806de..99c2c76 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java
@@ -39,9 +39,9 @@ import com.alibaba.jstorm.utils.PathUtils;
 
 /**
  * supervisor shutdown manager which can shutdown supervisor
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
  */
-public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
-        DaemonCommon, Runnable {
+public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable {
 
     private static Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
 
@@ -67,11 +67,8 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
 
     private volatile boolean isFinishShutdown = false;
 
-    public SupervisorManger(Map conf, String supervisorId,
-            Vector<AsyncLoopThread> threads,
-            EventManager processesEventManager, EventManager eventManager,
-            Httpserver httpserver, StormClusterState stormClusterState,
-            ConcurrentHashMap<String, String> workerThreadPidsAtom) {
+    public SupervisorManger(Map conf, String supervisorId, Vector<AsyncLoopThread> threads, EventManager processesEventManager, EventManager eventManager,
+            Httpserver httpserver, StormClusterState stormClusterState, ConcurrentHashMap<String, String> workerThreadPidsAtom) {
         this.conf = conf;
         this.supervisorId = supervisorId;
         this.shutdown = new AtomicBoolean(false);
@@ -104,8 +101,7 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
             // } catch (InterruptedException e) {
             // LOG.error(e.getMessage(), e);
             // }
-            LOG.info("Successfully shutdown thread:"
-                    + thread.getThread().getName());
+            LOG.info("Successfully shutdown thread:" + thread.getThread().getName());
         }
         eventManager.shutdown();
         processesEventManager.shutdown();
@@ -144,15 +140,13 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
             return;
         }
         List<String> myWorkerIds = PathUtils.read_dir_contents(path);
-        HashMap<String, String> workerId2topologyIds =
-                new HashMap<String, String>();
+        HashMap<String, String> workerId2topologyIds = new HashMap<String, String>();
 
         for (String workerId : myWorkerIds) {
             workerId2topologyIds.put(workerId, null);
         }
 
-        shutWorker(conf, supervisorId, workerId2topologyIds,
-                workerThreadPidsAtom, null, true, null, null);
+        shutWorker(conf, supervisorId, workerId2topologyIds, workerThreadPidsAtom, null, true, null, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java
index d90eb29..01f2a3a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import java.util.regex.Pattern;
 
+import com.alibaba.jstorm.daemon.worker.*;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,12 +46,6 @@ import backtype.storm.utils.LocalState;
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.daemon.worker.LocalAssignment;
-import com.alibaba.jstorm.daemon.worker.ProcessSimulator;
-import com.alibaba.jstorm.daemon.worker.State;
-import com.alibaba.jstorm.daemon.worker.Worker;
-import com.alibaba.jstorm.daemon.worker.WorkerHeartbeat;
-import com.alibaba.jstorm.daemon.worker.WorkerShutdown;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.Pair;
 import com.alibaba.jstorm.utils.PathUtils;
@@ -59,6 +54,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
 
 /**
  * SyncProcesses (1) kill bad worker (2) start new worker
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
  */
 class SyncProcessEvent extends ShutdownWork {
     private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
@@ -78,8 +74,7 @@ class SyncProcessEvent extends ShutdownWork {
     private SandBoxMaker sandBoxMaker;
 
     /**
-     * Due to the worker startTime is put in Supervisor memory, When supervisor
-     * restart, the starting worker is likely to be killed
+     * Due to the worker startTime is put in Supervisor memory, When supervisor restart, the starting worker is likely to be killed
      */
     private Map<String, Pair<Integer, Integer>> workerIdToStartTimeAndPort;
     /**
@@ -95,6 +90,8 @@ class SyncProcessEvent extends ShutdownWork {
     // private Supervisor supervisor;
     private int lastTime;
 
+    private WorkerReportError workerReportError;
+
     /**
      * @param conf
      * @param localState
@@ -104,10 +101,8 @@ class SyncProcessEvent extends ShutdownWork {
      * @param workerThreadPidsReadLock
      * @param workerThreadPidsWriteLock
      */
-    public SyncProcessEvent(String supervisorId, Map conf,
-            LocalState localState,
-            ConcurrentHashMap<String, String> workerThreadPids,
-            IContext sharedContext) {
+    public SyncProcessEvent(String supervisorId, Map conf, LocalState localState, ConcurrentHashMap<String, String> workerThreadPids,
+                            IContext sharedContext, WorkerReportError workerReportError) {
 
         this.supervisorId = supervisorId;
 
@@ -122,8 +117,7 @@ class SyncProcessEvent extends ShutdownWork {
 
         this.sandBoxMaker = new SandBoxMaker(conf);
 
-        this.workerIdToStartTimeAndPort =
-                new HashMap<String, Pair<Integer, Integer>>();
+        this.workerIdToStartTimeAndPort = new HashMap<String, Pair<Integer, Integer>>();
 
         this.needDownloadTopologys = new AtomicReference<Set>();
 
@@ -132,30 +126,27 @@ class SyncProcessEvent extends ShutdownWork {
         }
 
         killingWorkers = new HashMap<String, Integer>();
+        this.workerReportError = workerReportError;
     }
 
     /**
-     * @@@ Change the old logic In the old logic, it will store
-     *     LS_LOCAL_ASSIGNMENTS Map<String, Integer> into LocalState
-     * 
-     *     But I don't think LS_LOCAL_ASSIGNMENTS is useful, so remove this
-     *     logic
+     * @@@ Changed the old logic: the old logic stored LS_LOCAL_ASSIGNMENTS Map<String, Integer> into LocalState
+     *
+     *     But LS_LOCAL_ASSIGNMENTS did not seem useful, so this logic was removed
      */
     @SuppressWarnings("unchecked")
     @Override
     public void run() {
-        
+
     }
 
-    public void run(Map<Integer, LocalAssignment> localAssignments) {
-        LOG.debug("Syncing processes, interval seconds:"
-                + TimeUtils.time_delta(lastTime));
+    public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds ) {
+        LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime));
         lastTime = TimeUtils.current_time_secs();
         try {
 
             /**
-             * Step 1: get assigned tasks from localstat Map<port(type Integer),
-             * LocalAssignment>
+             * Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment>
              */
             if (localAssignments == null) {
                 localAssignments = new HashMap<Integer, LocalAssignment>();
@@ -163,13 +154,11 @@ class SyncProcessEvent extends ShutdownWork {
             LOG.debug("Assigned tasks: " + localAssignments);
 
             /**
-             * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat
-             * Map<workerid [WorkerHeartbeat, state]>
+             * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]>
              */
             Map<String, StateHeartbeat> localWorkerStats = null;
             try {
-                localWorkerStats =
-                        getLocalWorkerStats(conf, localState, localAssignments);
+                localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments);
             } catch (Exception e) {
                 LOG.error("Failed to get Local worker stats");
                 throw e;
@@ -177,20 +166,14 @@ class SyncProcessEvent extends ShutdownWork {
             LOG.debug("Allocated: " + localWorkerStats);
 
             /**
-             * Step 3: kill Invalid Workers and remove killed worker from
-             * localWorkerStats
+             * Step 3: kill Invalid Workers and remove killed worker from localWorkerStats
              */
             Map<String, Integer> taskCleaupTimeoutMap = null;
             Set<Integer> keepPorts = null;
             try {
-                taskCleaupTimeoutMap =
-                        (Map<String, Integer>) localState
-                                .get(Common.LS_TASK_CLEANUP_TIMEOUT);
-                keepPorts =
-                        killUselessWorkers(localWorkerStats, localAssignments,
-                                taskCleaupTimeoutMap);
-                localState.put(Common.LS_TASK_CLEANUP_TIMEOUT,
-                        taskCleaupTimeoutMap);
+                taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
+                keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
+                localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap);
             } catch (IOException e) {
                 LOG.error("Failed to kill workers", e);
             }
@@ -202,7 +185,7 @@ class SyncProcessEvent extends ShutdownWork {
             checkNeedUpdateTopologys(localWorkerStats, localAssignments);
 
             // start new workers
-            startNewWorkers(keepPorts, localAssignments);
+            startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds);
 
         } catch (Exception e) {
             LOG.error("Failed Sync Process", e);
@@ -215,14 +198,13 @@ class SyncProcessEvent extends ShutdownWork {
      * check all workers is failed or not
      */
     @SuppressWarnings("unchecked")
-    public void checkNeedUpdateTopologys(
-            Map<String, StateHeartbeat> localWorkerStats,
-            Map<Integer, LocalAssignment> localAssignments) throws Exception {
+    public void checkNeedUpdateTopologys(Map<String, StateHeartbeat> localWorkerStats, Map<Integer, LocalAssignment> localAssignments) throws Exception {
         Set<String> topologys = new HashSet<String>();
+        Map<String, Long> topologyAssignTimeStamps = new HashMap<String, Long>();
 
-        for (Map.Entry<Integer, LocalAssignment> entry : localAssignments
-                .entrySet()) {
+        for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) {
             topologys.add(entry.getValue().getTopologyId());
+            topologyAssignTimeStamps.put(entry.getValue().getTopologyId(), entry.getValue().getTimeStamp());
         }
 
         for (StateHeartbeat stateHb : localWorkerStats.values()) {
@@ -236,32 +218,27 @@ class SyncProcessEvent extends ShutdownWork {
         Set<String> needRemoveTopologys = new HashSet<String>();
         for (String topologyId : topologys) {
             try {
-                long lastModifytime =
-                        StormConfig.get_supervisor_topology_Bianrymodify_time(
-                                conf, topologyId);
-                if ((currTime - lastModifytime) / 1000 < (JStormUtils.MIN_1 * 2)) {
+                long newAssignTime = topologyAssignTimeStamps.get(topologyId);
+                if ((currTime - newAssignTime) / 1000 < (JStormUtils.MIN_1 * 2)) {
                     LOG.debug("less 2 minite ,so removed " + topologyId);
                     needRemoveTopologys.add(topologyId);
                 }
             } catch (Exception e) {
-                LOG.error(
-                        "Failed to get the time of file last modification for topology"
-                                + topologyId, e);
+                LOG.error("Failed to get the time of file last modification for topology" + topologyId, e);
                 needRemoveTopologys.add(topologyId);
             }
         }
         topologys.removeAll(needRemoveTopologys);
 
         if (topologys.size() > 0) {
-            LOG.debug("Following topologys is going to re-download the jars, "
-                    + topologys);
+            LOG.debug("Following topologys is going to re-download the jars, " + topologys);
         }
         needDownloadTopologys.set(topologys);
     }
 
     /**
      * mark all new Workers
-     * 
+     *
      * @param workerIds
      * @pdOid 52b11418-7474-446d-bff5-0ecd68f4954f
      */
@@ -271,40 +248,32 @@ class SyncProcessEvent extends ShutdownWork {
 
         for (Entry<Integer, String> entry : workerIds.entrySet()) {
             String oldWorkerIds = portToWorkerId.get(entry.getKey());
-            if(oldWorkerIds != null){
+            if (oldWorkerIds != null) {
                 workerIdToStartTimeAndPort.remove(oldWorkerIds);
                 // update portToWorkerId
-                LOG.info("exit port is still occupied by old wokerId, so remove unuseful " +
-                        oldWorkerIds+ " form workerIdToStartTimeAndPort");
+                LOG.info("exit port is still occupied by old wokerId, so remove unuseful " + oldWorkerIds + " form workerIdToStartTimeAndPort");
             }
             portToWorkerId.put(entry.getKey(), entry.getValue());
-            workerIdToStartTimeAndPort.put(entry.getValue(),
-                    new Pair<Integer, Integer>(startTime, entry.getKey()));
+            workerIdToStartTimeAndPort.put(entry.getValue(), new Pair<Integer, Integer>(startTime, entry.getKey()));
         }
     }
 
     /**
-     * check new workers if the time is not > *
-     * SUPERVISOR_WORKER_START_TIMEOUT_SECS, otherwise info failed
-     * 
+     * check new workers: a worker without a heartbeat is waited for until SUPERVISOR_WORKER_START_TIMEOUT_SECS elapses, otherwise it is reported as failed
+     *
      * @param conf
      * @pdOid f0a6ab43-8cd3-44e1-8fd3-015a2ec51c6a
      */
-    public void checkNewWorkers(Map conf) throws IOException,
-            InterruptedException {
+    public void checkNewWorkers(Map conf) throws IOException, InterruptedException {
 
         Set<String> workers = new HashSet<String>();
-        for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort
-                .entrySet()) {
+        for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort.entrySet()) {
             String workerId = entry.getKey();
             int startTime = entry.getValue().getFirst();
             LocalState ls = StormConfig.worker_state(conf, workerId);
-            WorkerHeartbeat whb =
-                    (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
+            WorkerHeartbeat whb = (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
             if (whb == null) {
-                if ((TimeUtils.current_time_secs() - startTime) < JStormUtils
-                        .parseInt(conf
-                                .get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS))) {
+                if ((TimeUtils.current_time_secs() - startTime) < JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS))) {
                     LOG.info(workerId + " still hasn't started");
                 } else {
                     LOG.error("Failed to start Worker " + workerId);
@@ -321,15 +290,15 @@ class SyncProcessEvent extends ShutdownWork {
             this.portToWorkerId.remove(port);
         }
     }
-    public Map<Integer, String> getPortToWorkerId(){
+
+    public Map<Integer, String> getPortToWorkerId() {
         return portToWorkerId;
     }
 
     /**
      * get localstat approved workerId's map
-     * 
-     * @return Map<workerid [workerheart, state]> [workerheart, state] is also a
-     *         map, key is "workheartbeat" and "state"
+     *
+     * @return Map<workerId, StateHeartbeat>, where each StateHeartbeat pairs the worker's State with its WorkerHeartbeat
      * @param conf
      * @param localState
      * @param assignedTasks
@@ -337,22 +306,17 @@ class SyncProcessEvent extends ShutdownWork {
      * @pdOid 11c9bebb-d082-4c51-b323-dd3d5522a649
      */
     @SuppressWarnings("unchecked")
-    public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf,
-            LocalState localState, Map<Integer, LocalAssignment> assignedTasks)
-            throws Exception {
+    public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf, LocalState localState, Map<Integer, LocalAssignment> assignedTasks) throws Exception {
 
-        Map<String, StateHeartbeat> workeridHbstate =
-                new HashMap<String, StateHeartbeat>();
+        Map<String, StateHeartbeat> workeridHbstate = new HashMap<String, StateHeartbeat>();
 
         int now = TimeUtils.current_time_secs();
 
         /**
-         * Get Map<workerId, WorkerHeartbeat> from
-         * local_dir/worker/ids/heartbeat
+         * Get Map<workerId, WorkerHeartbeat> from local_dir/worker/ids/heartbeat
          */
         Map<String, WorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
-        for (Map.Entry<String, WorkerHeartbeat> entry : idToHeartbeat
-                .entrySet()) {
+        for (Entry<String, WorkerHeartbeat> entry : idToHeartbeat.entrySet()) {
 
             String workerid = entry.getKey().toString();
 
@@ -366,10 +330,9 @@ class SyncProcessEvent extends ShutdownWork {
                 if (timeToPort != null) {
                     LocalAssignment localAssignment = assignedTasks.get(timeToPort.getSecond());
                     if (localAssignment == null) {
-                        LOG.info("Following worker don't exit assignment, so remove this port="
-                                + timeToPort.getSecond());
+                        LOG.info("Following worker don't exit assignment, so remove this port=" + timeToPort.getSecond());
                         state = State.disallowed;
-                        //workerId is disallowed ,so remove it from  workerIdToStartTimeAndPort
+                        // workerId is disallowed ,so remove it from workerIdToStartTimeAndPort
                         Integer port = this.workerIdToStartTimeAndPort.get(workerid).getSecond();
                         this.workerIdToStartTimeAndPort.remove(workerid);
                         this.portToWorkerId.remove(port);
@@ -381,12 +344,21 @@ class SyncProcessEvent extends ShutdownWork {
                 // isn't assigned task
                 state = State.disallowed;
 
-            } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf
-                    .get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {//
+            } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
+                if (killingWorkers.containsKey(workerid) == false) {
+                    String outTimeInfo = " it is likely to be out of memory, the worker is time out ";
+                    workerReportError.report(whb.getTopologyId(), whb.getPort(),
+                            whb.getTaskIds(), outTimeInfo);
+                }
 
                 state = State.timedOut;
             } else {
                 if (isWorkerDead(workerid)) {
+                    if (killingWorkers.containsKey(workerid) == false){
+                        String workeDeadInfo = "Worker is dead ";
+                        workerReportError.report(whb.getTopologyId(), whb.getPort(),
+                                whb.getTaskIds(), workeDeadInfo);
+                    }
                     state = State.timedOut;
                 } else {
                     state = State.valid;
@@ -395,13 +367,10 @@ class SyncProcessEvent extends ShutdownWork {
 
             if (state != State.valid) {
                 if (killingWorkers.containsKey(workerid) == false)
-                    LOG.info("Worker:" + workerid + " state:" + state
-                            + " WorkerHeartbeat:" + whb + " assignedTasks:"
-                            + assignedTasks + " at supervisor time-secs " + now);
+                    LOG.info("Worker:" + workerid + " state:" + state + " WorkerHeartbeat:" + whb + " assignedTasks:" + assignedTasks
+                            + " at supervisor time-secs " + now);
             } else {
-                LOG.debug("Worker:" + workerid + " state:" + state
-                        + " WorkerHeartbeat: " + whb
-                        + " at supervisor time-secs " + now);
+                LOG.debug("Worker:" + workerid + " state:" + state + " WorkerHeartbeat: " + whb + " at supervisor time-secs " + now);
             }
 
             workeridHbstate.put(workerid, new StateHeartbeat(state, whb));
@@ -412,32 +381,26 @@ class SyncProcessEvent extends ShutdownWork {
 
     /**
      * check whether the workerheartbeat is allowed in the assignedTasks
-     * 
+     *
      * @param whb : WorkerHeartbeat
      * @param assignedTasks
-     * @return boolean if true, the assignments(LS-LOCAL-ASSIGNMENTS) is match
-     *         with workerheart if fasle, is not matched
+     * @return boolean if true, the assignments(LS-LOCAL-ASSIGNMENTS) is match with workerheart if fasle, is not matched
      */
-    public boolean matchesAssignment(WorkerHeartbeat whb,
-            Map<Integer, LocalAssignment> assignedTasks) {
+    public boolean matchesAssignment(WorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedTasks) {
 
         boolean isMatch = true;
         LocalAssignment localAssignment = assignedTasks.get(whb.getPort());
 
         if (localAssignment == null) {
-            LOG.debug("Following worker has been removed, port="
-                    + whb.getPort() + ", assignedTasks=" + assignedTasks);
+            LOG.debug("Following worker has been removed, port=" + whb.getPort() + ", assignedTasks=" + assignedTasks);
             isMatch = false;
         } else if (!whb.getTopologyId().equals(localAssignment.getTopologyId())) {
             // topology id not equal
-            LOG.info("topology id not equal whb=" + whb.getTopologyId()
-                    + ",localAssignment=" + localAssignment.getTopologyId());
+            LOG.info("topology id not equal whb=" + whb.getTopologyId() + ",localAssignment=" + localAssignment.getTopologyId());
             isMatch = false;
         }/*
-          * else if (!(whb.getTaskIds().equals(localAssignment.getTaskIds()))) {
-          * // task-id isn't equal LOG.info("task-id isn't equal whb=" +
-          * whb.getTaskIds() + ",localAssignment=" +
-          * localAssignment.getTaskIds()); isMatch = false; }
+          * else if (!(whb.getTaskIds().equals(localAssignment.getTaskIds()))) { // task-id isn't equal LOG.info("task-id isn't equal whb=" + whb.getTaskIds() +
+          * ",localAssignment=" + localAssignment.getTaskIds()); isMatch = false; }
           */
 
         return isMatch;
@@ -445,17 +408,15 @@ class SyncProcessEvent extends ShutdownWork {
 
     /**
      * get all workers heartbeats of the supervisor
-     * 
+     *
      * @param conf
      * @return Map<workerId, WorkerHeartbeat>
      * @throws IOException
      * @throws IOException
      */
-    public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map conf)
-            throws Exception {
+    public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
 
-        Map<String, WorkerHeartbeat> workerHeartbeats =
-                new HashMap<String, WorkerHeartbeat>();
+        Map<String, WorkerHeartbeat> workerHeartbeats = new HashMap<String, WorkerHeartbeat>();
 
         // get the path: STORM-LOCAL-DIR/workers
         String path = StormConfig.worker_root(conf);
@@ -480,20 +441,19 @@ class SyncProcessEvent extends ShutdownWork {
 
     /**
      * get worker heartbeat by workerid
-     * 
+     *
      * @param conf
      * @param workerId
      * @returns WorkerHeartbeat
      * @throws IOException
      */
-    public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId)
-            throws Exception {
+    public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) throws Exception {
 
         try {
             LocalState ls = StormConfig.worker_state(conf, workerId);
 
             return (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
-        } catch (IOException e) {
+        } catch (Exception e) {
             LOG.error("Failed to get worker Heartbeat", e);
             return null;
         }
@@ -502,7 +462,7 @@ class SyncProcessEvent extends ShutdownWork {
 
     /**
      * launch a worker in local mode
-     * 
+     *
      * @param conf
      * @param sharedcontext
      * @param topologyId
@@ -512,17 +472,12 @@ class SyncProcessEvent extends ShutdownWork {
      * @param workerThreadPidsAtom
      * @throws Exception
      */
-    public void launchWorker(Map conf, IContext sharedcontext,
-            String topologyId, String supervisorId, Integer port,
-            String workerId,
-            ConcurrentHashMap<String, String> workerThreadPidsAtom)
-            throws Exception {
+    public void launchWorker(Map conf, IContext sharedcontext, String topologyId, String supervisorId, Integer port, String workerId,
+            ConcurrentHashMap<String, String> workerThreadPidsAtom) throws Exception {
 
         String pid = UUID.randomUUID().toString();
 
-        WorkerShutdown worker =
-                Worker.mk_worker(conf, sharedcontext, topologyId, supervisorId,
-                        port, workerId, null);
+        WorkerShutdown worker = Worker.mk_worker(conf, sharedcontext, topologyId, supervisorId, port, workerId, null);
 
         ProcessSimulator.registerProcess(pid, worker);
 
@@ -534,13 +489,11 @@ class SyncProcessEvent extends ShutdownWork {
     private Set<String> setFilterJars(Map totalConf) {
         Set<String> filterJars = new HashSet<String>();
 
-        boolean enableClassloader =
-                ConfigExtension.isEnableTopologyClassLoader(totalConf);
+        boolean enableClassloader = ConfigExtension.isEnableTopologyClassLoader(totalConf);
         if (enableClassloader == false) {
             // avoid logback vs log4j conflict
             boolean enableLog4j = false;
-            String userDefLog4jConf =
-                    ConfigExtension.getUserDefinedLog4jConf(totalConf);
+            String userDefLog4jConf = ConfigExtension.getUserDefinedLog4jConf(totalConf);
             if (StringUtils.isBlank(userDefLog4jConf) == false) {
                 enableLog4j = true;
             }
@@ -601,8 +554,7 @@ class SyncProcessEvent extends ShutdownWork {
         }
 
         if (stormHome != null) {
-            List<String> stormHomeFiles =
-                    PathUtils.read_dir_contents(stormHome);
+            List<String> stormHomeFiles = PathUtils.read_dir_contents(stormHome);
 
             for (String file : stormHomeFiles) {
                 if (file.endsWith(".jar")) {
@@ -610,13 +562,10 @@ class SyncProcessEvent extends ShutdownWork {
                 }
             }
 
-            List<String> stormLibFiles =
-                    PathUtils.read_dir_contents(stormHome + File.separator
-                            + "lib");
+            List<String> stormLibFiles = PathUtils.read_dir_contents(stormHome + File.separator + "lib");
             for (String file : stormLibFiles) {
                 if (file.endsWith(".jar")) {
-                    classSet.add(stormHome + File.separator + "lib"
-                            + File.separator + file);
+                    classSet.add(stormHome + File.separator + "lib" + File.separator + file);
                 }
             }
 
@@ -646,8 +595,7 @@ class SyncProcessEvent extends ShutdownWork {
         String childopts = " ";
 
         if (stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS) != null) {
-            childopts +=
-                    (String) stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
+            childopts += (String) stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
         } else if (ConfigExtension.getWorkerGc(stormConf) != null) {
             childopts += ConfigExtension.getWorkerGc(stormConf);
         }
@@ -655,8 +603,7 @@ class SyncProcessEvent extends ShutdownWork {
         return childopts;
     }
 
-    public String getLogParameter(Map conf, String stormHome,
-            String topologyName, int port) {
+    public String getLogParameter(Map conf, String stormHome, String topologyName, int port) {
         final String LOGBACK_CONF_TAG = "logback.configurationFile";
         final String LOGBACK_CONF_TAG_CMD = " -D" + LOGBACK_CONF_TAG + "=";
         final String DEFAULT_LOG_CONF = "jstorm.logback.xml";
@@ -664,13 +611,15 @@ class SyncProcessEvent extends ShutdownWork {
         String logFileName = JStormUtils.genLogName(topologyName, port);
         // String logFileName = topologyId + "-worker-" + port + ".log";
 
+
         StringBuilder commandSB = new StringBuilder();
         commandSB.append(" -Dlogfile.name=");
         commandSB.append(logFileName);
+        commandSB.append(" -Dtopology.name=").append(topologyName);
+
         // commandSB.append(" -Dlog4j.ignoreTCL=true");
 
-        String userDefLogbackConf =
-                ConfigExtension.getUserDefinedLogbackConf(conf);
+        String userDefLogbackConf = ConfigExtension.getUserDefinedLogbackConf(conf);
         String logConf = System.getProperty(LOGBACK_CONF_TAG);
 
         if (StringUtils.isBlank(userDefLogbackConf) == false) {
@@ -679,9 +628,7 @@ class SyncProcessEvent extends ShutdownWork {
         } else if (StringUtils.isBlank(logConf) == false) {
             commandSB.append(LOGBACK_CONF_TAG_CMD).append(logConf);
         } else if (StringUtils.isBlank(stormHome) == false) {
-            commandSB.append(LOGBACK_CONF_TAG_CMD).append(stormHome)
-                    .append(File.separator).append("conf")
-                    .append(File.separator).append(DEFAULT_LOG_CONF);
+            commandSB.append(LOGBACK_CONF_TAG_CMD).append(stormHome).append(File.separator).append("conf").append(File.separator).append(DEFAULT_LOG_CONF);
         } else {
             commandSB.append(LOGBACK_CONF_TAG_CMD + DEFAULT_LOG_CONF);
         }
@@ -690,38 +637,35 @@ class SyncProcessEvent extends ShutdownWork {
         String userDefLog4jConf = ConfigExtension.getUserDefinedLog4jConf(conf);
         if (StringUtils.isBlank(userDefLog4jConf) == false) {
             LOG.info("Use user fined log4j conf " + userDefLog4jConf);
-            commandSB.append(" -D" + LOG4J_CONF_TAG + "=").append(
-                    userDefLog4jConf);
+            commandSB.append(" -D" + LOG4J_CONF_TAG + "=").append(userDefLog4jConf);
         }
 
         return commandSB.toString();
     }
 
-    private String getGcDumpParam(Map totalConf) {
+    private String getGcDumpParam(String topologyName, Map totalConf) {
         // String gcPath = ConfigExtension.getWorkerGcPath(totalConf);
         String gcPath = JStormUtils.getLogDir();
 
         Date now = new Date();
         String nowStr = TimeFormat.getSecond(now);
 
-        StringBuilder gc = new StringBuilder();
-
+        StringBuilder gc = new StringBuilder(256);
         gc.append(" -Xloggc:");
-        gc.append(gcPath);
-        gc.append(File.separator);
-        gc.append("%TOPOLOGYID%-worker-%ID%-");
-        gc.append(nowStr);
+        gc.append(gcPath).append(File.separator);
+        gc.append(topologyName).append(File.separator);
+        gc.append("%TOPOLOGYID%-worker-%ID%");
         gc.append("-gc.log -verbose:gc -XX:HeapDumpPath=");
-        gc.append(gcPath).append(File.separator).append("java-%TOPOLOGYID%-")
-                .append(nowStr).append(".hprof");
+        gc.append(gcPath).append(File.separator).append(topologyName).append(File.separator).append("java-%TOPOLOGYID%-").append(nowStr).append(".hprof");
         gc.append(" ");
 
+
         return gc.toString();
     }
 
     /**
      * launch a worker in distributed mode
-     * 
+     *
      * @param conf
      * @param sharedcontext
      * @param topologyId
@@ -731,20 +675,17 @@ class SyncProcessEvent extends ShutdownWork {
      * @throws IOException
      * @pdOid 6ea369dd-5ce2-4212-864b-1f8b2ed94abb
      */
-    public void launchWorker(Map conf, IContext sharedcontext,
-            String topologyId, String supervisorId, Integer port,
-            String workerId, LocalAssignment assignment) throws IOException {
+    public void launchWorker(Map conf, IContext sharedcontext, String topologyId, String supervisorId, Integer port, String workerId, LocalAssignment assignment)
+            throws IOException {
 
         // STORM-LOCAL-DIR/supervisor/stormdist/topologyId
-        String stormroot =
-                StormConfig.supervisor_stormdist_root(conf, topologyId);
+        String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
 
         // STORM-LOCAL-DIR/supervisor/stormdist/topologyId/stormjar.jar
         String stormjar = StormConfig.stormjar_path(stormroot);
 
         // get supervisor conf
-        Map stormConf =
-                StormConfig.read_supervisor_topology_conf(conf, topologyId);
+        Map stormConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
 
         Map totalConf = new HashMap();
         totalConf.putAll(conf);
@@ -761,12 +702,13 @@ class SyncProcessEvent extends ShutdownWork {
         String stormhome = System.getProperty("jstorm.home");
 
         long memSize = assignment.getMem();
+        long memMinSize = ConfigExtension.getMemMinSizePerWorker(totalConf);
         int cpuNum = assignment.getCpu();
         long memGsize = memSize / JStormUtils.SIZE_1_G;
         int gcThreadsNum = memGsize > 4 ? (int) (memGsize * 1.5) : 4;
         String childopts = getChildOpts(totalConf);
 
-        childopts += getGcDumpParam(totalConf);
+        childopts += getGcDumpParam(Common.getTopologyNameById(topologyId), totalConf);
 
         Map<String, String> environment = new HashMap<String, String>();
 
@@ -776,15 +718,13 @@ class SyncProcessEvent extends ShutdownWork {
             environment.put("REDIRECT", "false");
         }
 
-        environment.put("LD_LIBRARY_PATH",
-                (String) totalConf.get(Config.JAVA_LIBRARY_PATH));
+        environment.put("LD_LIBRARY_PATH", (String) totalConf.get(Config.JAVA_LIBRARY_PATH));
 
         StringBuilder commandSB = new StringBuilder();
 
         try {
             if (this.cgroupManager != null) {
-                commandSB.append(cgroupManager.startNewWorker(totalConf,
-                        cpuNum, workerId));
+                commandSB.append(cgroupManager.startNewWorker(totalConf, cpuNum, workerId));
             }
         } catch (Exception e) {
             LOG.error("fail to prepare cgroup to workerId: " + workerId, e);
@@ -793,15 +733,21 @@ class SyncProcessEvent extends ShutdownWork {
 
         // commandSB.append("java -server -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n ");
         commandSB.append("java -server ");
-        commandSB.append(" -Xms" + memSize);
+        commandSB.append(" -Xms" + memMinSize);
         commandSB.append(" -Xmx" + memSize + " ");
-        commandSB.append(" -Xmn" + memSize / 3 + " ");
-        commandSB.append(" -XX:PermSize=" + memSize / 16);
-        commandSB.append(" -XX:MaxPermSize=" + memSize / 8);
+        if (memMinSize < (memSize / 2))
+            commandSB.append(" -Xmn" + memMinSize + " ");
+        else
+            commandSB.append(" -Xmn" + memSize / 2 + " ");
+        if (memGsize >= 2) {
+            commandSB.append(" -XX:PermSize=" + memSize / 32);
+        } else {
+            commandSB.append(" -XX:PermSize=" + memSize / 16);
+        }
+        commandSB.append(" -XX:MaxPermSize=" + memSize / 16);
         commandSB.append(" -XX:ParallelGCThreads=" + gcThreadsNum);
         commandSB.append(" " + childopts);
-        commandSB.append(" "
-                + (assignment.getJvm() == null ? "" : assignment.getJvm()));
+        commandSB.append(" " + (assignment.getJvm() == null ? "" : assignment.getJvm()));
 
         commandSB.append(" -Djava.library.path=");
         commandSB.append((String) totalConf.get(Config.JAVA_LIBRARY_PATH));
@@ -811,20 +757,18 @@ class SyncProcessEvent extends ShutdownWork {
             commandSB.append(stormhome);
         }
 
-        commandSB.append(getLogParameter(totalConf, stormhome,
-                assignment.getTopologyName(), port));
+        String logDir = System.getProperty("jstorm.log.dir");
+        if (logDir != null)
+             commandSB.append(" -Djstorm.log.dir=").append(logDir);
+        commandSB.append(getLogParameter(totalConf, stormhome, assignment.getTopologyName(), port));
 
         String classpath = getClassPath(stormjar, stormhome, totalConf);
-        String workerClassPath =
-                (String) totalConf.get(Config.TOPOLOGY_CLASSPATH);
-        List<String> otherLibs =
-                (List<String>) stormConf
-                        .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+        String workerClassPath = (String) totalConf.get(Config.TOPOLOGY_CLASSPATH);
+        List<String> otherLibs = (List<String>) stormConf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
         StringBuilder sb = new StringBuilder();
         if (otherLibs != null) {
             for (String libName : otherLibs) {
-                sb.append(StormConfig.stormlib_path(stormroot, libName))
-                        .append(":");
+                sb.append(StormConfig.stormlib_path(stormroot, libName)).append(":");
             }
         }
         workerClassPath = workerClassPath + ":" + sb.toString();
@@ -832,8 +776,7 @@ class SyncProcessEvent extends ShutdownWork {
         Map<String, String> policyReplaceMap = new HashMap<String, String>();
         String realClassPath = classpath + ":" + workerClassPath;
         policyReplaceMap.put(SandBoxMaker.CLASS_PATH_KEY, realClassPath);
-        commandSB
-                .append(sandBoxMaker.sandboxPolicy(workerId, policyReplaceMap));
+        commandSB.append(sandBoxMaker.sandboxPolicy(workerId, policyReplaceMap));
 
         commandSB.append(" -cp ");
         // commandSB.append(workerClassPath + ":");
@@ -871,9 +814,7 @@ class SyncProcessEvent extends ShutdownWork {
         JStormUtils.launch_process(cmd, environment, true);
     }
 
-    private Set<Integer> killUselessWorkers(
-            Map<String, StateHeartbeat> localWorkerStats,
-            Map<Integer, LocalAssignment> localAssignments,
+    private Set<Integer> killUselessWorkers(Map<String, StateHeartbeat> localWorkerStats, Map<Integer, LocalAssignment> localAssignments,
             Map<String, Integer> taskCleanupTimeoutMap) {
         Map<String, String> removed = new HashMap<String, String>();
         Set<Integer> keepPorts = new HashSet<Integer>();
@@ -882,8 +823,7 @@ class SyncProcessEvent extends ShutdownWork {
 
             String workerid = entry.getKey();
             StateHeartbeat hbstate = entry.getValue();
-            if (workerIdToStartTimeAndPort.containsKey(workerid)
-                    && hbstate.getState().equals(State.notStarted))
+            if (workerIdToStartTimeAndPort.containsKey(workerid) && hbstate.getState().equals(State.notStarted))
                 continue;
 
             if (hbstate.getState().equals(State.valid)) {
@@ -891,8 +831,7 @@ class SyncProcessEvent extends ShutdownWork {
                 keepPorts.add(hbstate.getHeartbeat().getPort());
             } else {
                 if (hbstate.getHeartbeat() != null) {
-                    removed.put(workerid, hbstate.getHeartbeat()
-                            .getTopologyId());
+                    removed.put(workerid, hbstate.getHeartbeat().getTopologyId());
                 } else {
                     removed.put(workerid, null);
                 }
@@ -910,14 +849,12 @@ class SyncProcessEvent extends ShutdownWork {
             }
         }
 
-        shutWorker(conf, supervisorId, removed, workerThreadPids,
-                cgroupManager, false, killingWorkers, taskCleanupTimeoutMap);
+        shutWorker(conf, supervisorId, removed, workerThreadPids, cgroupManager, false, killingWorkers, taskCleanupTimeoutMap);
         Set<String> activeTopologys = new HashSet<String>();
         if (killingWorkers.size() == 0) {
             // When all workers under killing are killed successfully,
             // clean the task cleanup timeout map correspondingly.
-            for (Entry<Integer, LocalAssignment> entry : localAssignments
-                    .entrySet()) {
+            for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) {
                 activeTopologys.add(entry.getValue().getTopologyId());
             }
 
@@ -936,8 +873,7 @@ class SyncProcessEvent extends ShutdownWork {
             localWorkerStats.remove(removedWorkerId);
         }
         // Keep the workers which are still under starting
-        for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort
-                .entrySet()) {
+        for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort.entrySet()) {
             String workerId = entry.getKey();
             StateHeartbeat hbstate = localWorkerStats.get(workerId);
             if (hbstate != null)
@@ -948,14 +884,12 @@ class SyncProcessEvent extends ShutdownWork {
         return keepPorts;
     }
 
-    private void startNewWorkers(Set<Integer> keepPorts,
-            Map<Integer, LocalAssignment> localAssignments) throws Exception {
+    private void startNewWorkers(Set<Integer> keepPorts, Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds)
+            throws Exception {
         /**
-         * Step 4: get reassigned tasks, which is in assignedTasks, but not in
-         * keeperPorts Map<port(type Integer), LocalAssignment>
+         * Step 4: get reassigned tasks, which is in assignedTasks, but not in keeperPorts Map<port(type Integer), LocalAssignment>
          */
-        Map<Integer, LocalAssignment> newWorkers =
-                JStormUtils.select_keys_pred(keepPorts, localAssignments);
+        Map<Integer, LocalAssignment> newWorkers = JStormUtils.select_keys_pred(keepPorts, localAssignments);
 
         /**
          * Step 5: generate new work ids
@@ -965,7 +899,10 @@ class SyncProcessEvent extends ShutdownWork {
         for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) {
             Integer port = entry.getKey();
             LocalAssignment assignment = entry.getValue();
-
+            if (assignment != null && assignment.getTopologyId() != null && downloadFailedTopologyIds.contains(assignment.getTopologyId())) {
+                LOG.info("Can't start this worker: " + port + " about the topology: " + assignment.getTopologyId() + ", due to the damaged binary !!");
+                continue;
+            }
             String workerId = UUID.randomUUID().toString();
 
             newWorkerIds.put(port, workerId);
@@ -994,18 +931,14 @@ class SyncProcessEvent extends ShutdownWork {
                 String clusterMode = StormConfig.cluster_mode(conf);
 
                 if (clusterMode.equals("distributed")) {
-                    launchWorker(conf, sharedContext,
-                            assignment.getTopologyId(), supervisorId, port,
-                            workerId, assignment);
+                    launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, assignment);
                 } else if (clusterMode.equals("local")) {
-                    launchWorker(conf, sharedContext,
-                            assignment.getTopologyId(), supervisorId, port,
-                            workerId, workerThreadPids);
+                    launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, workerThreadPids);
                 }
             } catch (Exception e) {
-                String errorMsg =
-                        "Failed to launchWorker workerId:" + workerId + ":"
-                                + port;
+                workerReportError.report(assignment.getTopologyId(), port,
+                        assignment.getTaskIds(), new String(JStormUtils.getErrorInfo(e)));
+                String errorMsg = "Failed to launchWorker workerId:" + workerId + ":" + port;
                 LOG.error(errorMsg, e);
                 throw e;
             }
@@ -1013,8 +946,7 @@ class SyncProcessEvent extends ShutdownWork {
         }
 
         /**
-         * FIXME, workerIds should be Set, not Collection, but here simplify the
-         * logic
+         * FIXME, workerIds should be Set, not Collection, but here simplify the logic
          */
         markAllNewWorkers(newWorkerIds);
         // try {
@@ -1027,6 +959,9 @@ class SyncProcessEvent extends ShutdownWork {
     }
 
     boolean isWorkerDead(String workerId) {
+        if (ConfigExtension.isCheckWorkerAliveBySystemInfo(conf) == false) {
+            return false;
+        }
 
         try {
             List<String> pids = getPid(conf, workerId);
@@ -1046,9 +981,7 @@ class SyncProcessEvent extends ShutdownWork {
 
             return true;
         } catch (IOException e) {
-            LOG.info(
-                    "Failed to check whether worker is dead through /proc/pid",
-                    e);
+            LOG.info("Failed to check whether worker is dead through /proc/pid", e);
             return false;
         }