You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:53 UTC

[15/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java
index e984455..fab05ea 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java
@@ -17,51 +17,36 @@
  */
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.alibaba.jstorm.callback.Callback;
-import com.alibaba.jstorm.callback.impl.ActiveTransitionCallback;
-import com.alibaba.jstorm.callback.impl.DoRebalanceTransitionCallback;
-import com.alibaba.jstorm.callback.impl.DoneRebalanceTransitionCallback;
-import com.alibaba.jstorm.callback.impl.InactiveTransitionCallback;
-import com.alibaba.jstorm.callback.impl.KillTransitionCallback;
-import com.alibaba.jstorm.callback.impl.ReassignTransitionCallback;
-import com.alibaba.jstorm.callback.impl.RebalanceTransitionCallback;
-import com.alibaba.jstorm.callback.impl.RemoveTransitionCallback;
-import com.alibaba.jstorm.callback.impl.UpdateConfTransitionCallback;
+import com.alibaba.jstorm.callback.impl.*;
 import com.alibaba.jstorm.cluster.StormBase;
 import com.alibaba.jstorm.cluster.StormStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Status changing
  * 
  * @author version1: lixin version2: Longda
- * 
- * 
- * 
  */
 public class StatusTransition {
 
-    private final static Logger LOG = LoggerFactory
-            .getLogger(StatusTransition.class);
+    private final static Logger LOG = LoggerFactory.getLogger(StatusTransition.class);
 
     private NimbusData data;
 
-    private Map<String, Object> topologyLocks =
-            new ConcurrentHashMap<String, Object>();
+    private Map<String, Object> topologyLocks = new ConcurrentHashMap<String, Object>();
 
     public StatusTransition(NimbusData data) {
         this.data = data;
 
     }
 
-    public <T> void transition(String topologyid, boolean errorOnNoTransition,
-            StatusType changeStatus, T... args) throws Exception {
+    public <T> void transition(String topologyid, boolean errorOnNoTransition, StatusType changeStatus, T... args) throws Exception {
         // lock outside
         Object lock = topologyLocks.get(topologyid);
         if (lock == null) {
@@ -70,8 +55,7 @@ public class StatusTransition {
         }
 
         if (data.getIsShutdown().get() == true) {
-            LOG.info("Nimbus is in shutdown, skip this event " + topologyid
-                    + ":" + changeStatus);
+            LOG.info("Nimbus is in shutdown, skip this event " + topologyid + ":" + changeStatus);
             return;
         }
 
@@ -86,50 +70,34 @@ public class StatusTransition {
     /**
      * Changing status
      * 
-     * @param topologyId
-     * @param errorOnNTransition if it is true, failure will throw exception
      * @param args -- will be used in the status changing callback
-     * 
      */
-    public <T> void transitionLock(String topologyid,
-            boolean errorOnNoTransition, StatusType changeStatus, T... args)
-            throws Exception {
+    public <T> void transitionLock(String topologyid, boolean errorOnNoTransition, StatusType changeStatus, T... args) throws Exception {
 
         // get ZK's topology node's data, which is StormBase
-        StormBase stormbase =
-                data.getStormClusterState().storm_base(topologyid, null);
+        StormBase stormbase = data.getStormClusterState().storm_base(topologyid, null);
         if (stormbase == null) {
 
-            LOG.error("Cannot apply event changing status "
-                    + changeStatus.getStatus() + " to " + topologyid
-                    + " because failed to get StormBase from ZK");
+            LOG.error("Cannot apply event changing status " + changeStatus.getStatus() + " to " + topologyid + " because failed to get StormBase from ZK");
             return;
         }
 
         StormStatus currentStatus = stormbase.getStatus();
         if (currentStatus == null) {
-            LOG.error("Cannot apply event changing status "
-                    + changeStatus.getStatus() + " to " + topologyid
-                    + " because topologyStatus is null in ZK");
+            LOG.error("Cannot apply event changing status " + changeStatus.getStatus() + " to " + topologyid + " because topologyStatus is null in ZK");
             return;
         }
 
         // <currentStatus, Map<changingStatus, callback>>
-        Map<StatusType, Map<StatusType, Callback>> callbackMap =
-                stateTransitions(topologyid, currentStatus);
+        Map<StatusType, Map<StatusType, Callback>> callbackMap = stateTransitions(topologyid, currentStatus);
 
         // get current changingCallbacks
-        Map<StatusType, Callback> changingCallbacks =
-                callbackMap.get(currentStatus.getStatusType());
+        Map<StatusType, Callback> changingCallbacks = callbackMap.get(currentStatus.getStatusType());
 
-        if (changingCallbacks == null
-                || changingCallbacks.containsKey(changeStatus) == false
-                || changingCallbacks.get(changeStatus) == null) {
+        if (changingCallbacks == null || changingCallbacks.containsKey(changeStatus) == false || changingCallbacks.get(changeStatus) == null) {
             String msg =
-                    "No transition for event: changing status:"
-                            + changeStatus.getStatus() + ", current status: "
-                            + currentStatus.getStatusType() + " topology-id: "
-                            + topologyid;
+                    "No transition for event: changing status:" + changeStatus.getStatus() + ", current status: " + currentStatus.getStatusType()
+                            + " topology-id: " + topologyid;
             LOG.info(msg);
             if (errorOnNoTransition) {
                 throw new RuntimeException(msg);
@@ -144,12 +112,10 @@ public class StatusTransition {
             StormStatus newStatus = (StormStatus) obj;
             // update status to ZK
             data.getStormClusterState().update_storm(topologyid, newStatus);
-            LOG.info("Successfully updated " + topologyid + " as status "
-                    + newStatus);
+            LOG.info("Successfully updated " + topologyid + " as status " + newStatus);
         }
 
-        LOG.info("Successfully apply event changing status "
-                + changeStatus.getStatus() + " to " + topologyid);
+        LOG.info("Successfully apply event changing status " + changeStatus.getStatus() + " to " + topologyid);
         return;
 
     }
@@ -157,104 +123,74 @@ public class StatusTransition {
     /**
      * generate status changing map
      * 
-     * 
-     * 
      * @param topologyid
-     * @param status
-     * @return
-     * 
-     *         Map<StatusType, Map<StatusType, Callback>> means
-     *         Map<currentStatus, Map<changingStatus, Callback>>
+     * @return Map<StatusType, Map<StatusType, Callback>> means Map<currentStatus, Map<changingStatus, Callback>>
      */
 
-    private Map<StatusType, Map<StatusType, Callback>> stateTransitions(
-            String topologyid, StormStatus currentStatus) {
+    private Map<StatusType, Map<StatusType, Callback>> stateTransitions(String topologyid, StormStatus currentStatus) {
 
         /**
          * 
-         * 1. Status: this status will be stored in ZK
-         * killed/inactive/active/rebalancing 2. action:
+         * 1. Status: this status will be stored in ZK killed/inactive/active/rebalancing 2. action:
          * 
-         * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger
-         * this only valid when current status is active inactivate -- client
-         * will trigger this action, only valid when current status is active
-         * activate -- client will trigger this action only valid when current
-         * status is inactive startup -- when nimbus startup, it will trigger
-         * this action only valid when current status is killed/rebalancing kill
-         * -- client kill topology will trigger this action, only valid when
-         * current status is active/inactive/killed remove -- 30 seconds after
-         * client submit kill command, it will do this action, only valid when
-         * current status is killed rebalance -- client submit rebalance
-         * command, only valid when current status is active/deactive
-         * do_rebalance -- 30 seconds after client submit rebalance command, it
-         * will do this action, only valid when current status is rebalance
+         * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this only valid when current status is active inactivate -- client will trigger
+         * this action, only valid when current status is active activate -- client will trigger this action only valid when current status is inactive startup
+         * -- when nimbus startup, it will trigger this action only valid when current status is killed/rebalancing kill -- client kill topology will trigger
+         * this action, only valid when current status is active/inactive/killed remove -- 30 seconds after client submit kill command, it will do this action,
+         * only valid when current status is killed rebalance -- client submit rebalance command, only valid when current status is active/deactive do_rebalance
+         * -- 30 seconds after client submit rebalance command, it will do this action, only valid when current status is rebalance
          */
 
-        Map<StatusType, Map<StatusType, Callback>> rtn =
-                new HashMap<StatusType, Map<StatusType, Callback>>();
+        Map<StatusType, Map<StatusType, Callback>> rtn = new HashMap<StatusType, Map<StatusType, Callback>>();
 
         // current status is active
-        Map<StatusType, Callback> activeMap =
-                new HashMap<StatusType, Callback>();
-        activeMap.put(StatusType.monitor, new ReassignTransitionCallback(data,
-                topologyid));
+        Map<StatusType, Callback> activeMap = new HashMap<StatusType, Callback>();
+        activeMap.put(StatusType.monitor, new ReassignTransitionCallback(data, topologyid));
         activeMap.put(StatusType.inactivate, new InactiveTransitionCallback());
         activeMap.put(StatusType.startup, null);
         activeMap.put(StatusType.activate, null);
-        activeMap.put(StatusType.kill, new KillTransitionCallback(data,
-                topologyid));
+        activeMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid));
         activeMap.put(StatusType.remove, null);
-        activeMap.put(StatusType.rebalance, new RebalanceTransitionCallback(
-                data, topologyid, currentStatus));
+        activeMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus));
         activeMap.put(StatusType.do_rebalance, null);
         activeMap.put(StatusType.done_rebalance, null);
-        activeMap.put(StatusType.update_conf, new UpdateConfTransitionCallback(
-                data, topologyid, currentStatus));
+        activeMap.put(StatusType.update_topology, new UpdateTopologyTransitionCallback(data, topologyid, currentStatus));
 
         rtn.put(StatusType.active, activeMap);
 
         // current status is inactive
-        Map<StatusType, Callback> inactiveMap =
-                new HashMap<StatusType, Callback>();
+        Map<StatusType, Callback> inactiveMap = new HashMap<StatusType, Callback>();
 
-        inactiveMap.put(StatusType.monitor, new ReassignTransitionCallback(
-                data, topologyid, new StormStatus(StatusType.inactive)));
+        inactiveMap.put(StatusType.monitor, new ReassignTransitionCallback(data, topologyid, new StormStatus(StatusType.inactive)));
         inactiveMap.put(StatusType.inactivate, null);
         inactiveMap.put(StatusType.startup, null);
         inactiveMap.put(StatusType.activate, new ActiveTransitionCallback());
-        inactiveMap.put(StatusType.kill, new KillTransitionCallback(data,
-                topologyid));
+        inactiveMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid));
         inactiveMap.put(StatusType.remove, null);
-        inactiveMap.put(StatusType.rebalance, new RebalanceTransitionCallback(
-                data, topologyid, currentStatus));
+        inactiveMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus));
         inactiveMap.put(StatusType.do_rebalance, null);
         inactiveMap.put(StatusType.done_rebalance, null);
-        inactiveMap.put(StatusType.update_conf, null);
+        inactiveMap.put(StatusType.update_topology, null);
 
         rtn.put(StatusType.inactive, inactiveMap);
 
         // current status is killed
-        Map<StatusType, Callback> killedMap =
-                new HashMap<StatusType, Callback>();
+        Map<StatusType, Callback> killedMap = new HashMap<StatusType, Callback>();
 
         killedMap.put(StatusType.monitor, null);
         killedMap.put(StatusType.inactivate, null);
-        killedMap.put(StatusType.startup, new KillTransitionCallback(data,
-                topologyid));
+        killedMap.put(StatusType.startup, new KillTransitionCallback(data, topologyid));
         killedMap.put(StatusType.activate, null);
-        killedMap.put(StatusType.kill, new KillTransitionCallback(data,
-                topologyid));
-        killedMap.put(StatusType.remove, new RemoveTransitionCallback(data,
-                topologyid));
+        killedMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid));
+        killedMap.put(StatusType.remove, new RemoveTransitionCallback(data, topologyid));
         killedMap.put(StatusType.rebalance, null);
         killedMap.put(StatusType.do_rebalance, null);
         killedMap.put(StatusType.done_rebalance, null);
-        killedMap.put(StatusType.update_conf, null);
+        killedMap.put(StatusType.update_topology, null);
         rtn.put(StatusType.killed, killedMap);
 
         // current status is under rebalancing
-        Map<StatusType, Callback> rebalancingMap =
-                new HashMap<StatusType, Callback>();
+        Map<StatusType, Callback> rebalancingMap = new HashMap<StatusType, Callback>();
 
         StatusType rebalanceOldStatus = StatusType.active;
         if (currentStatus.getOldStatus() != null) {
@@ -267,20 +203,14 @@ public class StatusTransition {
 
         rebalancingMap.put(StatusType.monitor, null);
         rebalancingMap.put(StatusType.inactivate, null);
-        rebalancingMap.put(StatusType.startup, new RebalanceTransitionCallback(
-                data, topologyid, new StormStatus(rebalanceOldStatus)));
+        rebalancingMap.put(StatusType.startup, new RebalanceTransitionCallback(data, topologyid, new StormStatus(rebalanceOldStatus)));
         rebalancingMap.put(StatusType.activate, null);
         rebalancingMap.put(StatusType.kill, null);
         rebalancingMap.put(StatusType.remove, null);
-        rebalancingMap
-                .put(StatusType.rebalance, new RebalanceTransitionCallback(
-                        data, topologyid, currentStatus));
-        rebalancingMap.put(StatusType.do_rebalance,
-                new DoRebalanceTransitionCallback(data, topologyid,
-                        new StormStatus(rebalanceOldStatus)));
-        rebalancingMap.put(StatusType.done_rebalance,
-                new DoneRebalanceTransitionCallback(data, topologyid));
-        rebalancingMap.put(StatusType.update_conf, null);
+        rebalancingMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus));
+        rebalancingMap.put(StatusType.do_rebalance, new DoRebalanceTransitionCallback(data, topologyid, new StormStatus(rebalanceOldStatus)));
+        rebalancingMap.put(StatusType.done_rebalance, new DoneRebalanceTransitionCallback(data, topologyid));
+        rebalancingMap.put(StatusType.update_topology, null);
         rtn.put(StatusType.rebalancing, rebalancingMap);
 
         /**
@@ -288,7 +218,6 @@ public class StatusTransition {
          */
 
         return rtn;
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java
index cf785b7..d0f68ff 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java
@@ -20,21 +20,14 @@ package com.alibaba.jstorm.daemon.nimbus;
 /**
  * topology status:
  * 
- * 1. Status: this status will be stored in ZK
- * killed/inactive/active/rebalancing 2. action:
+ * 1. Status: this status will be stored in ZK killed/inactive/active/rebalancing 2. action:
  * 
- * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this
- * only valid when current status is active inactivate -- client will trigger
- * this action, only valid when current status is active activate -- client will
- * trigger this action only valid when current status is inactive startup --
- * when nimbus startup, it will trigger this action only valid when current
- * status is killed/rebalancing kill -- client kill topology will trigger this
- * action, only valid when current status is active/inactive/killed remove -- 30
- * seconds after client submit kill command, it will do this action, only valid
- * when current status is killed rebalance -- client submit rebalance command,
- * only valid when current status is active/deactive do_rebalance -- 30 seconds
- * after client submit rebalance command, it will do this action, only valid
- * when current status is rebalance
+ * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this only valid when current status is active inactivate -- client will trigger this
+ * action, only valid when current status is active activate -- client will trigger this action only valid when current status is inactive startup -- when
+ * nimbus startup, it will trigger this action only valid when current status is killed/rebalancing kill -- client kill topology will trigger this action, only
+ * valid when current status is active/inactive/killed remove -- 30 seconds after client submit kill command, it will do this action, only valid when current
+ * status is killed rebalance -- client submit rebalance command, only valid when current status is active/deactive do_rebalance -- 30 seconds after client
+ * submit rebalance command, it will do this action, only valid when current status is rebalance
  * 
  * 
  * 
@@ -43,13 +36,11 @@ package com.alibaba.jstorm.daemon.nimbus;
 public enum StatusType {
 
     // status
-    active("active"), inactive("inactive"), rebalancing("rebalancing"), killed(
-            "killed"),
+    active("active"), inactive("inactive"), rebalancing("rebalancing"), killed("killed"),
 
     // actions
-    activate("activate"), inactivate("inactivate"), monitor("monitor"), startup(
-            "startup"), kill("kill"), remove("remove"), rebalance("rebalance"), do_rebalance(
-            "do-rebalance"), done_rebalance("done-rebalance"), update_conf("update-config");
+    activate("activate"), inactivate("inactivate"), monitor("monitor"), startup("startup"), kill("kill"), remove("remove"), rebalance("rebalance"), do_rebalance(
+            "do-rebalance"), done_rebalance("done-rebalance"), update_topology("update-topology");
 
     private String status;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java
index fd6f461..51da198 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java
@@ -17,33 +17,12 @@
  */
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.scheduler.WorkerSlot;
-
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.StormBase;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.cluster.StormStatus;
+import com.alibaba.jstorm.cluster.*;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.TaskStartEvent;
 import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
 import com.alibaba.jstorm.schedule.Assignment;
 import com.alibaba.jstorm.schedule.AssignmentBak;
@@ -51,15 +30,23 @@ import com.alibaba.jstorm.schedule.IToplogyScheduler;
 import com.alibaba.jstorm.schedule.TopologyAssignContext;
 import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyScheduler;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.alibaba.jstorm.task.TaskInfo;
 import com.alibaba.jstorm.utils.FailedAssignTopologyException;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 import com.alibaba.jstorm.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class TopologyAssign implements Runnable {
 
-    private final static Logger LOG = LoggerFactory
-            .getLogger(TopologyAssign.class);
+    private final static Logger LOG = LoggerFactory.getLogger(TopologyAssign.class);
 
     /**
      * private constructor function to avoid multiple instance
@@ -93,7 +80,7 @@ public class TopologyAssign implements Runnable {
     public void init(NimbusData nimbusData) {
         this.nimbusData = nimbusData;
 
-        //this.cleanupTimeoutSec = 60;
+        // this.cleanupTimeoutSec = 60;
 
         this.schedulers = new HashMap<String, IToplogyScheduler>();
 
@@ -113,8 +100,7 @@ public class TopologyAssign implements Runnable {
         thread.interrupt();
     }
 
-    protected static LinkedBlockingQueue<TopologyAssignEvent> queue =
-            new LinkedBlockingQueue<TopologyAssignEvent>();
+    protected static LinkedBlockingQueue<TopologyAssignEvent> queue = new LinkedBlockingQueue<TopologyAssignEvent>();
 
     public static void push(TopologyAssignEvent event) {
         queue.offer(event);
@@ -159,12 +145,37 @@ public class TopologyAssign implements Runnable {
      * @return
      */
     protected boolean doTopologyAssignment(TopologyAssignEvent event) {
-        Assignment assignment = null;
+        Assignment assignment;
         try {
+            Assignment oldAssignment = null;
+            boolean isReassign = event.isScratch();
+            if (isReassign) {
+                oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
+            }
             assignment = mkAssignment(event);
 
-            if (!(event.isScratch()))
+            // notify jstorm monitor on task assign/reassign/rebalance
+            TaskStartEvent taskEvent = new TaskStartEvent();
+            taskEvent.oldAssignment = oldAssignment;
+            taskEvent.newAssignment = assignment;
+            taskEvent.topologyId = event.getTopologyId();
+            taskEvent.clusterName = nimbusData.getClusterName();
+            taskEvent.timestamp = System.currentTimeMillis();
+
+            Map<Integer, String> task2Component;
+            // get from nimbus cache first
+            Map<Integer, TaskInfo> taskInfoMap = Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId());
+            if (taskInfoMap != null) {
+                task2Component = Common.getTaskToComponent(taskInfoMap);
+            } else {
+                task2Component = Common.getTaskToComponent(Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId()));
+            }
+            taskEvent.task2Component = task2Component;
+            nimbusData.getMetricRunnable().pushEvent(taskEvent);
+
+            if (!isReassign) {
                 setTopologyStatus(event);
+            }
         } catch (Throwable e) {
             LOG.error("Failed to assign topology " + event.getTopologyId(), e);
             event.fail(e.getMessage());
@@ -180,8 +191,6 @@ public class TopologyAssign implements Runnable {
     /**
      * cleanup the topologies which are not in ZK /topology, but in other place
      * 
-     * @param nimbusData
-     * @param active_topologys
      * @throws Exception
      */
     public void cleanupDisappearedTopology() throws Exception {
@@ -192,8 +201,7 @@ public class TopologyAssign implements Runnable {
             return;
         }
 
-        Set<String> cleanupIds =
-                get_cleanup_ids(clusterState, active_topologys);
+        Set<String> cleanupIds = get_cleanup_ids(clusterState, active_topologys);
 
         for (String topologyId : cleanupIds) {
 
@@ -202,13 +210,12 @@ public class TopologyAssign implements Runnable {
             clusterState.try_remove_storm(topologyId);
             //
             nimbusData.getTaskHeartbeatsCache().remove(topologyId);
+            nimbusData.getTasksHeartbeat().remove(topologyId);
 
             NimbusUtils.removeTopologyTaskTimeout(nimbusData, topologyId);
 
             // get /nimbus/stormdist/topologyId
-            String master_stormdist_root =
-                    StormConfig.masterStormdistRoot(nimbusData.getConf(),
-                            topologyId);
+            String master_stormdist_root = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId);
             try {
                 // delete topologyId local dir
                 PathUtils.rmr(master_stormdist_root);
@@ -218,14 +225,12 @@ public class TopologyAssign implements Runnable {
         }
     }
 
-    private void get_code_ids(List<String> code_ids,
-            HashSet<String> latest_code_ids) throws IOException {
+    private void get_code_ids(List<String> code_ids, HashSet<String> latest_code_ids) throws IOException {
         Map conf = nimbusData.getConf();
 
         String master_stormdist_root = StormConfig.masterStormdistRoot(conf);
         // listdir /local-dir/nimbus/stormdist
-        List<String> all_code_ids =
-                PathUtils.read_dir_contents(master_stormdist_root);
+        List<String> all_code_ids = PathUtils.read_dir_contents(master_stormdist_root);
         code_ids.addAll(all_code_ids);
 
         long now = System.currentTimeMillis();
@@ -238,9 +243,6 @@ public class TopologyAssign implements Runnable {
 
                 long modify = file.lastModified();
 
-                if (now - modify < cleanupTimeoutSec * 1000) {
-                    latest_code_ids.add(dir);
-                }
             } catch (Exception exception) {
                 LOG.error("Failed to get modify time of " + dir, exception);
             }
@@ -256,14 +258,14 @@ public class TopologyAssign implements Runnable {
      * @return
      * @throws Exception
      */
-    private Set<String> get_cleanup_ids(StormClusterState clusterState,
-            List<String> active_topologys) throws Exception {
+    private Set<String> get_cleanup_ids(StormClusterState clusterState, List<String> active_topologys) throws Exception {
 
         List<String> task_ids = clusterState.task_storms();
         List<String> heartbeat_ids = clusterState.heartbeat_storms();
         List<String> error_ids = clusterState.task_error_storms();
         List<String> assignment_ids = clusterState.assignments(null);
         List<String> metric_ids = clusterState.get_metrics();
+        List<String> backpressure_ids = clusterState.backpressureInfos();
 
         List<String> code_ids = new ArrayList<String>();
         HashSet<String> latest_code_ids = new HashSet<String>();
@@ -272,8 +274,7 @@ public class TopologyAssign implements Runnable {
         // Set<String> assigned_ids =
         // JStormUtils.listToSet(clusterState.active_storms());
         Set<String> to_cleanup_ids = new HashSet<String>();
-        Set<String> pendingTopologys =
-                nimbusData.getPendingSubmitTopoloygs().keySet();
+        Set<String> pendingTopologys = nimbusData.getPendingSubmitTopoloygs().keySet();
 
         if (task_ids != null) {
             to_cleanup_ids.addAll(task_ids);
@@ -294,9 +295,13 @@ public class TopologyAssign implements Runnable {
         if (code_ids != null) {
             to_cleanup_ids.addAll(code_ids);
         }
-        
+
         if (metric_ids != null) {
-        	to_cleanup_ids.addAll(metric_ids);
+            to_cleanup_ids.addAll(metric_ids);
+        }
+
+        if (backpressure_ids != null) {
+            to_cleanup_ids.addAll(backpressure_ids);
         }
 
         if (active_topologys != null) {
@@ -309,8 +314,7 @@ public class TopologyAssign implements Runnable {
         }
 
         /**
-         * Why need to remove latest code. Due to competition between
-         * Thrift.threads and TopologyAssign thread
+         * Why need to remove latest code. Due to competition between Thrift.threads and TopologyAssign thread
          * 
          */
         to_cleanup_ids.removeAll(latest_code_ids);
@@ -321,11 +325,6 @@ public class TopologyAssign implements Runnable {
 
     /**
      * start a topology: set active status of the topology
-     * 
-     * @param topologyName
-     * @param stormClusterState
-     * @param topologyId
-     * @throws Exception
      */
     public void setTopologyStatus(TopologyAssignEvent event) throws Exception {
         StormClusterState stormClusterState = nimbusData.getStormClusterState();
@@ -339,15 +338,11 @@ public class TopologyAssign implements Runnable {
             status = event.getOldStatus();
         }
 
-        boolean isEnable =
-                ConfigExtension
-                        .isEnablePerformanceMetrics(nimbusData.getConf());
+        boolean isEnable = ConfigExtension.isEnablePerformanceMetrics(nimbusData.getConf());
 
         StormBase stormBase = stormClusterState.storm_base(topologyId, null);
         if (stormBase == null) {
-            stormBase =
-                    new StormBase(topologyName, TimeUtils.current_time_secs(),
-                            status, group);
+            stormBase = new StormBase(topologyName, TimeUtils.current_time_secs(), status, group);
             stormBase.setEnableMonitor(isEnable);
             stormClusterState.activate_storm(topologyId, stormBase);
 
@@ -367,18 +362,20 @@ public class TopologyAssign implements Runnable {
 
     }
 
-    protected TopologyAssignContext prepareTopologyAssign(
-            TopologyAssignEvent event) throws Exception {
+    protected TopologyAssignContext prepareTopologyAssign(TopologyAssignEvent event) throws Exception {
         TopologyAssignContext ret = new TopologyAssignContext();
 
         String topologyId = event.getTopologyId();
+        ret.setTopologyId(topologyId);
+
+        int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
+        ret.setTopologyMasterTaskId(topoMasterId);
+        LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);
 
         Map<Object, Object> nimbusConf = nimbusData.getConf();
-        Map<Object, Object> topologyConf =
-                StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
+        Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
 
-        StormTopology rawTopology =
-                StormConfig.read_nimbus_topology_code(nimbusConf, topologyId);
+        StormTopology rawTopology = StormConfig.read_nimbus_topology_code(nimbusConf, topologyId);
         ret.setRawTopology(rawTopology);
 
         Map stormConf = new HashMap();
@@ -389,8 +386,7 @@ public class TopologyAssign implements Runnable {
         StormClusterState stormClusterState = nimbusData.getStormClusterState();
 
         // get all running supervisor, don't need callback to watch supervisor
-        Map<String, SupervisorInfo> supInfos =
-                Cluster.get_all_SupervisorInfo(stormClusterState, null);
+        Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
         // init all AvailableWorkerPorts
         for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
             SupervisorInfo supervisor = supInfo.getValue();
@@ -400,21 +396,16 @@ public class TopologyAssign implements Runnable {
 
         getAliveSupervsByHb(supInfos, nimbusConf);
         if (supInfos.size() == 0) {
-            throw new FailedAssignTopologyException(
-                    "Failed to make assignment " + topologyId
-                            + ", due to no alive supervisor");
+            throw new FailedAssignTopologyException("Failed to make assignment " + topologyId + ", due to no alive supervisor");
         }
 
-        Map<Integer, String> taskToComponent =
-                Cluster.get_all_task_component(stormClusterState, topologyId, null);
+        Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
         ret.setTaskToComponent(taskToComponent);
 
         // get taskids /ZK/tasks/topologyId
         Set<Integer> allTaskIds = taskToComponent.keySet();
         if (allTaskIds == null || allTaskIds.size() == 0) {
-            String errMsg =
-                    "Failed to get all task ID list from /ZK-dir/tasks/"
-                            + topologyId;
+            String errMsg = "Failed to get all task ID list from /ZK-dir/tasks/" + topologyId;
             LOG.warn(errMsg);
             throw new IOException(errMsg);
         }
@@ -425,18 +416,31 @@ public class TopologyAssign implements Runnable {
         // machine
         Set<Integer> unstoppedTasks = new HashSet<Integer>();
         Set<Integer> deadTasks = new HashSet<Integer>();
-        Set<ResourceWorkerSlot> unstoppedWorkers =
-                new HashSet<ResourceWorkerSlot>();
+        Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>();
 
-        Assignment existingAssignment =
-                stormClusterState.assignment_info(topologyId, null);
+        Assignment existingAssignment = stormClusterState.assignment_info(topologyId, null);
         if (existingAssignment != null) {
             aliveTasks = getAliveTasks(topologyId, allTaskIds);
-            unstoppedTasks =
-                    getUnstoppedSlots(aliveTasks, supInfos, existingAssignment);
 
-            deadTasks.addAll(allTaskIds);
-            deadTasks.removeAll(aliveTasks);
+            /*
+             * Check if the topology master task is alive first since all task 
+             * heartbeat info is reported by topology master. 
+             * If master is dead, do reassignment for topology master first.
+             */
+            if (aliveTasks.contains(topoMasterId) == false) {
+                ResourceWorkerSlot worker = existingAssignment.getWorkerByTaskId(topoMasterId);
+                deadTasks.addAll(worker.getTasks());
+
+                Set<Integer> tempSet = new HashSet<Integer>(allTaskIds);
+                tempSet.removeAll(deadTasks);
+                aliveTasks.addAll(tempSet);
+                aliveTasks.removeAll(deadTasks);
+            } else {
+                deadTasks.addAll(allTaskIds);
+                deadTasks.removeAll(aliveTasks);
+            }
+
+            unstoppedTasks = getUnstoppedSlots(aliveTasks, supInfos, existingAssignment);
         }
 
         ret.setDeadTaskIds(deadTasks);
@@ -451,9 +455,7 @@ public class TopologyAssign implements Runnable {
             ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_NEW);
 
             try {
-                AssignmentBak lastAssignment =
-                        stormClusterState.assignment_bak(event
-                                .getTopologyName());
+                AssignmentBak lastAssignment = stormClusterState.assignment_bak(event.getTopologyName());
                 if (lastAssignment != null) {
                     ret.setOldAssignment(lastAssignment.getAssignment());
                 }
@@ -465,13 +467,11 @@ public class TopologyAssign implements Runnable {
             if (event.isScratch()) {
                 ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_REBALANCE);
                 ret.setIsReassign(event.isReassign());
-                unstoppedWorkers =
-                        getUnstoppedWorkers(unstoppedTasks, existingAssignment);
+                unstoppedWorkers = getUnstoppedWorkers(unstoppedTasks, existingAssignment);
                 ret.setUnstoppedWorkers(unstoppedWorkers);
             } else {
                 ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_MONITOR);
-                unstoppedWorkers =
-                        getUnstoppedWorkers(aliveTasks, existingAssignment);
+                unstoppedWorkers = getUnstoppedWorkers(aliveTasks, existingAssignment);
                 ret.setUnstoppedWorkers(unstoppedWorkers);
             }
         }
@@ -480,13 +480,8 @@ public class TopologyAssign implements Runnable {
     }
 
     /**
-     * make assignments for a topology The nimbus core function, this function
-     * has been totally rewrite
+     * make assignments for a topology. This is the nimbus core function; it has been totally rewritten.
      * 
-     * @param nimbusData NimbusData
-     * @param topologyId String
-     * @param isScratch Boolean: isScratch is false unless rebalancing the
-     *            topology
      * @throws Exception
      */
     public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
@@ -500,8 +495,7 @@ public class TopologyAssign implements Runnable {
 
         if (!StormConfig.local_mode(nimbusData.getConf())) {
 
-            IToplogyScheduler scheduler =
-                    schedulers.get(DEFAULT_SCHEDULER_NAME);
+            IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
 
             assignments = scheduler.assignTasks(context);
 
@@ -511,29 +505,24 @@ public class TopologyAssign implements Runnable {
 
         Assignment assignment = null;
         if (assignments != null && assignments.size() > 0) {
-            Map<String, String> nodeHost =
-                    getTopologyNodeHost(context.getCluster(),
-                            context.getOldAssignment(), assignments);
+            Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments);
 
-            Map<Integer, Integer> startTimes =
-                    getTaskStartTimes(context, nimbusData, topologyId,
-                            context.getOldAssignment(), assignments);
+            Map<Integer, Integer> startTimes = getTaskStartTimes(context, nimbusData, topologyId, context.getOldAssignment(), assignments);
 
-            String codeDir =
-                    StormConfig.masterStormdistRoot(nimbusData.getConf(),
-                            topologyId);
+            String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId);
 
-            assignment =
-                    new Assignment(codeDir, assignments, nodeHost, startTimes);
+            assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);
 
-            StormClusterState stormClusterState =
-                    nimbusData.getStormClusterState();
+            // the topology binary has changed
+            if (event.isScaleTopology()){
+                assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
+            }
+            StormClusterState stormClusterState = nimbusData.getStormClusterState();
 
             stormClusterState.set_assignment(topologyId, assignment);
 
             // update task heartbeat's start time
-            NimbusUtils.updateTaskHbStartTime(nimbusData, assignment,
-                    topologyId);
+            NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);
 
             // @@@ TODO
 
@@ -547,14 +536,13 @@ public class TopologyAssign implements Runnable {
 
             NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId);
 
-            LOG.info("Successfully make assignment for topology id "
-                    + topologyId + ": " + assignment);
+            LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment);
         }
         return assignment;
     }
 
     private static Set<ResourceWorkerSlot> mkLocalAssignment(
-            TopologyAssignContext context) {
+            TopologyAssignContext context) throws Exception {
         Set<ResourceWorkerSlot> result = new HashSet<ResourceWorkerSlot>();
         Map<String, SupervisorInfo> cluster = context.getCluster();
         if (cluster.size() != 1)
@@ -565,7 +553,15 @@ public class TopologyAssign implements Runnable {
             supervisorId = entry.getKey();
             localSupervisor = entry.getValue();
         }
-        int port = localSupervisor.getAvailableWorkerPorts().iterator().next();
+        int port = -1;
+        if (localSupervisor.getAvailableWorkerPorts().iterator().hasNext()) {
+            port = localSupervisor.getAvailableWorkerPorts().iterator().next();
+        } else {
+            LOG.info(" amount of worker's ports is not enough");
+            throw new FailedAssignTopologyException(
+                    "Failed to make assignment " + ", due to no enough ports");
+        }
+
         ResourceWorkerSlot worker = new ResourceWorkerSlot(supervisorId, port);
         worker.setTasks(new HashSet<Integer>(context.getAllTaskIds()));
         worker.setHostname(localSupervisor.getHostName());
@@ -573,16 +569,8 @@ public class TopologyAssign implements Runnable {
         return result;
     }
 
-    /**
-     * @param existingAssignment
-     * @param taskWorkerSlot
-     * @return
-     * @throws Exception
-     */
-    public static Map<Integer, Integer> getTaskStartTimes(
-            TopologyAssignContext context, NimbusData nimbusData,
-            String topologyId, Assignment existingAssignment,
-            Set<ResourceWorkerSlot> workers) throws Exception {
+    public static Map<Integer, Integer> getTaskStartTimes(TopologyAssignContext context, NimbusData nimbusData, String topologyId,
+            Assignment existingAssignment, Set<ResourceWorkerSlot> workers) throws Exception {
 
         Map<Integer, Integer> startTimes = new TreeMap<Integer, Integer>();
 
@@ -600,8 +588,7 @@ public class TopologyAssign implements Runnable {
         Set<ResourceWorkerSlot> oldWorkers = new HashSet<ResourceWorkerSlot>();
 
         if (existingAssignment != null) {
-            Map<Integer, Integer> taskStartTimeSecs =
-                    existingAssignment.getTaskStartTimeSecs();
+            Map<Integer, Integer> taskStartTimeSecs = existingAssignment.getTaskStartTimeSecs();
             if (taskStartTimeSecs != null) {
                 startTimes.putAll(taskStartTimeSecs);
             }
@@ -616,23 +603,21 @@ public class TopologyAssign implements Runnable {
         int nowSecs = TimeUtils.current_time_secs();
         for (Integer changedTaskId : changedTaskIds) {
             startTimes.put(changedTaskId, nowSecs);
-            zkClusterState.remove_task_heartbeat(topologyId, changedTaskId);
+            NimbusUtils.removeTopologyTaskHb(nimbusData, topologyId, changedTaskId);
         }
 
         Set<Integer> removedTaskIds = getRemovedTaskIds(oldWorkers, workers);
         for (Integer removedTaskId : removedTaskIds) {
             startTimes.remove(removedTaskId);
-            zkClusterState.remove_task_heartbeat(topologyId, removedTaskId);
+            NimbusUtils.removeTopologyTaskHb(nimbusData, topologyId, removedTaskId);
         }
 
-        LOG.info("Task assignment has been changed: " + changedTaskIds
-                + ", removed tasks " + removedTaskIds);
+        LOG.info("Task assignment has been changed: " + changedTaskIds + ", removed tasks " + removedTaskIds);
         return startTimes;
     }
 
-    public static Map<String, String> getTopologyNodeHost(
-            Map<String, SupervisorInfo> supervisorMap,
-            Assignment existingAssignment, Set<ResourceWorkerSlot> workers) {
+    public static Map<String, String> getTopologyNodeHost(Map<String, SupervisorInfo> supervisorMap, Assignment existingAssignment,
+            Set<ResourceWorkerSlot> workers) {
 
         // the following is that remove unused node from allNodeHost
         Set<String> usedNodes = new HashSet<String>();
@@ -649,8 +634,7 @@ public class TopologyAssign implements Runnable {
         }
 
         // get alive supervisorMap Map<supervisorId, hostname>
-        Map<String, String> nodeHost =
-                SupervisorInfo.getNodeHost(supervisorMap);
+        Map<String, String> nodeHost = SupervisorInfo.getNodeHost(supervisorMap);
         if (nodeHost != null) {
             allNodeHost.putAll(nodeHost);
         }
@@ -661,8 +645,7 @@ public class TopologyAssign implements Runnable {
             if (allNodeHost.containsKey(supervisorId)) {
                 ret.put(supervisorId, allNodeHost.get(supervisorId));
             } else {
-                LOG.warn("Node " + supervisorId
-                        + " doesn't in the supervisor list");
+                LOG.warn("Node " + supervisorId + " doesn't in the supervisor list");
             }
         }
 
@@ -672,16 +655,12 @@ public class TopologyAssign implements Runnable {
     /**
      * get all taskids which are assigned newly or reassigned
      * 
-     * @param taskToWorkerSlot
-     * @param newtaskToWorkerSlot
      * @return Set<Integer> taskid which is assigned newly or reassigned
      */
-    public static Set<Integer> getNewOrChangedTaskIds(
-            Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
+    public static Set<Integer> getNewOrChangedTaskIds(Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
 
         Set<Integer> rtn = new HashSet<Integer>();
-        HashMap<String, ResourceWorkerSlot> workerPortMap =
-                HostPortToWorkerMap(oldWorkers);
+        HashMap<String, ResourceWorkerSlot> workerPortMap = HostPortToWorkerMap(oldWorkers);
         for (ResourceWorkerSlot worker : workers) {
             ResourceWorkerSlot oldWorker = workerPortMap.get(worker.getHostPort());
             if (oldWorker != null) {
@@ -691,14 +670,15 @@ public class TopologyAssign implements Runnable {
                         rtn.add(task);
                 }
             } else {
-                rtn.addAll(worker.getTasks());
+                if (worker.getTasks() != null) {
+                    rtn.addAll(worker.getTasks());
+                }
             }
         }
         return rtn;
     }
 
-    public static Set<Integer> getRemovedTaskIds(
-            Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
+    public static Set<Integer> getRemovedTaskIds(Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
 
         Set<Integer> rtn = new HashSet<Integer>();
         Set<Integer> oldTasks = getTaskSetFromWorkerSet(oldWorkers);
@@ -711,8 +691,7 @@ public class TopologyAssign implements Runnable {
         return rtn;
     }
 
-    private static Set<Integer> getTaskSetFromWorkerSet(
-            Set<ResourceWorkerSlot> workers) {
+    private static Set<Integer> getTaskSetFromWorkerSet(Set<ResourceWorkerSlot> workers) {
         Set<Integer> rtn = new HashSet<Integer>();
         for (ResourceWorkerSlot worker : workers) {
             rtn.addAll(worker.getTasks());
@@ -720,10 +699,8 @@ public class TopologyAssign implements Runnable {
         return rtn;
     }
 
-    private static HashMap<String, ResourceWorkerSlot> HostPortToWorkerMap(
-            Set<ResourceWorkerSlot> workers) {
-        HashMap<String, ResourceWorkerSlot> rtn =
-                new HashMap<String, ResourceWorkerSlot>();
+    private static HashMap<String, ResourceWorkerSlot> HostPortToWorkerMap(Set<ResourceWorkerSlot> workers) {
+        HashMap<String, ResourceWorkerSlot> rtn = new HashMap<String, ResourceWorkerSlot>();
         for (ResourceWorkerSlot worker : workers) {
             rtn.put(worker.getHostPort(), worker);
         }
@@ -731,17 +708,14 @@ public class TopologyAssign implements Runnable {
     }
 
     /**
-     * sort slots, the purpose is to ensure that the tasks are assigned in
-     * balancing
+     * sort slots; the purpose is to ensure that tasks are assigned in a balanced way
      * 
      * @param allSlots
      * @return List<WorkerSlot>
      */
-    public static List<WorkerSlot> sortSlots(Set<WorkerSlot> allSlots,
-            int needSlotNum) {
+    public static List<WorkerSlot> sortSlots(Set<WorkerSlot> allSlots, int needSlotNum) {
 
-        Map<String, List<WorkerSlot>> nodeMap =
-                new HashMap<String, List<WorkerSlot>>();
+        Map<String, List<WorkerSlot>> nodeMap = new HashMap<String, List<WorkerSlot>>();
 
         // group by first
         for (WorkerSlot np : allSlots) {
@@ -778,8 +752,7 @@ public class TopologyAssign implements Runnable {
         }
 
         // interleave
-        List<List<WorkerSlot>> splitup =
-                new ArrayList<List<WorkerSlot>>(nodeMap.values());
+        List<List<WorkerSlot>> splitup = new ArrayList<List<WorkerSlot>>(nodeMap.values());
 
         Collections.sort(splitup, new Comparator<List<WorkerSlot>>() {
 
@@ -801,13 +774,8 @@ public class TopologyAssign implements Runnable {
 
     /**
      * Get unstopped slots from alive task list
-     * 
-     * @param aliveAssigned
-     * @param supInfos
-     * @return
      */
-    public Set<Integer> getUnstoppedSlots(Set<Integer> aliveTasks,
-            Map<String, SupervisorInfo> supInfos, Assignment existAssignment) {
+    public Set<Integer> getUnstoppedSlots(Set<Integer> aliveTasks, Map<String, SupervisorInfo> supInfos, Assignment existAssignment) {
         Set<Integer> ret = new HashSet<Integer>();
 
         Set<ResourceWorkerSlot> oldWorkers = existAssignment.getWorkers();
@@ -835,8 +803,7 @@ public class TopologyAssign implements Runnable {
 
     }
 
-    private Set<ResourceWorkerSlot> getUnstoppedWorkers(
-            Set<Integer> aliveTasks, Assignment existAssignment) {
+    private Set<ResourceWorkerSlot> getUnstoppedWorkers(Set<Integer> aliveTasks, Assignment existAssignment) {
         Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
         for (ResourceWorkerSlot worker : existAssignment.getWorkers()) {
             boolean alive = true;
@@ -860,12 +827,9 @@ public class TopologyAssign implements Runnable {
      * @param stormClusterState
      * @throws Exception
      */
-    public static void getFreeSlots(
-            Map<String, SupervisorInfo> supervisorInfos,
-            StormClusterState stormClusterState) throws Exception {
+    public static void getFreeSlots(Map<String, SupervisorInfo> supervisorInfos, StormClusterState stormClusterState) throws Exception {
 
-        Map<String, Assignment> assignments =
-                Cluster.get_all_assignment(stormClusterState, null);
+        Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, null);
 
         for (Entry<String, Assignment> entry : assignments.entrySet()) {
             String topologyId = entry.getKey();
@@ -875,8 +839,7 @@ public class TopologyAssign implements Runnable {
 
             for (ResourceWorkerSlot worker : workers) {
 
-                SupervisorInfo supervisorInfo =
-                        supervisorInfos.get(worker.getNodeId());
+                SupervisorInfo supervisorInfo = supervisorInfos.get(worker.getNodeId());
                 if (supervisorInfo == null) {
                     // the supervisor is dead
                     continue;
@@ -888,31 +851,20 @@ public class TopologyAssign implements Runnable {
     }
 
     /**
-     * find all alived taskid Does not assume that clocks are synchronized. Task
-     * heartbeat is only used so that nimbus knows when it's received a new
-     * heartbeat. All timing is done by nimbus and tracked through
-     * task-heartbeat-cache
+     * find all alive task ids. Does not assume that clocks are synchronized. Task heartbeat is only used so that nimbus knows when it has received a new heartbeat.
+     * All timing is done by nimbus and tracked through task-heartbeat-cache
      * 
-     * @param conf
-     * @param topologyId
-     * @param stormClusterState
-     * @param taskIds
-     * @param taskStartTimes
-     * @param taskHeartbeatsCache --Map<topologyId, Map<taskid,
-     *            Map<tkHbCacheTime, time>>>
      * @return Set<Integer> : taskid
      * @throws Exception
      */
-    public Set<Integer> getAliveTasks(String topologyId, Set<Integer> taskIds)
-            throws Exception {
+    public Set<Integer> getAliveTasks(String topologyId, Set<Integer> taskIds) throws Exception {
 
         Set<Integer> aliveTasks = new HashSet<Integer>();
 
         // taskIds is the list from ZK /ZK-DIR/tasks/topologyId
         for (int taskId : taskIds) {
 
-            boolean isDead =
-                    NimbusUtils.isTaskDead(nimbusData, topologyId, taskId);
+            boolean isDead = NimbusUtils.isTaskDead(nimbusData, topologyId, taskId);
             if (isDead == false) {
                 aliveTasks.add(taskId);
             }
@@ -925,24 +877,20 @@ public class TopologyAssign implements Runnable {
     /**
      * Backup the toplogy's Assignment to ZK
      * 
-     * @@@ Question Do we need to do backup operation every time?
      * @param assignment
      * @param event
+     * @@@ Question Do we need to do backup operation every time?
      */
-    public void backupAssignment(Assignment assignment,
-            TopologyAssignEvent event) {
+    public void backupAssignment(Assignment assignment, TopologyAssignEvent event) {
         String topologyId = event.getTopologyId();
         String topologyName = event.getTopologyName();
         try {
 
-            StormClusterState zkClusterState =
-                    nimbusData.getStormClusterState();
+            StormClusterState zkClusterState = nimbusData.getStormClusterState();
             // one little problem, get tasks twice when assign one topology
-            Map<Integer, String> tasks =
-                            Cluster.get_all_task_component(zkClusterState, topologyId, null);
+            Map<Integer, String> tasks = Cluster.get_all_task_component(zkClusterState, topologyId, null);
 
-            Map<String, List<Integer>> componentTasks =
-                    JStormUtils.reverse_map(tasks);
+            Map<String, List<Integer>> componentTasks = JStormUtils.reverse_map(tasks);
 
             for (Entry<String, List<Integer>> entry : componentTasks.entrySet()) {
                 List<Integer> keys = entry.getValue();
@@ -951,31 +899,24 @@ public class TopologyAssign implements Runnable {
 
             }
 
-            AssignmentBak assignmentBak =
-                    new AssignmentBak(componentTasks, assignment);
+            AssignmentBak assignmentBak = new AssignmentBak(componentTasks, assignment);
             zkClusterState.backup_assignment(topologyName, assignmentBak);
 
         } catch (Exception e) {
-            LOG.warn("Failed to backup " + topologyId + " assignment "
-                    + assignment, e);
+            LOG.warn("Failed to backup " + topologyId + " assignment " + assignment, e);
         }
     }
 
-    private void getAliveSupervsByHb(
-            Map<String, SupervisorInfo> supervisorInfos, Map conf) {
+    private void getAliveSupervsByHb(Map<String, SupervisorInfo> supervisorInfos, Map conf) {
         int currentTime = TimeUtils.current_time_secs();
-        int hbTimeout =
-                JStormUtils.parseInt(
-                        conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS),
-                        (JStormUtils.MIN_1 * 3));
+        int hbTimeout = JStormUtils.parseInt(conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS), (JStormUtils.MIN_1 * 3));
         Set<String> supervisorTobeRemoved = new HashSet<String>();
 
         for (Entry<String, SupervisorInfo> entry : supervisorInfos.entrySet()) {
             SupervisorInfo supInfo = entry.getValue();
             int lastReportTime = supInfo.getTimeSecs();
             if ((currentTime - lastReportTime) > hbTimeout) {
-                LOG.warn("Supervisor-" + supInfo.getHostName()
-                        + " is dead. lastReportTime=" + lastReportTime);
+                LOG.warn("Supervisor-" + supInfo.getHostName() + " is dead. lastReportTime=" + lastReportTime);
                 supervisorTobeRemoved.add(entry.getKey());
             }
         }
@@ -989,8 +930,6 @@ public class TopologyAssign implements Runnable {
      * @param args
      */
     public static void main(String[] args) {
-        // TODO Auto-generated method stub
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java
index 8725918..a0bf9b9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java
@@ -25,7 +25,7 @@ import com.alibaba.jstorm.cluster.StormStatus;
 public class TopologyAssignEvent {
 
     // unit is minutes
-    private static final int DEFAULT_WAIT_TIME = 2;
+    private static final int DEFAULT_WAIT_TIME = 5;
     private String topologyId;
     private String topologyName; // if this field has been set, it is create
     private String group;
@@ -37,6 +37,14 @@ public class TopologyAssignEvent {
     private CountDownLatch latch = new CountDownLatch(1);
     private boolean isSuccess = false;
     private String errorMsg;
+    private boolean isScaleTopology = false;
+
+    public void setScaleTopology(boolean isScaleTopology){
+        this.isScaleTopology = isScaleTopology;
+    }
+    public boolean isScaleTopology(){
+        return isScaleTopology;
+    }
 
     public String getTopologyId() {
         return topologyId;