Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:50 UTC

[12/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java
index 32aa0f1..9ca1d8a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java
@@ -17,33 +17,11 @@
  */
 package com.alibaba.jstorm.daemon.supervisor;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.io.FileExistsException;
-import org.apache.commons.io.FileUtils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
 import backtype.storm.utils.LocalState;
-
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Cluster;
 import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.StormBase;
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.daemon.worker.LocalAssignment;
@@ -55,16 +33,26 @@ import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 import com.alibaba.jstorm.utils.TimeUtils;
+import org.apache.commons.io.FileExistsException;
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.*;
+import java.util.Map.Entry;
 
 /**
- * supervisor SynchronizeSupervisor workflow (1) writer local assignment to
- * LocalState (2) download new Assignment's topology (3) remove useless Topology
- * (4) push one SyncProcessEvent to SyncProcessEvent's EventManager
+ * supervisor SynchronizeSupervisor workflow: (1) write local assignment to LocalState (2) download new Assignment's topology (3) remove useless Topology (4)
+ * push one SyncProcessEvent to SyncProcessEvent's EventManager
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
  */
 class SyncSupervisorEvent extends RunnableCallback {
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(SyncSupervisorEvent.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class);
 
     // private Supervisor supervisor;
 
@@ -95,10 +83,8 @@ class SyncSupervisorEvent extends RunnableCallback {
      * @param localState
      * @param syncProcesses
      */
-    public SyncSupervisorEvent(String supervisorId, Map conf,
-            EventManager processEventManager, EventManager syncSupEventManager,
-            StormClusterState stormClusterState, LocalState localState,
-            SyncProcessEvent syncProcesses, Heartbeat heartbeat) {
+    public SyncSupervisorEvent(String supervisorId, Map conf, EventManager processEventManager, EventManager syncSupEventManager,
+            StormClusterState stormClusterState, LocalState localState, SyncProcessEvent syncProcesses, Heartbeat heartbeat) {
 
         this.syncProcesses = syncProcesses;
         this.processEventManager = processEventManager;
@@ -112,38 +98,30 @@ class SyncSupervisorEvent extends RunnableCallback {
 
     @Override
     public void run() {
-        LOG.debug("Synchronizing supervisor, interval seconds:"
-                + TimeUtils.time_delta(lastTime));
+        LOG.debug("Synchronizing supervisor, interval seconds:" + TimeUtils.time_delta(lastTime));
         lastTime = TimeUtils.current_time_secs();
 
         try {
 
-            RunnableCallback syncCallback =
-                    new EventManagerZkPusher(this, syncSupEventManager);
+            RunnableCallback syncCallback = new EventManagerZkPusher(this, syncSupEventManager);
 
             /**
-             * Step 1: get all assignments and register /ZK-dir/assignment and
-             * every assignment watch
+             * Step 1: get all assignments and register /ZK-dir/assignment and every assignment watch
              * 
              */
-            Map<String, Assignment> assignments =
-                    Cluster.get_all_assignment(stormClusterState, syncCallback);
+            Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, syncCallback);
             LOG.debug("Get all assignments " + assignments);
 
             /**
-             * Step 2: get topologyIds list from
-             * STORM-LOCAL-DIR/supervisor/stormdist/
+             * Step 2: get topologyIds list from STORM-LOCAL-DIR/supervisor/stormdist/
              */
-            List<String> downloadedTopologyIds =
-                    StormConfig.get_supervisor_toplogy_list(conf);
+            List<String> downloadedTopologyIds = StormConfig.get_supervisor_toplogy_list(conf);
             LOG.debug("Downloaded storm ids: " + downloadedTopologyIds);
 
             /**
-             * Step 3: get <port,LocalAssignments> from ZK local node's
-             * assignment
+             * Step 3: get <port,LocalAssignments> from ZK local node's assignment
              */
-            Map<Integer, LocalAssignment> zkAssignment =
-                    getLocalAssign(stormClusterState, supervisorId, assignments);
+            Map<Integer, LocalAssignment> zkAssignment = getLocalAssign(stormClusterState, supervisorId, assignments);
             Map<Integer, LocalAssignment> localAssignment;
             Set<String> updateTopologys;
 
@@ -152,35 +130,31 @@ class SyncSupervisorEvent extends RunnableCallback {
              */
             try {
                 LOG.debug("Writing local assignment " + zkAssignment);
-                localAssignment =
-                        (Map<Integer, LocalAssignment>) localState
-                                .get(Common.LS_LOCAL_ASSIGNMENTS);
+                localAssignment = (Map<Integer, LocalAssignment>) localState.get(Common.LS_LOCAL_ASSIGNMENTS);
                 if (localAssignment == null) {
                     localAssignment = new HashMap<Integer, LocalAssignment>();
                 }
                 localState.put(Common.LS_LOCAL_ASSIGNMENTS, zkAssignment);
 
-                updateTopologys =
-                        getUpdateTopologys(localAssignment, zkAssignment);
-                Set<String> reDownloadTopologys =
-                        getNeedReDownloadTopologys(localAssignment);
+                updateTopologys = getUpdateTopologys(localAssignment, zkAssignment, assignments);
+                Set<String> reDownloadTopologys = getNeedReDownloadTopologys(localAssignment);
                 if (reDownloadTopologys != null) {
                     updateTopologys.addAll(reDownloadTopologys);
                 }
             } catch (IOException e) {
-                LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment
-                        + " of localState failed");
+                LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment + " of localState failed");
                 throw e;
             }
 
             /**
              * Step 5: download code from ZK
              */
-            Map<String, String> topologyCodes =
-                    getTopologyCodeLocations(assignments, supervisorId);
+            Map<String, String> topologyCodes = getTopologyCodeLocations(assignments, supervisorId);
+
+            // downloadFailedTopologyIds records the topologies whose binary could not be downloaded from nimbus
+            Set<String> downloadFailedTopologyIds = new HashSet<String>();
 
-            downloadTopology(topologyCodes, downloadedTopologyIds,
-                    updateTopologys, assignments);
+            downloadTopology(topologyCodes, downloadedTopologyIds, updateTopologys, assignments, downloadFailedTopologyIds);
 
             /**
              * Step 6: remove any downloaded useless topology
@@ -191,7 +165,7 @@ class SyncSupervisorEvent extends RunnableCallback {
              * Step 7: push syncProcesses Event
              */
             // processEventManager.add(syncProcesses);
-            syncProcesses.run(zkAssignment);
+            syncProcesses.run(zkAssignment, downloadFailedTopologyIds);
 
             // If everything is OK, set the trigger to update heartbeat of
             // supervisor
@@ -209,11 +183,9 @@ class SyncSupervisorEvent extends RunnableCallback {
      * @param conf
      * @param topologyId
      * @param masterCodeDir
-     * @param clusterMode
      * @throws IOException
      */
-    private void downloadStormCode(Map conf, String topologyId,
-            String masterCodeDir) throws IOException, TException {
+    private void downloadStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
         String clusterMode = StormConfig.cluster_mode(conf);
 
         if (clusterMode.endsWith("distributed")) {
@@ -224,17 +196,14 @@ class SyncSupervisorEvent extends RunnableCallback {
         }
     }
 
-    private void downloadLocalStormCode(Map conf, String topologyId,
-            String masterCodeDir) throws IOException, TException {
+    private void downloadLocalStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
 
         // STORM-LOCAL-DIR/supervisor/stormdist/storm-id
-        String stormroot =
-                StormConfig.supervisor_stormdist_root(conf, topologyId);
+        String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
 
         FileUtils.copyDirectory(new File(masterCodeDir), new File(stormroot));
 
-        ClassLoader classloader =
-                Thread.currentThread().getContextClassLoader();
+        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
 
         String resourcesJar = resourcesJar();
 
@@ -244,20 +213,16 @@ class SyncSupervisorEvent extends RunnableCallback {
 
         if (resourcesJar != null) {
 
-            LOG.info("Extracting resources from jar at " + resourcesJar
-                    + " to " + targetDir);
+            LOG.info("Extracting resources from jar at " + resourcesJar + " to " + targetDir);
 
-            JStormUtils.extract_dir_from_jar(resourcesJar,
-                    StormConfig.RESOURCES_SUBDIR, stormroot);// extract dir
+            JStormUtils.extract_dir_from_jar(resourcesJar, StormConfig.RESOURCES_SUBDIR, stormroot);// extract dir
             // from jar;;
             // util.clj
         } else if (url != null) {
 
-            LOG.info("Copying resources at " + url.toString() + " to "
-                    + targetDir);
+            LOG.info("Copying resources at " + url.toString() + " to " + targetDir);
 
-            FileUtils.copyDirectory(new File(url.getFile()), (new File(
-                    targetDir)));
+            FileUtils.copyDirectory(new File(url.getFile()), (new File(targetDir)));
 
         }
     }
@@ -271,27 +236,21 @@ class SyncSupervisorEvent extends RunnableCallback {
      * @throws IOException
      * @throws TException
      */
-    private void downloadDistributeStormCode(Map conf, String topologyId,
-            String masterCodeDir) throws IOException, TException {
+    private void downloadDistributeStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
 
         // STORM_LOCAL_DIR/supervisor/tmp/(UUID)
-        String tmproot =
-                StormConfig.supervisorTmpDir(conf) + File.separator
-                        + UUID.randomUUID().toString();
+        String tmproot = StormConfig.supervisorTmpDir(conf) + File.separator + UUID.randomUUID().toString();
 
         // STORM_LOCAL_DIR/supervisor/stormdist/topologyId
-        String stormroot =
-                StormConfig.supervisor_stormdist_root(conf, topologyId);
+        String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
 
-        JStormServerUtils.downloadCodeFromMaster(conf, tmproot, masterCodeDir,
-                topologyId, true);
+        JStormServerUtils.downloadCodeFromMaster(conf, tmproot, masterCodeDir, topologyId, true);
 
         // tmproot/stormjar.jar
         String localFileJarTmp = StormConfig.stormjar_path(tmproot);
 
         // extract dir from jar
-        JStormUtils.extract_dir_from_jar(localFileJarTmp,
-                StormConfig.RESOURCES_SUBDIR, tmproot);
+        JStormUtils.extract_dir_from_jar(localFileJarTmp, StormConfig.RESOURCES_SUBDIR, tmproot);
 
         File srcDir = new File(tmproot);
         File destDir = new File(stormroot);
@@ -325,8 +284,7 @@ class SyncSupervisorEvent extends RunnableCallback {
         List<String> rtn = new ArrayList<String>();
         int size = jarPaths.size();
         for (int i = 0; i < size; i++) {
-            if (JStormUtils.zipContainsDir(jarPaths.get(i),
-                    StormConfig.RESOURCES_SUBDIR)) {
+            if (JStormUtils.zipContainsDir(jarPaths.get(i), StormConfig.RESOURCES_SUBDIR)) {
                 rtn.add(jarPaths.get(i));
             }
         }
@@ -342,24 +300,19 @@ class SyncSupervisorEvent extends RunnableCallback {
      * 
      * @param stormClusterState
      * @param supervisorId
-     * @param callback
      * @throws Exception
      * @returns map: {port,LocalAssignment}
      */
-    private Map<Integer, LocalAssignment> getLocalAssign(
-            StormClusterState stormClusterState, String supervisorId,
-            Map<String, Assignment> assignments) throws Exception {
+    private Map<Integer, LocalAssignment> getLocalAssign(StormClusterState stormClusterState, String supervisorId, Map<String, Assignment> assignments)
+            throws Exception {
 
-        Map<Integer, LocalAssignment> portLA =
-                new HashMap<Integer, LocalAssignment>();
+        Map<Integer, LocalAssignment> portLA = new HashMap<Integer, LocalAssignment>();
 
         for (Entry<String, Assignment> assignEntry : assignments.entrySet()) {
             String topologyId = assignEntry.getKey();
             Assignment assignment = assignEntry.getValue();
 
-            Map<Integer, LocalAssignment> portTasks =
-                    readMyTasks(stormClusterState, topologyId, supervisorId,
-                            assignment);
+            Map<Integer, LocalAssignment> portTasks = readMyTasks(stormClusterState, topologyId, supervisorId, assignment);
             if (portTasks == null) {
                 continue;
             }
@@ -374,8 +327,7 @@ class SyncSupervisorEvent extends RunnableCallback {
                 if (!portLA.containsKey(port)) {
                     portLA.put(port, la);
                 } else {
-                    throw new RuntimeException(
-                            "Should not have multiple topologys assigned to one port");
+                    throw new RuntimeException("Should not have multiple topologys assigned to one port");
                 }
             }
         }
@@ -389,30 +341,27 @@ class SyncSupervisorEvent extends RunnableCallback {
      * @param stormClusterState
      * @param topologyId
      * @param supervisorId
-     * @param callback
      * @return Map: {port, LocalAssignment}
      * @throws Exception
      */
-    private Map<Integer, LocalAssignment> readMyTasks(
-            StormClusterState stormClusterState, String topologyId,
-            String supervisorId, Assignment assignmenInfo) throws Exception {
+    private Map<Integer, LocalAssignment> readMyTasks(StormClusterState stormClusterState, String topologyId, String supervisorId, Assignment assignmentInfo)
+            throws Exception {
 
-        Map<Integer, LocalAssignment> portTasks =
-                new HashMap<Integer, LocalAssignment>();
+        Map<Integer, LocalAssignment> portTasks = new HashMap<Integer, LocalAssignment>();
 
-        Set<ResourceWorkerSlot> workers = assignmenInfo.getWorkers();
+        Set<ResourceWorkerSlot> workers = assignmentInfo.getWorkers();
         if (workers == null) {
-            LOG.error("No worker of assignement's " + assignmenInfo);
+            LOG.error("No worker of assignment's " + assignmentInfo);
             return portTasks;
         }
 
         for (ResourceWorkerSlot worker : workers) {
             if (!supervisorId.equals(worker.getNodeId()))
                 continue;
-            portTasks.put(worker.getPort(), new LocalAssignment(topologyId,
-                    worker.getTasks(), Common.topologyIdToName(topologyId),
-                    worker.getMemSize(), worker.getCpu(), worker.getJvm(),
-                    assignmenInfo.getTimeStamp()));
+            portTasks.put(
+                    worker.getPort(),
+                    new LocalAssignment(topologyId, worker.getTasks(), Common.topologyIdToName(topologyId), worker.getMemSize(), worker.getCpu(), worker
+                            .getJvm(), assignmentInfo.getTimeStamp()));
         }
 
         return portTasks;
@@ -421,14 +370,10 @@ class SyncSupervisorEvent extends RunnableCallback {
     /**
      * get mastercodedir for every topology
      * 
-     * @param stormClusterState
-     * @param callback
      * @throws Exception
      * @returns Map: <topologyId, master-code-dir> from zookeeper
      */
-    public static Map<String, String> getTopologyCodeLocations(
-            Map<String, Assignment> assignments, String supervisorId)
-            throws Exception {
+    public static Map<String, String> getTopologyCodeLocations(Map<String, Assignment> assignments, String supervisorId) throws Exception {
 
         Map<String, String> rtn = new HashMap<String, String>();
         for (Entry<String, Assignment> entry : assignments.entrySet()) {
@@ -448,9 +393,8 @@ class SyncSupervisorEvent extends RunnableCallback {
         return rtn;
     }
 
-    public void downloadTopology(Map<String, String> topologyCodes,
-            List<String> downloadedTopologyIds, Set<String> updateTopologys,
-            Map<String, Assignment> assignments) throws Exception {
+    public void downloadTopology(Map<String, String> topologyCodes, List<String> downloadedTopologyIds, Set<String> updateTopologys,
+                                 Map<String, Assignment> assignments, Set<String> downloadFailedTopologyIds) throws Exception {
 
         Set<String> downloadTopologys = new HashSet<String>();
 
@@ -459,38 +403,53 @@ class SyncSupervisorEvent extends RunnableCallback {
             String topologyId = entry.getKey();
             String masterCodeDir = entry.getValue();
 
-            if (!downloadedTopologyIds.contains(topologyId)
-                    || updateTopologys.contains(topologyId)) {
+            if (!downloadedTopologyIds.contains(topologyId) || updateTopologys.contains(topologyId)) {
 
-                LOG.info("Downloading code for storm id " + topologyId
-                        + " from " + masterCodeDir);
+                LOG.info("Downloading code for storm id " + topologyId + " from " + masterCodeDir);
 
-                try {
-                    downloadStormCode(conf, topologyId, masterCodeDir);
-                    // Update assignment timeStamp
-                    StormConfig.write_supervisor_topology_timestamp(conf,
-                            topologyId, assignments.get(topologyId)
-                                    .getTimeStamp());
-                } catch (IOException e) {
-                    LOG.error(e + " downloadStormCode failed " + "topologyId:"
-                            + topologyId + "masterCodeDir:" + masterCodeDir);
+                int retry = 0;
+                while (retry < 3) {
+                    try {
+                        downloadStormCode(conf, topologyId, masterCodeDir);
+                        // Update assignment timeStamp
+                        StormConfig.write_supervisor_topology_timestamp(conf, topologyId, assignments.get(topologyId).getTimeStamp());
+                        break;
+                    } catch (IOException e) {
+                        LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir);
 
-                } catch (TException e) {
-                    LOG.error(e + " downloadStormCode failed " + "topologyId:"
-                            + topologyId + "masterCodeDir:" + masterCodeDir);
+                    } catch (TException e) {
+                        LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir);
+                    }
+                    retry++;
+                }
+                if (retry < 3) {
+                    LOG.info("Finished downloading code for storm id " + topologyId + " from " + masterCodeDir);
+                    downloadTopologys.add(topologyId);
+                } else {
+                    LOG.error("Cann't  download code for storm id " + topologyId + " from " + masterCodeDir);
+                    downloadFailedTopologyIds.add(topologyId);
                 }
-                LOG.info("Finished downloading code for storm id " + topologyId
-                        + " from " + masterCodeDir);
 
-                downloadTopologys.add(topologyId);
+            }
+        }
+        // clearing a topology's directory is dangerous, so only clear topologies that
+        // aren't contained in downloadedTopologyIds
+        for (String topologyId : downloadFailedTopologyIds) {
+            if (!downloadedTopologyIds.contains(topologyId)) {
+                try {
+                    String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
+                    File destDir = new File(stormroot);
+                    FileUtils.deleteQuietly(destDir);
+                } catch (Exception e) {
+                    LOG.error("Cann't  clear directory about storm id " + topologyId + " on supervisor ");
+                }
             }
         }
 
         updateTaskCleanupTimeout(downloadTopologys);
     }
 
-    public void removeUselessTopology(Map<String, String> topologyCodes,
-            List<String> downloadedTopologyIds) {
+    public void removeUselessTopology(Map<String, String> topologyCodes, List<String> downloadedTopologyIds) {
         for (String topologyId : downloadedTopologyIds) {
 
             if (!topologyCodes.containsKey(topologyId)) {
@@ -499,9 +458,7 @@ class SyncSupervisorEvent extends RunnableCallback {
 
                 String path = null;
                 try {
-                    path =
-                            StormConfig.supervisor_stormdist_root(conf,
-                                    topologyId);
+                    path = StormConfig.supervisor_stormdist_root(conf, topologyId);
                     PathUtils.rmr(path);
                 } catch (IOException e) {
                     String errMsg = "rmr the path:" + path + "failed\n";
@@ -511,13 +468,11 @@ class SyncSupervisorEvent extends RunnableCallback {
         }
     }
 
-    private Set<String> getUpdateTopologys(
-            Map<Integer, LocalAssignment> localAssignments,
-            Map<Integer, LocalAssignment> zkAssignments) {
+    private Set<String> getUpdateTopologys(Map<Integer, LocalAssignment> localAssignments, Map<Integer, LocalAssignment> zkAssignments,
+            Map<String, Assignment> assignments) {
         Set<String> ret = new HashSet<String>();
         if (localAssignments != null && zkAssignments != null) {
-            for (Entry<Integer, LocalAssignment> entry : localAssignments
-                    .entrySet()) {
+            for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) {
                 Integer port = entry.getKey();
                 LocalAssignment localAssignment = entry.getValue();
 
@@ -526,14 +481,11 @@ class SyncSupervisorEvent extends RunnableCallback {
                 if (localAssignment == null || zkAssignment == null)
                     continue;
 
-                if (localAssignment.getTopologyId().equals(
-                        zkAssignment.getTopologyId())
-                        && localAssignment.getTimeStamp() < zkAssignment
-                                .getTimeStamp())
+                Assignment assignment = assignments.get(localAssignment.getTopologyId());
+                if (localAssignment.getTopologyId().equals(zkAssignment.getTopologyId()) && assignment != null
+                        && assignment.isTopologyChange(localAssignment.getTimeStamp()))
                     if (ret.add(localAssignment.getTopologyId())) {
-                        LOG.info("Topology-" + localAssignment.getTopologyId()
-                                + " has been updated. LocalTs="
-                                + localAssignment.getTimeStamp() + ", ZkTs="
+                        LOG.info("Topology-" + localAssignment.getTopologyId() + " has been updated. LocalTs=" + localAssignment.getTimeStamp() + ", ZkTs="
                                 + zkAssignment.getTimeStamp());
                     }
             }
@@ -542,49 +494,37 @@ class SyncSupervisorEvent extends RunnableCallback {
         return ret;
     }
 
-    private Set<String> getNeedReDownloadTopologys(
-            Map<Integer, LocalAssignment> localAssignment) {
-        Set<String> reDownloadTopologys =
-                syncProcesses.getTopologyIdNeedDownload().getAndSet(null);
+    private Set<String> getNeedReDownloadTopologys(Map<Integer, LocalAssignment> localAssignment) {
+        Set<String> reDownloadTopologys = syncProcesses.getTopologyIdNeedDownload().getAndSet(null);
         if (reDownloadTopologys == null || reDownloadTopologys.size() == 0)
             return null;
         Set<String> needRemoveTopologys = new HashSet<String>();
-        Map<Integer, String> portToStartWorkerId =
-                syncProcesses.getPortToWorkerId();
-        for (Entry<Integer, LocalAssignment> entry : localAssignment
-                .entrySet()) {
+        Map<Integer, String> portToStartWorkerId = syncProcesses.getPortToWorkerId();
+        for (Entry<Integer, LocalAssignment> entry : localAssignment.entrySet()) {
             if (portToStartWorkerId.containsKey(entry.getKey()))
                 needRemoveTopologys.add(entry.getValue().getTopologyId());
         }
-        LOG.debug(
-                "worker is starting on these topology, so delay download topology binary: "
-                        + needRemoveTopologys);
+        LOG.debug("worker is starting on these topology, so delay download topology binary: " + needRemoveTopologys);
         reDownloadTopologys.removeAll(needRemoveTopologys);
         if (reDownloadTopologys.size() > 0)
-            LOG.info("Following topologys is going to re-download the jars, "
-                    + reDownloadTopologys);
+            LOG.info("Following topologys is going to re-download the jars, " + reDownloadTopologys);
         return reDownloadTopologys;
     }
 
     private void updateTaskCleanupTimeout(Set<String> topologys) {
         Map topologyConf = null;
-        Map<String, Integer> taskCleanupTimeouts =
-                new HashMap<String, Integer>();
+        Map<String, Integer> taskCleanupTimeouts = new HashMap<String, Integer>();
 
         for (String topologyId : topologys) {
             try {
-                topologyConf =
-                        StormConfig.read_supervisor_topology_conf(conf,
-                                topologyId);
+                topologyConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
             } catch (IOException e) {
                 LOG.info("Failed to read conf for " + topologyId);
             }
 
             Integer cleanupTimeout = null;
             if (topologyConf != null) {
-                cleanupTimeout =
-                        JStormUtils.parseInt(topologyConf
-                                .get(ConfigExtension.TASK_CLEANUP_TIMEOUT_SEC));
+                cleanupTimeout = JStormUtils.parseInt(topologyConf.get(ConfigExtension.TASK_CLEANUP_TIMEOUT_SEC));
             }
 
             if (cleanupTimeout == null) {
@@ -596,9 +536,7 @@ class SyncSupervisorEvent extends RunnableCallback {
 
         Map<String, Integer> localTaskCleanupTimeouts = null;
         try {
-            localTaskCleanupTimeouts =
-                    (Map<String, Integer>) localState
-                            .get(Common.LS_TASK_CLEANUP_TIMEOUT);
+            localTaskCleanupTimeouts = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
         } catch (IOException e) {
             LOG.error("Failed to read local task cleanup timeout map", e);
         }
@@ -609,8 +547,7 @@ class SyncSupervisorEvent extends RunnableCallback {
             localTaskCleanupTimeouts.putAll(taskCleanupTimeouts);
 
         try {
-            localState.put(Common.LS_TASK_CLEANUP_TIMEOUT,
-                    localTaskCleanupTimeouts);
+            localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, localTaskCleanupTimeouts);
         } catch (IOException e) {
             LOG.error("Failed to write local task cleanup timeout map", e);
         }

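The new downloadTopology() above replaces the old one-shot download with a bounded retry plus explicit failure tracking: each topology gets up to three attempts, failures are collected into downloadFailedTopologyIds and passed on to syncProcesses.run(), and a failed topology's directory is deleted only when it was never downloaded before, so an existing working copy is never wiped. A minimal sketch of that pattern, with hypothetical download/cleanup helpers standing in for the JStorm calls:

    import java.util.HashSet;
    import java.util.Set;

    public class RetryDownloadSketch {
        private static final int MAX_ATTEMPTS = 3;

        // Hypothetical stand-in for downloadStormCode(); assumed to throw on failure.
        static void download(String topologyId) throws Exception { /* fetch binary */ }

        // Hypothetical stand-in for FileUtils.deleteQuietly(new File(stormroot)).
        static void cleanup(String topologyId) { /* remove the partial directory */ }

        static Set<String> downloadAll(Set<String> toDownload, Set<String> alreadyDownloaded) {
            Set<String> failed = new HashSet<String>();
            for (String topologyId : toDownload) {
                int attempt = 0;
                while (attempt < MAX_ATTEMPTS) {
                    try {
                        download(topologyId);
                        break;               // success: stop retrying
                    } catch (Exception e) {
                        attempt++;           // log and retry
                    }
                }
                if (attempt >= MAX_ATTEMPTS) {
                    failed.add(topologyId);  // reported downstream, like downloadFailedTopologyIds
                    // Deleting is only safe when no previous copy exists; a failed
                    // update must not destroy an older working download.
                    if (!alreadyDownloaded.contains(topologyId)) {
                        cleanup(topologyId);
                    }
                }
            }
            return failed;
        }
    }
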
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java
index 81e4374..394c134 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java
@@ -39,8 +39,7 @@ import com.alibaba.jstorm.utils.Pair;
  * 
  */
 public class BatchDrainerRunable extends DisruptorRunable {
-    private final static Logger LOG = LoggerFactory
-            .getLogger(BatchDrainerRunable.class);
+    private final static Logger LOG = LoggerFactory.getLogger(BatchDrainerRunable.class);
 
     public BatchDrainerRunable(WorkerData workerData) {
         super(workerData.getSendingQueue(), MetricDef.BATCH_DRAINER_THREAD);
@@ -50,8 +49,7 @@ public class BatchDrainerRunable extends DisruptorRunable {
     @Override
     public void handleEvent(Object event, boolean endOfBatch) throws Exception {
 
-        Pair<IConnection, List<TaskMessage>> pair =
-                (Pair<IConnection, List<TaskMessage>>) event;
+        Pair<IConnection, List<TaskMessage>> pair = (Pair<IConnection, List<TaskMessage>>) event;
 
         pair.getFirst().send(pair.getSecond());
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java
index a260323..47e73b8 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java
@@ -17,25 +17,23 @@
  */
 package com.alibaba.jstorm.daemon.worker;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.generated.StormTopology;
 import backtype.storm.generated.StreamInfo;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.ThriftTopologyUtils;
-
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * ContextMaker This class is used to create TopologyContext
@@ -56,8 +54,7 @@ public class ContextMaker {
     @SuppressWarnings("rawtypes")
     public ContextMaker(WorkerData workerData) {
         /*
-         * Map stormConf, String topologyId, String workerId, HashMap<Integer,
-         * String> tasksToComponent, Integer port, List<Integer> workerTasks
+         * Map stormConf, String topologyId, String workerId, HashMap<Integer, String> tasksToComponent, Integer port, List<Integer> workerTasks
          */
         this.workerData = workerData;
         this.workerTasks = JStormUtils.mk_list(workerData.getTaskids());
@@ -67,12 +64,9 @@ public class ContextMaker {
             String topologyId = workerData.getTopologyId();
             String workerId = workerData.getWorkerId();
 
-            String distroot =
-                    StormConfig
-                            .supervisor_stormdist_root(stormConf, topologyId);
+            String distroot = StormConfig.supervisor_stormdist_root(stormConf, topologyId);
 
-            resourcePath =
-                    StormConfig.supervisor_storm_resources_path(distroot);
+            resourcePath = StormConfig.supervisor_storm_resources_path(distroot);
 
             pidDir = StormConfig.worker_pids_root(stormConf, workerId);
 
@@ -85,43 +79,32 @@ public class ContextMaker {
         }
     }
 
-    public TopologyContext makeTopologyContext(StormTopology topology,
-            Integer taskId, clojure.lang.Atom openOrPrepareWasCalled) {
+    public TopologyContext makeTopologyContext(StormTopology topology, Integer taskId, clojure.lang.Atom openOrPrepareWasCalled) {
 
         Map stormConf = workerData.getStormConf();
         String topologyId = workerData.getTopologyId();
 
-        HashMap<String, Map<String, Fields>> componentToStreamToFields =
-                new HashMap<String, Map<String, Fields>>();
+        HashMap<String, Map<String, Fields>> componentToStreamToFields = new HashMap<String, Map<String, Fields>>();
 
         Set<String> components = ThriftTopologyUtils.getComponentIds(topology);
         for (String component : components) {
 
-            Map<String, Fields> streamToFieldsMap =
-                    new HashMap<String, Fields>();
+            Map<String, Fields> streamToFieldsMap = new HashMap<String, Fields>();
 
-            Map<String, StreamInfo> streamInfoMap =
-                    ThriftTopologyUtils.getComponentCommon(topology, component)
-                            .get_streams();
+            Map<String, StreamInfo> streamInfoMap = ThriftTopologyUtils.getComponentCommon(topology, component).get_streams();
             for (Entry<String, StreamInfo> entry : streamInfoMap.entrySet()) {
                 String streamId = entry.getKey();
                 StreamInfo streamInfo = entry.getValue();
 
-                streamToFieldsMap.put(streamId,
-                        new Fields(streamInfo.get_output_fields()));
+                streamToFieldsMap.put(streamId, new Fields(streamInfo.get_output_fields()));
             }
 
             componentToStreamToFields.put(component, streamToFieldsMap);
         }
 
-        return new TopologyContext(topology, stormConf,
-                workerData.getTasksToComponent(),
-                workerData.getComponentToSortedTasks(),
-                componentToStreamToFields, topologyId, resourcePath, pidDir,
-                taskId, workerData.getPort(), workerTasks,
-                workerData.getDefaultResources(),
-                workerData.getUserResources(), workerData.getExecutorData(),
-                workerData.getRegisteredMetrics(), openOrPrepareWasCalled);
+        return new TopologyContext(topology, stormConf, workerData.getTasksToComponent(), workerData.getComponentToSortedTasks(), componentToStreamToFields,
+                topologyId, resourcePath, pidDir, taskId, workerData.getPort(), workerTasks, workerData.getDefaultResources(), workerData.getUserResources(),
+                workerData.getExecutorData(), workerData.getRegisteredMetrics(), openOrPrepareWasCalled, workerData.getZkCluster());
 
     }
 

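makeTopologyContext() above precomputes a component -> stream -> Fields index from the Thrift topology before constructing the TopologyContext (which, after this change, also receives the worker's ZK cluster handle). A small sketch of that two-level index built with plain collections in place of the Thrift types (all names here are illustrative):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StreamIndexSketch {
        // Input stands in for ThriftTopologyUtils.getComponentCommon(...).get_streams():
        // component -> (streamId -> declared output field names).
        static Map<String, Map<String, List<String>>> index(
                Map<String, Map<String, List<String>>> declared) {
            Map<String, Map<String, List<String>>> componentToStreamToFields =
                    new HashMap<String, Map<String, List<String>>>();
            for (Map.Entry<String, Map<String, List<String>>> comp : declared.entrySet()) {
                // Copy each component's stream map once so every later field lookup
                // is a plain map access instead of a walk over the Thrift structures.
                componentToStreamToFields.put(comp.getKey(),
                        new HashMap<String, List<String>>(comp.getValue()));
            }
            return componentToStreamToFields;
        }
    }
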
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java
index 3477cc4..c19947a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java
@@ -46,8 +46,7 @@ import com.alibaba.jstorm.utils.Pair;
  * 
  */
 public class DrainerRunable extends DisruptorRunable {
-    private final static Logger LOG = LoggerFactory
-            .getLogger(DrainerRunable.class);
+    private final static Logger LOG = LoggerFactory.getLogger(DrainerRunable.class);
 
     private DisruptorQueue transferQueue;
     private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
@@ -92,8 +91,7 @@ public class DrainerRunable extends DisruptorRunable {
 
         if (conn.isClosed() == true) {
             // if connection has been closed, just skip the package
-            LOG.debug("Skip one tuple of " + taskId
-                    + ", due to close connection of " + nodePort);
+            LOG.debug("Skip one tuple of " + taskId + ", due to close connection of " + nodePort);
             return;
         }
 
@@ -113,11 +111,8 @@ public class DrainerRunable extends DisruptorRunable {
     }
 
     public void handleFinish() {
-        for (Entry<IConnection, List<TaskMessage>> entry : dispatchMap
-                .entrySet()) {
-            Pair<IConnection, List<TaskMessage>> pair =
-                    new Pair<IConnection, List<TaskMessage>>(entry.getKey(),
-                            entry.getValue());
+        for (Entry<IConnection, List<TaskMessage>> entry : dispatchMap.entrySet()) {
+            Pair<IConnection, List<TaskMessage>> pair = new Pair<IConnection, List<TaskMessage>>(entry.getKey(), entry.getValue());
 
             sendingQueue.publish(pair);
         }

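handleFinish() above flushes a per-connection dispatch map: the messages accumulated for each IConnection are wrapped into a Pair and published to the sending queue, where BatchDrainerRunable (earlier in this mail) performs one send per connection. A minimal sketch of that accumulate-then-flush batching, with a simple interface standing in for IConnection:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BatchFlushSketch {
        interface Connection {                     // stand-in for IConnection
            void send(List<byte[]> messages);
        }

        private final Map<Connection, List<byte[]>> dispatchMap =
                new HashMap<Connection, List<byte[]>>();

        // Accumulate one message under its target connection.
        void dispatch(Connection conn, byte[] message) {
            List<byte[]> pending = dispatchMap.get(conn);
            if (pending == null) {
                pending = new ArrayList<byte[]>();
                dispatchMap.put(conn, pending);
            }
            pending.add(message);
        }

        // Flush: one send per connection rather than one per message.
        void handleFinish() {
            for (Map.Entry<Connection, List<byte[]>> entry : dispatchMap.entrySet()) {
                entry.getKey().send(entry.getValue());
            }
            dispatchMap.clear();
        }
    }
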
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java
index 312c57f..1221680 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java
@@ -38,8 +38,7 @@ public class LocalAssignment implements Serializable {
     private String jvm;
     private long timeStamp;
 
-    public LocalAssignment(String topologyId, Set<Integer> taskIds,
-            String topologyName, long mem, int cpu, String jvm, long timeStamp) {
+    public LocalAssignment(String topologyId, Set<Integer> taskIds, String topologyName, long mem, int cpu, String jvm, long timeStamp) {
         this.topologyId = topologyId;
         this.taskIds = new HashSet<Integer>(taskIds);
         this.topologyName = topologyName;
@@ -105,13 +104,8 @@ public class LocalAssignment implements Serializable {
         result = prime * result + ((jvm == null) ? 0 : jvm.hashCode());
         result = prime * result + (int) (mem ^ (mem >>> 32));
         result = prime * result + ((taskIds == null) ? 0 : taskIds.hashCode());
-        result =
-                prime * result
-                        + ((topologyId == null) ? 0 : topologyId.hashCode());
-        result =
-                prime
-                        * result
-                        + ((topologyName == null) ? 0 : topologyName.hashCode());
+        result = prime * result + ((topologyId == null) ? 0 : topologyId.hashCode());
+        result = prime * result + ((topologyName == null) ? 0 : topologyName.hashCode());
         result = prime * result + (int) (timeStamp & 0xffffffff);
         return result;
     }
@@ -156,7 +150,6 @@ public class LocalAssignment implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

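The reformatted hashCode() above folds the long timeStamp into the hash via (int) (timeStamp & 0xffffffff), which keeps only the low 32 bits. For comparison, a sketch of that fold next to the XOR fold used by java.lang.Long.hashCode(), where the high bits also influence the result; the class name is illustrative:

    public class TimestampHashSketch {
        // As in the diff: truncation to the low 32 bits, so timestamps that
        // differ only in their high 32 bits collide.
        static int maskedHash(long timeStamp) {
            return (int) (timeStamp & 0xffffffffL);
        }

        // XOR fold (what java.lang.Long.hashCode does): both halves contribute.
        static int foldedHash(long timeStamp) {
            return (int) (timeStamp ^ (timeStamp >>> 32));
        }
    }
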
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java
index 628e0f5..056b6f3 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java
@@ -33,8 +33,7 @@ public class ProcessSimulator {
      * skip old function name: pid-counter
      */
 
-    protected static ConcurrentHashMap<String, WorkerShutdown> processMap =
-            new ConcurrentHashMap<String, WorkerShutdown>();
+    protected static ConcurrentHashMap<String, WorkerShutdown> processMap = new ConcurrentHashMap<String, WorkerShutdown>();
 
     /**
      * Register process handler old function name: register-process

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java
index 3f8acfc..bde8232 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java
@@ -34,8 +34,7 @@ import com.alibaba.jstorm.task.TaskShutdownDameon;
 import com.alibaba.jstorm.utils.JStormUtils;
 
 /**
- * Timely check whether topology is active or not and whether the metrics
- * monitor is enable or disable from ZK
+ * Timely check whether topology is active or not and whether the metrics monitor is enable or disable from ZK
  * 
  * @author yannian/Longda
  * 
@@ -63,9 +62,7 @@ public class RefreshActive extends RunnableCallback {
         this.conf = workerData.getStormConf();
         this.zkCluster = workerData.getZkCluster();
         this.topologyId = workerData.getTopologyId();
-        this.frequence =
-                JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS),
-                        10);
+        this.frequence = JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 10);
     }
 
     @Override
@@ -91,8 +88,7 @@ public class RefreshActive extends RunnableCallback {
                 return;
             }
 
-            LOG.info("Old TopologyStatus:" + oldTopologyStatus
-                    + ", new TopologyStatus:" + newTopologyStatus);
+            LOG.info("Old TopologyStatus:" + oldTopologyStatus + ", new TopologyStatus:" + newTopologyStatus);
 
             List<TaskShutdownDameon> tasks = workerData.getShutdownTasks();
             if (tasks == null) {
@@ -120,8 +116,7 @@ public class RefreshActive extends RunnableCallback {
             boolean newMonitorEnable = base.isEnableMonitor();
             boolean oldMonitorEnable = monitorEnable.get();
             if (newMonitorEnable != oldMonitorEnable) {
-                LOG.info("Change MonitorEnable from " + oldMonitorEnable
-                        + " to " + newMonitorEnable);
+                LOG.info("Change MonitorEnable from " + oldMonitorEnable + " to " + newMonitorEnable);
                 monitorEnable.set(newMonitorEnable);
             }
         } catch (Exception e) {

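RefreshActive above polls ZK on a fixed frequency and acts only on transitions: the newly read topology status and monitor flag are compared against locally cached values, and tasks are notified or the AtomicBoolean flipped only when something actually changed. A minimal sketch of that change-detection polling (the reader method and names are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class ChangeDetectSketch {
        private final AtomicBoolean monitorEnable = new AtomicBoolean(true);

        // Illustrative stand-in for reading StormBase.isEnableMonitor() from ZK.
        boolean readMonitorFlagFromZk() {
            return true;
        }

        void poll() {
            boolean newValue = readMonitorFlagFromZk();
            boolean oldValue = monitorEnable.get();
            if (newValue != oldValue) {
                // Log and update only on an actual transition, exactly once.
                monitorEnable.set(newValue);
            }
        }
    }
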
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java
index 48cc945..130985b 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java
@@ -17,23 +17,10 @@
  */
 package com.alibaba.jstorm.daemon.worker;
 
-import java.io.FileNotFoundException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.IContext;
 import backtype.storm.scheduler.WorkerSlot;
-
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.cluster.StormConfig;
@@ -42,9 +29,15 @@ import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
 import com.alibaba.jstorm.task.Task;
 import com.alibaba.jstorm.task.TaskShutdownDameon;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
 import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.TimeUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * 
@@ -56,8 +49,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
  * 
  */
 public class RefreshConnections extends RunnableCallback {
-    private static Logger LOG = LoggerFactory
-            .getLogger(RefreshConnections.class);
+    private static Logger LOG = LoggerFactory.getLogger(RefreshConnections.class);
 
     private WorkerData workerData;
 
@@ -102,13 +94,9 @@ public class RefreshConnections extends RunnableCallback {
         this.supervisorId = workerData.getSupervisorId();
 
         // this.endpoint_socket_lock = endpoint_socket_lock;
-        frequence =
-                JStormUtils
-                        .parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 5);
+        frequence = JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 5);
 
-        taskTimeoutSecs =
-                JStormUtils.parseInt(
-                        conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10);
+        taskTimeoutSecs = JStormUtils.parseInt(conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10);
         taskTimeoutSecs = taskTimeoutSecs * 3;
     }
 
@@ -122,8 +110,7 @@ public class RefreshConnections extends RunnableCallback {
             //
 
             synchronized (this) {
-                Assignment assignment =
-                        zkCluster.assignment_info(topologyId, this);
+                Assignment assignment = zkCluster.assignment_info(topologyId, this);
                 if (assignment == null) {
                     String errMsg = "Failed to get Assignment of " + topologyId;
                     LOG.error(errMsg);
@@ -137,47 +124,39 @@ public class RefreshConnections extends RunnableCallback {
                 // updated. If so, the outbound
                 // task map should be updated accordingly.
                 try {
-                    Long localAssignmentTS =
-                            StormConfig.read_supervisor_topology_timestamp(
-                                    conf, topologyId);
-                    if (localAssignmentTS.longValue() > workerData
-                            .getAssignmentTs().longValue()) {
+                    Long localAssignmentTS = StormConfig.read_supervisor_topology_timestamp(conf, topologyId);
+                    if (localAssignmentTS.longValue() > workerData.getAssignmentTs().longValue()) {
                         try {
-                            if (assignment.getAssignmentType() == AssignmentType.Config) {
+                            if (assignment.getAssignmentType() == AssignmentType.UpdateTopology) {
                                 LOG.info("Get config reload request for " + topologyId);
                                 // If config was updated, notify all tasks
                                 List<TaskShutdownDameon> taskShutdowns = workerData.getShutdownTasks();
                                 Map newConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
                                 workerData.getStormConf().putAll(newConf);
                                 for (TaskShutdownDameon taskSD : taskShutdowns) {
-                                    taskSD.updateConf(newConf);
+                                    taskSD.update(newConf);
                                 }
-                                workerData.setAssignmentType(AssignmentType.Config);
+                                workerData.setAssignmentType(AssignmentType.UpdateTopology);
                             } else {
                                 Set<Integer> addedTasks = getAddedTasks(assignment);
-                                Set<Integer> removedTasks =
-                                        getRemovedTasks(assignment);
-                                
+                                Set<Integer> removedTasks = getRemovedTasks(assignment);
+                                Set<Integer> updatedTasks = getUpdatedTasks(assignment);
+
                                 workerData.updateWorkerData(assignment);
-                                
-                                if (removedTasks.size() > 0)
-                                    shutdownTasks(removedTasks);
-                                if (addedTasks.size() > 0)
-                                    createTasks(addedTasks);
-                                
-                                Set<Integer> tmpOutboundTasks =
-                                        Worker.worker_output_tasks(workerData);
+
+                                shutdownTasks(removedTasks);
+                                createTasks(addedTasks);
+                                updateTasks(updatedTasks);
+
+                                Set<Integer> tmpOutboundTasks = Worker.worker_output_tasks(workerData);
                                 if (outboundTasks.equals(tmpOutboundTasks) == false) {
                                     for (int taskId : tmpOutboundTasks) {
                                         if (outboundTasks.contains(taskId) == false)
-                                            workerData
-                                                    .addOutboundTaskStatusIfAbsent(taskId);
+                                            workerData.addOutboundTaskStatusIfAbsent(taskId);
                                     }
-                                    for (int taskId : workerData
-                                            .getOutboundTaskStatus().keySet()) {
+                                    for (int taskId : workerData.getOutboundTaskStatus().keySet()) {
                                         if (tmpOutboundTasks.contains(taskId) == false) {
-                                            workerData
-                                                    .removeOutboundTaskStatus(taskId);
+                                            workerData.removeOutboundTaskStatus(taskId);
                                         }
                                     }
                                     workerData.setOutboundTasks(tmpOutboundTasks);
@@ -196,23 +175,19 @@ public class RefreshConnections extends RunnableCallback {
                     }
 
                 } catch (FileNotFoundException e) {
-                    LOG.warn(
-                            "Failed to read supervisor topology timeStamp for "
-                                    + topologyId + " port="
-                                    + workerData.getPort(), e);
+                    LOG.warn("Failed to read supervisor topology timeStamp for " + topologyId + " port=" + workerData.getPort(), e);
                 }
 
                 Set<ResourceWorkerSlot> workers = assignment.getWorkers();
                 if (workers == null) {
-                    String errMsg =
-                            "Failed to get taskToResource of " + topologyId;
+                    String errMsg = "Failed to get taskToResource of " + topologyId;
                     LOG.error(errMsg);
                     return;
                 }
-                workerData.getWorkerToResource().addAll(workers);
 
-                Map<Integer, WorkerSlot> my_assignment =
-                        new HashMap<Integer, WorkerSlot>();
+                workerData.updateWorkerToResource(workers);
+
+                Map<Integer, WorkerSlot> my_assignment = new HashMap<Integer, WorkerSlot>();
 
                 Map<String, String> node = assignment.getNodeHost();
 
@@ -220,11 +195,13 @@ public class RefreshConnections extends RunnableCallback {
                 Set<WorkerSlot> need_connections = new HashSet<WorkerSlot>();
 
                 Set<Integer> localTasks = new HashSet<Integer>();
+                Set<Integer> localNodeTasks = new HashSet<Integer>();
 
                 if (workers != null && outboundTasks != null) {
                     for (ResourceWorkerSlot worker : workers) {
-                        if (supervisorId.equals(worker.getNodeId())
-                                && worker.getPort() == workerData.getPort())
+                        if (supervisorId.equals(worker.getNodeId()))
+                            localNodeTasks.addAll(worker.getTasks());
+                        if (supervisorId.equals(worker.getNodeId()) && worker.getPort() == workerData.getPort())
                             localTasks.addAll(worker.getTasks());
                         for (Integer id : worker.getTasks()) {
                             if (outboundTasks.contains(id)) {
@@ -236,6 +213,7 @@ public class RefreshConnections extends RunnableCallback {
                 }
                 taskNodeport.putAll(my_assignment);
                 workerData.setLocalTasks(localTasks);
+                workerData.setLocalNodeTasks(localNodeTasks);
 
                 // get which connection need to be remove or add
                 Set<WorkerSlot> current_connections = nodeportSocket.keySet();
@@ -274,18 +252,9 @@ public class RefreshConnections extends RunnableCallback {
                     nodeportSocket.remove(node_port).close();
                 }
 
-                // Update the status of all outbound tasks
+                // check the status of connections to all outbound tasks
                 for (Integer taskId : outboundTasks) {
-                    boolean isActive = false;
-                    int currentTime = TimeUtils.current_time_secs();
-                    TaskHeartbeat tHB =
-                            zkCluster.task_heartbeat(topologyId, taskId);
-                    if (tHB != null) {
-                        int taskReportTime = tHB.getTimeSecs();
-                        if ((currentTime - taskReportTime) < taskTimeoutSecs)
-                            isActive = true;
-                    }
-                    workerData.updateOutboundTaskStatus(taskId, isActive);
+                    workerData.updateOutboundTaskStatus(taskId, isOutTaskConnected(taskId));
                 }
             }
         } catch (Exception e) {
@@ -307,16 +276,13 @@ public class RefreshConnections extends RunnableCallback {
     private Set<Integer> getAddedTasks(Assignment assignment) {
         Set<Integer> ret = new HashSet<Integer>();
         try {
-            Set<Integer> taskIds =
-                    assignment.getCurrentWorkerTasks(
-                            workerData.getSupervisorId(), workerData.getPort());
+            Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort());
             for (Integer taskId : taskIds) {
                 if (!(workerData.getTaskids().contains(taskId)))
                     ret.add(taskId);
             }
         } catch (Exception e) {
-            LOG.warn("Failed to get added task list for"
-                    + workerData.getTopologyId());
+            LOG.warn("Failed to get added task list for" + workerData.getTopologyId());
             ;
         }
         return ret;
@@ -325,22 +291,36 @@ public class RefreshConnections extends RunnableCallback {
     private Set<Integer> getRemovedTasks(Assignment assignment) {
         Set<Integer> ret = new HashSet<Integer>();
         try {
-            Set<Integer> taskIds =
-                    assignment.getCurrentWorkerTasks(
-                            workerData.getSupervisorId(), workerData.getPort());
+            Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort());
             for (Integer taskId : workerData.getTaskids()) {
                 if (!(taskIds.contains(taskId)))
                     ret.add(taskId);
             }
         } catch (Exception e) {
-            LOG.warn("Failed to get removed task list for"
-                    + workerData.getTopologyId());
+            LOG.warn("Failed to get removed task list for" + workerData.getTopologyId());
             ;
         }
         return ret;
     }
 
+    private Set<Integer> getUpdatedTasks(Assignment assignment) {
+        Set<Integer> ret = new HashSet<Integer>();
+        try {
+            Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort());
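+            // "updated" tasks are those assigned to this worker both before and after the reassignment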
+            for (Integer taskId : taskIds) {
+                if ((workerData.getTaskids().contains(taskId)))
+                    ret.add(taskId);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to get updated task list for" + workerData.getTopologyId());
+        }
+        return ret;
+    }
+
     private void createTasks(Set<Integer> tasks) {
+        if (tasks == null)
+            return;
+
         for (Integer taskId : tasks) {
             try {
                 TaskShutdownDameon shutdown = Task.mk_task(workerData, taskId);
@@ -352,17 +332,50 @@ public class RefreshConnections extends RunnableCallback {
     }
 
     private void shutdownTasks(Set<Integer> tasks) {
-        for (Integer taskId : tasks) {
+        if (tasks == null)
+            return;
+
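+        // resolve the shutdown daemons for the whole batch once, then stop each task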
+        List<TaskShutdownDameon> shutdowns = workerData.getShutdownDaemonbyTaskIds(tasks);
+        for (TaskShutdownDameon shutdown : shutdowns) {
             try {
-                List<TaskShutdownDameon> shutdowns =
-                        workerData.getShutdownDaemonbyTaskIds(tasks);
-                for (TaskShutdownDameon shutdown : shutdowns) {
-                    shutdown.shutdown();
-                }
+                shutdown.shutdown();
             } catch (Exception e) {
-                LOG.error("Failed to shutdown task-" + taskId, e);
+                LOG.error("Failed to shutdown task-" + shutdown.getTaskId(), e);
             }
         }
     }
 
+    private void updateTasks(Set<Integer> tasks) {
+        if (tasks == null)
+            return;
+
+        List<TaskShutdownDameon> shutdowns = workerData.getShutdownDaemonbyTaskIds(tasks);
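+        // ask each affected task to reload its task data after the assignment change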
+        for (TaskShutdownDameon shutdown : shutdowns) {
+            try {
+                shutdown.getTask().updateTaskData();
+            } catch (Exception e) {
+                LOG.error("Failed to update task-" + shutdown.getTaskId(), e);
+            }
+        }
+    }
+
+    private boolean isOutTaskConnected(int taskId) {
+        boolean ret = false;
+
+        if (workerData.getInnerTaskTransfer().get(taskId) != null) {
+            // Connections to inner tasks are established during initialization,
+            // so all inner tasks are reported as connected here.
+            ret = true;
+        } else {
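+            // for remote tasks, report connected only when the underlying connection is available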
+            WorkerSlot slot = taskNodeport.get(taskId);
+            if (slot != null) {
+                IConnection connection = nodeportSocket.get(slot);
+                if (connection != null) {
+                    ret = connection.available();
+                }
+            }
+        }
+
+        return ret;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java
index 2006b05..97932b9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java
@@ -21,7 +21,6 @@ import backtype.storm.daemon.Shutdownable;
 
 import com.alibaba.jstorm.cluster.DaemonCommon;
 
-public interface ShutdownableDameon extends Shutdownable, DaemonCommon,
-        Runnable {
+public interface ShutdownableDameon extends Shutdownable, DaemonCommon, Runnable {
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java
index 21dc37c..a769cc1 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java
@@ -38,23 +38,21 @@ import com.alibaba.jstorm.utils.DisruptorRunable;
  * 
  */
 public class VirtualPortDispatch extends DisruptorRunable {
-    private final static Logger LOG = LoggerFactory
-            .getLogger(VirtualPortDispatch.class);
+    private final static Logger LOG = LoggerFactory.getLogger(VirtualPortDispatch.class);
 
     private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
     private IConnection recvConnection;
 
-    public VirtualPortDispatch(WorkerData workerData,
-            IConnection recvConnection, DisruptorQueue recvQueue) {
+    public VirtualPortDispatch(WorkerData workerData, IConnection recvConnection, DisruptorQueue recvQueue) {
         super(recvQueue, MetricDef.DISPATCH_THREAD);
 
         this.recvConnection = recvConnection;
         this.deserializeQueues = workerData.getDeserializeQueues();
 
     }
-    
+
     public void shutdownRecv() {
-    	// don't need send shutdown command to every task
+        // no need to send a shutdown command to every task:
         // every task has already been shut down via workerData.active,
         // and by that time the queue is already full
         // byte shutdownCmd[] = { TaskStatus.SHUTDOWN };
@@ -87,8 +85,7 @@ public class VirtualPortDispatch extends DisruptorRunable {
 
         DisruptorQueue queue = deserializeQueues.get(task);
         if (queue == null) {
-            LOG.warn("Received invalid message directed at port " + task
-                    + ". Dropping...");
+            LOG.warn("Received invalid message directed at port " + task + ". Dropping...");
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java
index d5cf9c8..2bf4c9c 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java
@@ -17,22 +17,6 @@
  */
 package com.alibaba.jstorm.daemon.worker;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
@@ -41,30 +25,31 @@ import backtype.storm.messaging.IContext;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.DisruptorQueue;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
 import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
 import com.alibaba.jstorm.daemon.worker.hearbeat.WorkerHeartbeatRunable;
-import com.alibaba.jstorm.metric.JStormMetricsReporter;
 import com.alibaba.jstorm.task.Task;
 import com.alibaba.jstorm.task.TaskShutdownDameon;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
 import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.NetWorkUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
 
 /**
  * worker entrance
  * 
  * @author yannian/Longda
- * 
  */
 public class Worker {
 
@@ -76,26 +61,14 @@ public class Worker {
     private WorkerData workerData;
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public Worker(Map conf, IContext context, String topology_id,
-            String supervisor_id, int port, String worker_id, String jar_path)
-            throws Exception {
-
-        workerData =
-                new WorkerData(conf, context, topology_id, supervisor_id, port,
-                        worker_id, jar_path);
-
+    public Worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception {
+        workerData = new WorkerData(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
     }
 
     /**
      * get current task's output task list
-     * 
-     * @param tasks_component
-     * @param mk_topology_context
-     * @param task_ids
-     * @throws Exception
      */
     public static Set<Integer> worker_output_tasks(WorkerData workerData) {
-
         ContextMaker context_maker = workerData.getContextMaker();
         Set<Integer> task_ids = workerData.getTaskids();
         StormTopology topology = workerData.getSysTopology();
@@ -103,16 +76,13 @@ public class Worker {
         Set<Integer> rtn = new HashSet<Integer>();
 
         for (Integer taskid : task_ids) {
-            TopologyContext context =
-                    context_maker.makeTopologyContext(topology, taskid, null);
+            TopologyContext context = context_maker.makeTopologyContext(topology, taskid, null);
 
             // <StreamId, <ComponentId, Grouping>>
-            Map<String, Map<String, Grouping>> targets =
-                    context.getThisTargets();
+            Map<String, Map<String, Grouping>> targets = context.getThisTargets();
             for (Map<String, Grouping> e : targets.values()) {
                 for (String componentId : e.keySet()) {
-                    List<Integer> tasks =
-                            context.getComponentTasks(componentId);
+                    List<Integer> tasks = context.getComponentTasks(componentId);
                     rtn.addAll(tasks);
                 }
             }
@@ -140,45 +110,46 @@ public class Worker {
 
         Set<Integer> taskids = workerData.getTaskids();
 
+        Set<Thread> threads = new HashSet<Thread>();
+        List<Task> taskArrayList = new ArrayList<Task>();
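+        // start all tasks concurrently, then wait for every task thread to finish initialization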
         for (int taskid : taskids) {
-
-            TaskShutdownDameon t = Task.mk_task(workerData, taskid);
-
-            shutdowntasks.add(t);
+            Task task = new Task(workerData, taskid);
+            Thread thread = new Thread(task);
+            threads.add(thread);
+            taskArrayList.add(task);
+            thread.start();
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+        for (Task t : taskArrayList) {
+            shutdowntasks.add(t.getTaskShutdownDameon());
         }
-
         return shutdowntasks;
     }
-    
+
     @Deprecated
     private DisruptorQueue startDispatchDisruptor() {
-    	Map stormConf = workerData.getStormConf();
-
-        int queue_size =
-                Utils.getInt(
-                        stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE),
-                        1024);
-        WaitStrategy waitStrategy =
-                (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
-        DisruptorQueue recvQueue =
-                DisruptorQueue.mkInstance("Dispatch", ProducerType.MULTI,
-                        queue_size, waitStrategy);
+        Map stormConf = workerData.getStormConf();
+
+        int queue_size = Utils.getInt(stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE), 1024);
+        WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
+        DisruptorQueue recvQueue = DisruptorQueue.mkInstance("Dispatch", ProducerType.MULTI, queue_size, waitStrategy);
         // mark the receive queue's consumer as started
         recvQueue.consumerStarted();
-        
+
         return recvQueue;
     }
 
     private void startDispatchThread() {
-    	// remove dispatch thread, send tuple directly from nettyserver
-    	//startDispatchDisruptor();
+        // dispatch thread removed; tuples are sent directly from the netty server
+        // startDispatchDisruptor();
 
         IContext context = workerData.getContext();
         String topologyId = workerData.getTopologyId();
 
-        IConnection recvConnection =
-                context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues());
-        
+        IConnection recvConnection = context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues());
+
         workerData.setRecvConnection(recvConnection);
     }
 
@@ -191,40 +162,27 @@ public class Worker {
         // so create client connection before create task
         // refresh connection
         RefreshConnections refreshConn = makeRefreshConnections();
-        AsyncLoopThread refreshconn =
-                new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY,
-                        true);
+        AsyncLoopThread refreshconn = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true);
         threads.add(refreshconn);
 
         // refresh ZK active status
         RefreshActive refreshZkActive = new RefreshActive(workerData);
-        AsyncLoopThread refreshzk =
-                new AsyncLoopThread(refreshZkActive, false,
-                        Thread.MIN_PRIORITY, true);
+        AsyncLoopThread refreshzk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true);
         threads.add(refreshzk);
 
         // Sync heartbeat to Apsara Container
-        AsyncLoopThread syncContainerHbThread =
-                SyncContainerHb.mkWorkerInstance(workerData.getStormConf());
+        AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf());
         if (syncContainerHbThread != null) {
             threads.add(syncContainerHbThread);
         }
 
-        JStormMetricsReporter metricReporter =
-                new JStormMetricsReporter(workerData);
-        AsyncLoopThread metricThread = new AsyncLoopThread(metricReporter);
-        threads.add(metricThread);
-
-        // create task heartbeat
-        TaskHeartbeatRunable taskHB = new TaskHeartbeatRunable(workerData);
-        AsyncLoopThread taskHBThread = new AsyncLoopThread(taskHB);
-        threads.add(taskHBThread);
+        JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData);
+        metricReporter.init();
+        workerData.setMetricsReporter(metricReporter);
 
         // refresh heartbeat to local dir
         RunnableCallback heartbeat_fn = new WorkerHeartbeatRunable(workerData);
-        AsyncLoopThread hb =
-                new AsyncLoopThread(heartbeat_fn, false, null,
-                        Thread.NORM_PRIORITY, true);
+        AsyncLoopThread hb = new AsyncLoopThread(heartbeat_fn, false, null, Thread.NORM_PRIORITY, true);
         threads.add(hb);
 
         // shutdown task callbacks
@@ -239,7 +197,6 @@ public class Worker {
      * create worker instance and run it
      * 
      * @param conf
-     * @param mq_context
      * @param topology_id
      * @param supervisor_id
      * @param port
@@ -248,9 +205,8 @@ public class Worker {
      * @throws Exception
      */
     @SuppressWarnings("rawtypes")
-    public static WorkerShutdown mk_worker(Map conf, IContext context,
-            String topology_id, String supervisor_id, int port,
-            String worker_id, String jar_path) throws Exception {
+    public static WorkerShutdown mk_worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path)
+            throws Exception {
 
         StringBuilder sb = new StringBuilder();
         sb.append("topologyId:" + topology_id + ", ");
@@ -260,9 +216,7 @@ public class Worker {
 
         LOG.info("Begin to run worker:" + sb.toString());
 
-        Worker w =
-                new Worker(conf, context, topology_id, supervisor_id, port,
-                        worker_id, jar_path);
+        Worker w = new Worker(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
 
         w.redirectOutput();
 
@@ -271,8 +225,7 @@ public class Worker {
 
     public void redirectOutput() {
 
-        if (System.getenv("REDIRECT") == null
-                || !System.getenv("REDIRECT").equals("true")) {
+        if (System.getenv("REDIRECT") == null || !System.getenv("REDIRECT").equals("true")) {
             return;
         }
 
@@ -283,9 +236,7 @@ public class Worker {
             DEFAULT_OUT_TARGET_FILE += ".out";
         }
 
-        String outputFile =
-                ConfigExtension.getWorkerRedirectOutputFile(workerData
-                        .getStormConf());
+        String outputFile = ConfigExtension.getWorkerRedirectOutputFile(workerData.getStormConf());
         if (outputFile == null) {
             outputFile = DEFAULT_OUT_TARGET_FILE;
         } else {
@@ -302,7 +253,6 @@ public class Worker {
                         outputFile = DEFAULT_OUT_TARGET_FILE;
                     }
                 }
-
             } catch (Exception e) {
                 LOG.warn("Failed to touch " + outputFile, e);
                 outputFile = DEFAULT_OUT_TARGET_FILE;
@@ -318,9 +268,7 @@ public class Worker {
     }
 
     /**
-     * Have one problem if the worker's start parameter length is longer than
-     * 4096, ps -ef|grep com.alibaba.jstorm.daemon.worker.Worker can't find
-     * worker
+     * Known issue: if the worker's start parameter length exceeds 4096, "ps -ef|grep com.alibaba.jstorm.daemon.worker.Worker" can't find the worker
      * 
      * @param port
      */
@@ -341,15 +289,11 @@ public class Worker {
 
         try {
             LOG.info("Begin to execute " + sb.toString());
-            Process process =
-                    JStormUtils.launch_process(sb.toString(),
-                            new HashMap<String, String>(), false);
-
+            Process process = JStormUtils.launch_process(sb.toString(), new HashMap<String, String>(), false);
             // Process process = Runtime.getRuntime().exec(sb.toString());
 
             InputStream stdin = process.getInputStream();
-            BufferedReader reader =
-                    new BufferedReader(new InputStreamReader(stdin));
+            BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
 
             JStormUtils.sleepMs(1000);
 
@@ -405,7 +349,6 @@ public class Worker {
                             LOG.info("Skip kill myself");
                             continue;
                         }
-
                         Integer pid = Integer.valueOf(fields[1]);
 
                         LOG.info("Find one process :" + pid.toString());
@@ -415,9 +358,7 @@ public class Worker {
                         continue;
                     }
                 }
-
             }
-
             return ret;
         } catch (IOException e) {
             LOG.info("Failed to execute " + sb.toString());
@@ -429,13 +370,10 @@ public class Worker {
     }
 
     public static void killOldWorker(String port) {
-
         List<Integer> oldPids = getOldPortPids(port);
         for (Integer pid : oldPids) {
-
             JStormUtils.kill(pid);
         }
-
     }
 
     /**
@@ -456,7 +394,6 @@ public class Worker {
         }
 
         StringBuilder sb = new StringBuilder();
-
         try {
             String topology_id = args[0];
             String supervisor_id = args[1];
@@ -476,9 +413,7 @@ public class Worker {
             sb.append("workerId:" + worker_id + ", ");
             sb.append("jar_path:" + jar_path + "\n");
 
-            WorkerShutdown sd =
-                    mk_worker(conf, null, topology_id, supervisor_id,
-                            Integer.parseInt(port_str), worker_id, jar_path);
+            WorkerShutdown sd = mk_worker(conf, null, topology_id, supervisor_id, Integer.parseInt(port_str), worker_id, jar_path);
             sd.join();
 
             LOG.info("Successfully shutdown worker " + sb.toString());