Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:44 UTC

[06/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java
index 8170ae2..496c2fc 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java
@@ -40,12 +40,8 @@ public class ComponentNumSelector extends AbstractSelector {
             @Override
             public int compare(ResourceWorkerSlot o1, ResourceWorkerSlot o2) {
                 // TODO Auto-generated method stub
-                int o1Num =
-                        context.getComponentNumOnSupervisor(o1.getNodeId(),
-                                name);
-                int o2Num =
-                        context.getComponentNumOnSupervisor(o2.getNodeId(),
-                                name);
+                int o1Num = context.getComponentNumOnSupervisor(o1.getNodeId(), name);
+                int o2Num = context.getComponentNumOnSupervisor(o2.getNodeId(), name);
                 if (o1Num == o2Num)
                     return 0;
                 return o1Num > o2Num ? 1 : -1;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java
index 49eb447..f7f4f5b 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java
@@ -40,12 +40,8 @@ public class InputComponentNumSelector extends AbstractSelector {
             @Override
             public int compare(ResourceWorkerSlot o1, ResourceWorkerSlot o2) {
                 // TODO Auto-generated method stub
-                int o1Num =
-                        context.getInputComponentNumOnSupervisor(
-                                o1.getNodeId(), name);
-                int o2Num =
-                        context.getInputComponentNumOnSupervisor(
-                                o2.getNodeId(), name);
+                int o1Num = context.getInputComponentNumOnSupervisor(o1.getNodeId(), name);
+                int o2Num = context.getInputComponentNumOnSupervisor(o2.getNodeId(), name);
                 if (o1Num == o2Num)
                     return 0;
                 return o1Num > o2Num ? -1 : 1;
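
The two comparator hunks above differ only in the final sign: ComponentNumSelector returns 1 when o1's supervisor holds more tasks of the same component (ascending order, so less-loaded supervisors rank first and a component spreads out), while InputComponentNumSelector returns -1 in that case (descending order, so supervisors already running the task's input components rank first, favoring locality). A minimal generic sketch of the two orderings, with countFor standing in for the context lookups (a hypothetical stand-in; the enclosing AbstractSelector is not shown in this mail):

    import java.util.Comparator;
    import java.util.List;
    import java.util.function.ToIntFunction;

    final class SelectorOrderSketch {
        // ComponentNumSelector's ordering: supervisors running the FEWEST
        // tasks of the same component sort first (spread a component out).
        static <T> void preferFewest(List<T> workers, ToIntFunction<T> countFor) {
            workers.sort(Comparator.comparingInt(countFor));
        }

        // InputComponentNumSelector's ordering: supervisors running the MOST
        // input-component tasks sort first (keep a task near its inputs).
        static <T> void preferMost(List<T> workers, ToIntFunction<T> countFor) {
            workers.sort(Comparator.comparingInt(countFor).reversed());
        }
    }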

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java
index adc8b29..6ef5736 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java
@@ -22,6 +22,5 @@ import java.util.List;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
 
 public interface Selector {
-    public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result,
-            String name);
+    public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result, String name);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java
index f01ee9a..8643d6a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java
@@ -21,8 +21,7 @@ import java.util.Comparator;
 
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
 
-public abstract class WorkerComparator implements
-        Comparator<ResourceWorkerSlot> {
+public abstract class WorkerComparator implements Comparator<ResourceWorkerSlot> {
 
     protected String name;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java
index f81d072..33f762a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java
@@ -24,33 +24,39 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 public class TaskAssignContext {
-
+    private final Map<Integer, String> taskToComponent;
+    
     private final Map<String, List<ResourceWorkerSlot>> supervisorToWorker;
 
     private final Map<String, Set<String>> relationship;
 
     // Map<worker, Map<component name, assigned task num in this worker>
-    private final Map<ResourceWorkerSlot, Map<String, Integer>> workerToComponentNum =
-            new HashMap<ResourceWorkerSlot, Map<String, Integer>>();
+    private final Map<ResourceWorkerSlot, Map<String, Integer>> workerToComponentNum = new HashMap<ResourceWorkerSlot, Map<String, Integer>>();
 
     // Map<available worker, assigned task num in this worker>
-    private final Map<ResourceWorkerSlot, Integer> workerToTaskNum =
-            new HashMap<ResourceWorkerSlot, Integer>();
+    private final Map<ResourceWorkerSlot, Integer> workerToTaskNum = new HashMap<ResourceWorkerSlot, Integer>();
 
-    private final Map<String, ResourceWorkerSlot> HostPortToWorkerMap =
-            new HashMap<String, ResourceWorkerSlot>();
+    private final Map<String, ResourceWorkerSlot> HostPortToWorkerMap = new HashMap<String, ResourceWorkerSlot>();
 
-    public TaskAssignContext(
-            Map<String, List<ResourceWorkerSlot>> supervisorToWorker,
-            Map<String, Set<String>> relationship) {
+    public TaskAssignContext(Map<String, List<ResourceWorkerSlot>> supervisorToWorker, Map<String, Set<String>> relationship, Map<Integer, String> taskToComponent) {
+        this.taskToComponent = taskToComponent;
         this.supervisorToWorker = supervisorToWorker;
         this.relationship = relationship;
 
-        for (Entry<String, List<ResourceWorkerSlot>> entry : supervisorToWorker
-                .entrySet()) {
+        for (Entry<String, List<ResourceWorkerSlot>> entry : supervisorToWorker.entrySet()) {
             for (ResourceWorkerSlot worker : entry.getValue()) {
-                workerToTaskNum.put(worker, 0);
+                workerToTaskNum.put(worker, (worker.getTasks() != null ? worker.getTasks().size() : 0));
                 HostPortToWorkerMap.put(worker.getHostPort(), worker);
+   
+                if (worker.getTasks() != null) {
+                    Map<String, Integer> componentToNum = new HashMap<String, Integer>();
+                    for (Integer taskId : worker.getTasks()) {
+                        String componentId = taskToComponent.get(taskId);
+                        int num = componentToNum.get(componentId) == null ? 0 : componentToNum.get(componentId);
+                        componentToNum.put(componentId, ++num);
+                    }
+                    workerToComponentNum.put(worker, componentToNum);
+                }
             }
         }
     }
@@ -115,8 +121,7 @@ public class TaskAssignContext {
         return result;
     }
 
-    public int getInputComponentNumOnWorker(ResourceWorkerSlot worker,
-            String name) {
+    public int getInputComponentNumOnWorker(ResourceWorkerSlot worker, String name) {
         int result = 0;
         for (String component : relationship.get(name))
             result = result + this.getComponentNumOnWorker(worker, component);
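
Beyond the reflowed declarations, this hunk changes behavior: the constructor now takes taskToComponent and seeds workerToTaskNum with each worker's existing task count (previously always 0), and it pre-populates workerToComponentNum by counting a worker's tasks per component; getInputComponentNumOnWorker then just sums those counts over relationship.get(name). The patch's null-check-and-increment is the classic counting loop; a sketch of the same counting with Map.merge, equivalent under the assumption that taskToComponent covers every task id:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    final class ComponentCountSketch {
        // Count how many tasks of each component a worker already holds.
        static Map<String, Integer> countByComponent(Set<Integer> taskIds,
                                                     Map<Integer, String> taskToComponent) {
            Map<String, Integer> componentToNum = new HashMap<>();
            for (Integer taskId : taskIds) {
                String componentId = taskToComponent.get(taskId);
                componentToNum.merge(componentId, 1, Integer::sum); // null-safe increment
            }
            return componentToNum;
        }
    }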

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java
index 7131463..a359972 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java
@@ -41,20 +41,15 @@ public class TaskScheduler {
 
     public static Logger LOG = LoggerFactory.getLogger(TaskScheduler.class);
 
-    public static final String ACKER_NAME = "__acker";
-
     private final TaskAssignContext taskContext;
 
-    private List<ResourceWorkerSlot> assignments =
-            new ArrayList<ResourceWorkerSlot>();
+    private List<ResourceWorkerSlot> assignments = new ArrayList<ResourceWorkerSlot>();
 
     private int workerNum;
 
     /**
-     * For balance purpose, default scheduler is trying to assign the same
-     * number of tasks to a worker. e.g. There are 4 tasks and 3 available
-     * workers. Each worker will be assigned one task first. And then one worker
-     * is chosen for the last one.
+     * For balance purpose, default scheduler is trying to assign the same number of tasks to a worker. e.g. There are 4 tasks and 3 available workers. Each
+     * worker will be assigned one task first. And then one worker is chosen for the last one.
      */
     private int avgTaskNum;
     private int leftTaskNum;
@@ -69,48 +64,91 @@ public class TaskScheduler {
 
     private Selector totalTaskNumSelector;
 
-    public TaskScheduler(DefaultTopologyAssignContext context,
-            Set<Integer> tasks, List<ResourceWorkerSlot> workers) {
+    public TaskScheduler(DefaultTopologyAssignContext context, Set<Integer> tasks, List<ResourceWorkerSlot> workers) {
         this.tasks = tasks;
-        LOG.info("Tasks " + tasks + " is going to be assigned in workers "
-                + workers);
+        LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers);
         this.context = context;
         this.taskContext =
-                new TaskAssignContext(this.buildSupervisorToWorker(workers),
-                        Common.buildSpoutOutoputAndBoltInputMap(context));
+                new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent());
         this.componentSelector = new ComponentNumSelector(taskContext);
-        this.inputComponentSelector =
-                new InputComponentNumSelector(taskContext);
+        this.inputComponentSelector = new InputComponentNumSelector(taskContext);
         this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext);
         if (tasks.size() == 0)
             return;
-        setTaskNum(tasks.size(), workerNum);
+        if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){
+            // warning ! it doesn't consider HA TM now!!
+            if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) {
+                assignForTopologyMaster();
+            }
+        }
+
+        int taskNum = tasks.size();
+        Map<ResourceWorkerSlot, Integer> workerSlotIntegerMap = taskContext.getWorkerToTaskNum();
+        Set<ResourceWorkerSlot> preAssignWorkers = new HashSet<ResourceWorkerSlot>();
+        for (Entry<ResourceWorkerSlot, Integer> worker : workerSlotIntegerMap.entrySet()) {
+            if (worker.getValue() > 0) {
+                taskNum += worker.getValue();
+                preAssignWorkers.add(worker.getKey());
+            }
+        }
+        setTaskNum(taskNum, workerNum);
+
+        // Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers.
+        // Remove the workers which have been assigned with enough workers.
+        for (ResourceWorkerSlot worker : preAssignWorkers) {
+            Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
+            if (doneWorkers != null) {
+                for (ResourceWorkerSlot doneWorker : doneWorkers) {
+                    taskNum -= doneWorker.getTasks().size();
+                    workerNum--;
+                }
+            }
+        }
+        setTaskNum(taskNum, workerNum);
 
         // For Scale-out case, the old assignment should be kept.
-        if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
-                && context.isReassign() == false) {
-            keepAssignment(context.getOldAssignment().getWorkers());
+        if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) {
+            keepAssignment(taskNum, context.getOldAssignment().getWorkers());
         }
     }
 
-    private void keepAssignment(Set<ResourceWorkerSlot> keepAssignments) {
+    private void keepAssignment(int taskNum, Set<ResourceWorkerSlot> keepAssignments) {
         Set<Integer> keepTasks = new HashSet<Integer>();
+        ResourceWorkerSlot tmWorker = null;
         for (ResourceWorkerSlot worker : keepAssignments) {
+            if (worker.getTasks().contains(context.getTopologyMasterTaskId()))
+                tmWorker = worker;
             for (Integer taskId : worker.getTasks()) {
                 if (tasks.contains(taskId)) {
-                    ResourceWorkerSlot contextWorker =
-                            taskContext.getWorker(worker);
+                    ResourceWorkerSlot contextWorker = taskContext.getWorker(worker);
                     if (contextWorker != null) {
-                        String componentName =
-                                context.getTaskToComponent().get(taskId);
-                        updateAssignedTasksOfWorker(taskId, contextWorker);
-                        updateComponentsNumOfWorker(componentName,
-                                contextWorker);
-                        keepTasks.add(taskId);
+                        if (tmWorker != null && tmWorker.getTasks().contains(taskId) && context.getAssignSingleWorkerForTM() ) {
+                            if (context.getTopologyMasterTaskId() == taskId){
+                                updateAssignedTasksOfWorker(taskId, contextWorker);
+                                taskContext.getWorkerToTaskNum().remove(contextWorker);
+                                contextWorker.getTasks().clear();
+                                contextWorker.getTasks().add(taskId);
+                                assignments.add(contextWorker);
+                                tasks.remove(taskId);
+                                taskNum--;
+                                workerNum--;
+                                LOG.info("assignForTopologyMaster: " + contextWorker);
+                            }
+                        }else {
+                            String componentName = context.getTaskToComponent().get(taskId);
+                            updateAssignedTasksOfWorker(taskId, contextWorker);
+                            updateComponentsNumOfWorker(componentName, contextWorker);
+                            keepTasks.add(taskId);
+                        }
                     }
                 }
             }
         }
+        if ( tmWorker != null){
+            setTaskNum(taskNum, workerNum);
+            keepAssignments.remove(tmWorker);
+        }
+
 
         // Try to find the workers which have been assigned too much tasks
         // If found, remove the workers from worker resource pool and update
@@ -118,11 +156,9 @@ public class TaskScheduler {
         int doneAssignedTaskNum = 0;
         while (true) {
             boolean found = false;
-            Set<ResourceWorkerSlot> doneAssignedWorkers =
-                    new HashSet<ResourceWorkerSlot>();
+            Set<ResourceWorkerSlot> doneAssignedWorkers = new HashSet<ResourceWorkerSlot>();
             for (ResourceWorkerSlot worker : keepAssignments) {
-                ResourceWorkerSlot contextWorker =
-                        taskContext.getWorker(worker);
+                ResourceWorkerSlot contextWorker = taskContext.getWorker(worker);
                 if (contextWorker != null && isTaskFullForWorker(contextWorker)) {
                     found = true;
                     workerNum--;
@@ -135,7 +171,8 @@ public class TaskScheduler {
             }
 
             if (found) {
-                setTaskNum((tasks.size() - doneAssignedTaskNum), workerNum);
+                taskNum -= doneAssignedTaskNum;
+                setTaskNum(taskNum, workerNum);
                 keepAssignments.removeAll(doneAssignedWorkers);
             } else {
                 break;
@@ -150,45 +187,89 @@ public class TaskScheduler {
         Set<Integer> tasks = worker.getTasks();
 
         if (tasks != null) {
-            if ((leftTaskNum == 0 && tasks.size() >= avgTaskNum)
-                    || (leftTaskNum > 0 && tasks.size() >= (avgTaskNum + 1))) {
+            if ((leftTaskNum <= 0 && tasks.size() >= avgTaskNum) || (leftTaskNum > 0 && tasks.size() >= (avgTaskNum + 1))) {
                 ret = true;
             }
         }
         return ret;
     }
 
+    private Set<ResourceWorkerSlot> getRestAssignedWorkers() {
+        Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
+        for (ResourceWorkerSlot worker : taskContext.getWorkerToTaskNum().keySet()) {
+            if (worker.getTasks() != null && worker.getTasks().size() > 0) {
+                ret.add(worker);
+            }
+        }
+        return ret;
+    }
+
     public List<ResourceWorkerSlot> assign() {
-        if (tasks.size() == 0)
+        if (tasks.size() == 0) {
+            assignments.addAll(getRestAssignedWorkers());
             return assignments;
+        }
 
         // Firstly, assign workers to the components which are configured
         // by "task.on.differ.node"
         Set<Integer> assignedTasks = assignForDifferNodeTask();
 
-        // Assign for the tasks except acker
+        // Assign for the tasks except system tasks
         tasks.removeAll(assignedTasks);
-        Set<Integer> ackers = new HashSet<Integer>();
+        Map<Integer, String> systemTasks = new HashMap<Integer, String>();
         for (Integer task : tasks) {
             String name = context.getTaskToComponent().get(task);
-            if (name.equals(TaskScheduler.ACKER_NAME)) {
-                ackers.add(task);
+            if (Common.isSystemComponent(name)) {
+                systemTasks.put(task, name);
                 continue;
             }
             assignForTask(name, task);
         }
-
-        // At last, make the assignment for acker
-        for (Integer task : ackers) {
-            assignForTask(TaskScheduler.ACKER_NAME, task);
+        
+        /*
+         * At last, make the assignment for system component, e.g. acker, topology master...
+         */
+        for (Entry<Integer, String> entry : systemTasks.entrySet()) {
+            assignForTask(entry.getValue(), entry.getKey());
         }
+
+        assignments.addAll(getRestAssignedWorkers());
         return assignments;
     }
 
+    private void assignForTopologyMaster() {
+        int taskId = context.getTopologyMasterTaskId();
+
+        // Try to find a worker which is in a supervisor with most workers,
+        // to avoid the balance problem when the assignment for other workers.
+        ResourceWorkerSlot workerAssigned = null;
+        int workerNumOfSuperv = 0;
+        for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){
+            List<ResourceWorkerSlot> workers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId());
+            if (workers != null && workers.size() > workerNumOfSuperv) {
+                for (ResourceWorkerSlot worker : workers) {
+                    Set<Integer> tasks = worker.getTasks();
+                    if (tasks == null || tasks.size() == 0) {
+                        workerAssigned = worker;
+                        workerNumOfSuperv = workers.size();
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (workerAssigned == null)
+            throw new FailedAssignTopologyException("there's no enough workers for the assignment of topology master");
+        updateAssignedTasksOfWorker(taskId, workerAssigned);
+        taskContext.getWorkerToTaskNum().remove(workerAssigned);
+        assignments.add(workerAssigned);
+        tasks.remove(taskId);
+        workerNum--;
+        LOG.info("assignForTopologyMaster, assignments=" + assignments);
+    }
+
     private void assignForTask(String name, Integer task) {
-        ResourceWorkerSlot worker =
-                chooseWorker(name, new ArrayList<ResourceWorkerSlot>(
-                        taskContext.getWorkerToTaskNum().keySet()));
+        ResourceWorkerSlot worker = chooseWorker(name, new ArrayList<ResourceWorkerSlot>(taskContext.getWorkerToTaskNum().keySet()));
         pushTaskToWorker(task, name, worker);
     }
 
@@ -201,98 +282,97 @@ public class TaskScheduler {
         }
         for (Integer task : ret) {
             String name = context.getTaskToComponent().get(task);
-            ResourceWorkerSlot worker =
-                    chooseWorker(name, getDifferNodeTaskWokers(name));
+            ResourceWorkerSlot worker = chooseWorker(name, getDifferNodeTaskWokers(name));
+            LOG.info("Due to task.on.differ.node, push task-{} to {}:{}", task, worker.getHostname(), worker.getPort());
             pushTaskToWorker(task, name, worker);
         }
         return ret;
     }
 
-    private Map<String, List<ResourceWorkerSlot>> buildSupervisorToWorker(
-            List<ResourceWorkerSlot> workers) {
-        Map<String, List<ResourceWorkerSlot>> supervisorToWorker =
-                new HashMap<String, List<ResourceWorkerSlot>>();
+    private Map<String, List<ResourceWorkerSlot>> buildSupervisorToWorker(List<ResourceWorkerSlot> workers) {
+        Map<String, List<ResourceWorkerSlot>> supervisorToWorker = new HashMap<String, List<ResourceWorkerSlot>>();
         for (ResourceWorkerSlot worker : workers) {
-            if (worker.getTasks() == null || worker.getTasks().size() == 0) {
-                List<ResourceWorkerSlot> supervisor =
-                        supervisorToWorker.get(worker.getNodeId());
-                if (supervisor == null) {
-                    supervisor = new ArrayList<ResourceWorkerSlot>();
-                    supervisorToWorker.put(worker.getNodeId(), supervisor);
-                }
-                supervisor.add(worker);
-            } else {
-                assignments.add(worker);
+                List<ResourceWorkerSlot> supervisor = supervisorToWorker.get(worker.getNodeId());
+            if (supervisor == null) {
+                supervisor = new ArrayList<ResourceWorkerSlot>();
+                supervisorToWorker.put(worker.getNodeId(), supervisor);
             }
+            supervisor.add(worker);
         }
-        this.workerNum = workers.size() - assignments.size();
+        this.workerNum = workers.size();
         return supervisorToWorker;
     }
 
-    private ResourceWorkerSlot chooseWorker(String name,
-            List<ResourceWorkerSlot> workers) {
-        List<ResourceWorkerSlot> result =
-                componentSelector.select(workers, name);
+    private ResourceWorkerSlot chooseWorker(String name, List<ResourceWorkerSlot> workers) {
+        List<ResourceWorkerSlot> result = componentSelector.select(workers, name);
         result = totalTaskNumSelector.select(result, name);
-        if (name.equals(TaskScheduler.ACKER_NAME))
+        if (Common.isSystemComponent(name))
             return result.iterator().next();
         result = inputComponentSelector.select(result, name);
         return result.iterator().next();
     }
 
-    private void pushTaskToWorker(Integer task, String name,
-            ResourceWorkerSlot worker) {
+    private void pushTaskToWorker(Integer task, String name, ResourceWorkerSlot worker) {
         LOG.debug("Push task-" + task + " to worker-" + worker.getPort());
         int taskNum = updateAssignedTasksOfWorker(task, worker);
 
+        removeWorkerFromSrcPool(taskNum, worker);
+
+        updateComponentsNumOfWorker(name, worker);
+    }
+
+    private int updateAssignedTasksOfWorker(Integer task, ResourceWorkerSlot worker) {
+        int ret = 0;
+        Set<Integer> tasks = worker.getTasks();
+        if (tasks == null) {
+            tasks = new HashSet<Integer>();
+            worker.setTasks(tasks);
+        }
+        tasks.add(task);
+
+        ret = taskContext.getWorkerToTaskNum().get(worker);
+        taskContext.getWorkerToTaskNum().put(worker, ++ret);
+        return ret;
+    }
+
+    /*
+     * Remove the worker from source worker pool, if the worker is assigned with enough tasks,
+     */
+    private Set<ResourceWorkerSlot> removeWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) {
+        Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
+
         if (leftTaskNum <= 0) {
-            if (taskNum == avgTaskNum) {
+            if (taskNum >= avgTaskNum) {
                 taskContext.getWorkerToTaskNum().remove(worker);
                 assignments.add(worker);
+                ret.add(worker);
             }
         } else {
-            if (taskNum == (avgTaskNum + 1)) {
+            if (taskNum > avgTaskNum ) {
                 taskContext.getWorkerToTaskNum().remove(worker);
-                leftTaskNum--;
+                leftTaskNum = leftTaskNum -(taskNum -avgTaskNum);
                 assignments.add(worker);
+                ret.add(worker);
             }
             if (leftTaskNum <= 0) {
-                List<ResourceWorkerSlot> needDelete =
-                        new ArrayList<ResourceWorkerSlot>();
-                for (Entry<ResourceWorkerSlot, Integer> entry : taskContext
-                        .getWorkerToTaskNum().entrySet()) {
+                List<ResourceWorkerSlot> needDelete = new ArrayList<ResourceWorkerSlot>();
+                for (Entry<ResourceWorkerSlot, Integer> entry : taskContext.getWorkerToTaskNum().entrySet()) {
                     if (entry.getValue() == avgTaskNum)
                         needDelete.add(entry.getKey());
                 }
                 for (ResourceWorkerSlot workerToDelete : needDelete) {
                     taskContext.getWorkerToTaskNum().remove(workerToDelete);
                     assignments.add(workerToDelete);
+                    ret.add(workerToDelete);
                 }
             }
         }
 
-        updateComponentsNumOfWorker(name, worker);
-    }
-
-    private int updateAssignedTasksOfWorker(Integer task,
-            ResourceWorkerSlot worker) {
-        int ret = 0;
-        Set<Integer> tasks = worker.getTasks();
-        if (tasks == null) {
-            tasks = new HashSet<Integer>();
-            worker.setTasks(tasks);
-        }
-        tasks.add(task);
-
-        ret = taskContext.getWorkerToTaskNum().get(worker);
-        taskContext.getWorkerToTaskNum().put(worker, ++ret);
         return ret;
     }
 
-    private void updateComponentsNumOfWorker(String name,
-            ResourceWorkerSlot worker) {
-        Map<String, Integer> components =
-                taskContext.getWorkerToComponentNum().get(worker);
+    private void updateComponentsNumOfWorker(String name, ResourceWorkerSlot worker) {
+        Map<String, Integer> components = taskContext.getWorkerToComponentNum().get(worker);
         if (components == null) {
             components = new HashMap<String, Integer>();
             taskContext.getWorkerToComponentNum().put(worker, components);
@@ -308,11 +388,9 @@ public class TaskScheduler {
         if (taskNum >= 0 && workerNum > 0) {
             this.avgTaskNum = taskNum / workerNum;
             this.leftTaskNum = taskNum % workerNum;
-            LOG.debug("avgTaskNum=" + avgTaskNum + ", leftTaskNum="
-                    + leftTaskNum);
+            LOG.debug("avgTaskNum=" + avgTaskNum + ", leftTaskNum=" + leftTaskNum);
         } else {
-            LOG.debug("Illegal parameters, taskNum=" + taskNum + ", workerNum="
-                    + workerNum);
+            LOG.debug("Illegal parameters, taskNum=" + taskNum + ", workerNum=" + workerNum);
         }
     }
 
@@ -320,15 +398,12 @@ public class TaskScheduler {
         List<ResourceWorkerSlot> workers = new ArrayList<ResourceWorkerSlot>();
         workers.addAll(taskContext.getWorkerToTaskNum().keySet());
 
-        for (Entry<String, List<ResourceWorkerSlot>> entry : taskContext
-                .getSupervisorToWorker().entrySet()) {
+        for (Entry<String, List<ResourceWorkerSlot>> entry : taskContext.getSupervisorToWorker().entrySet()) {
             if (taskContext.getComponentNumOnSupervisor(entry.getKey(), name) != 0)
                 workers.removeAll(entry.getValue());
         }
         if (workers.size() == 0)
-            throw new FailedAssignTopologyException(
-                    "there's no enough supervisor for making component: "
-                            + name + " 's tasks on different node");
+            throw new FailedAssignTopologyException("there's no enough supervisor for making component: " + name + " 's tasks on different node");
         return workers;
     }
 }
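
The balancing rule described in the javadoc above is carried by setTaskNum: avgTaskNum = taskNum / workerNum and leftTaskNum = taskNum % workerNum, so leftTaskNum workers may hold avgTaskNum + 1 tasks and the rest avgTaskNum. removeWorkerFromSrcPool retires a worker once it reaches that quota and, in this patch, deducts any overshoot from pre-assigned workers via leftTaskNum = leftTaskNum - (taskNum - avgTaskNum). A small sketch of the resulting quotas; for the javadoc's example, quotas(4, 3) yields {2, 1, 1}:

    final class TaskBalanceSketch {
        // Mirrors TaskScheduler.setTaskNum: a base quota per worker,
        // with the remainder handed out one extra task at a time.
        static int[] quotas(int taskNum, int workerNum) {
            int avg = taskNum / workerNum;
            int left = taskNum % workerNum; // this many workers get one extra task
            int[] quota = new int[workerNum];
            for (int i = 0; i < workerNum; i++) {
                quota[i] = avg + (i < left ? 1 : 0);
            }
            return quota; // e.g. quotas(4, 3) -> {2, 1, 1}
        }
    }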

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java
index c85d723..08c4730 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java
@@ -28,6 +28,7 @@ import backtype.storm.Config;
 
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.client.WorkerAssignment;
+import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
 import com.alibaba.jstorm.schedule.TopologyAssignContext;
 import com.alibaba.jstorm.utils.FailedAssignTopologyException;
@@ -35,353 +36,369 @@ import com.alibaba.jstorm.utils.NetWorkUtils;
 
 public class WorkerScheduler {
 
-    public static Logger LOG = LoggerFactory.getLogger(WorkerScheduler.class);
-
-    private static WorkerScheduler instance;
-
-    private WorkerScheduler() {
-
-    }
-
-    public static WorkerScheduler getInstance() {
-        if (instance == null) {
-            instance = new WorkerScheduler();
-        }
-        return instance;
-    }
-
-    public List<ResourceWorkerSlot> getAvailableWorkers(
-            DefaultTopologyAssignContext context, Set<Integer> needAssign,
-            int num) {
-        int workersNum = getWorkersNum(context, num);
-        if (workersNum == 0) {
-            throw new FailedAssignTopologyException("there's no enough worker");
-        }
-        List<ResourceWorkerSlot> assignedWorkers =
-                new ArrayList<ResourceWorkerSlot>();
-        // userdefine assignments
-        getRightWorkers(
-                context,
-                needAssign,
-                assignedWorkers,
-                workersNum,
-                getUserDefineWorkers(context, ConfigExtension
-                        .getUserDefineAssignment(context.getStormConf())));
-        // old assignments
-        if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
-            getRightWorkers(context, needAssign, assignedWorkers, workersNum,
-                    context.getOldWorkers());
-        } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
-                && context.isReassign() == false) {
-            int cnt = 0;
-            for (ResourceWorkerSlot worker : context.getOldWorkers()) {
-                if (cnt < workersNum) {
-                    ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
-                    resFreeWorker.setPort(worker.getPort());
-                    resFreeWorker.setHostname(worker.getHostname());
-                    resFreeWorker.setNodeId(worker.getNodeId());
-                    assignedWorkers.add(resFreeWorker);
-                    cnt++;
-                } else {
-                    break;
-                }
-            }
-        }
-        int defaultWorkerNum =
-                Math.min(workersNum - assignedWorkers.size(), needAssign.size());
-        LOG.info("Get workers from user define and old assignments: "
-                + assignedWorkers);
-        for (int i = 0; i < defaultWorkerNum; i++) {
-            assignedWorkers.add(new ResourceWorkerSlot());
-        }
-        List<SupervisorInfo> isolationSupervisors =
-                this.getIsolationSupervisors(context);
-        if (isolationSupervisors.size() != 0) {
-            putAllWorkerToSupervisor(assignedWorkers,
-                    getResAvailSupervisors(isolationSupervisors));
-        } else {
-            putAllWorkerToSupervisor(assignedWorkers,
-                    getResAvailSupervisors(context.getCluster()));
-        }
-        this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
-        LOG.info("Assigned workers=" + assignedWorkers);
-        return assignedWorkers;
-    }
-
-    private void setAllWorkerMemAndCpu(Map conf,
-            List<ResourceWorkerSlot> assignedWorkers) {
-        long defaultSize = ConfigExtension.getMemSizePerWorker(conf);
-        int defaultCpu = ConfigExtension.getCpuSlotPerWorker(conf);
-        for (ResourceWorkerSlot worker : assignedWorkers) {
-            if (worker.getMemSize() <= 0)
-                worker.setMemSize(defaultSize);
-            if (worker.getCpu() <= 0)
-                worker.setCpu(defaultCpu);
-        }
-    }
-
-    private void putAllWorkerToSupervisor(
-            List<ResourceWorkerSlot> assignedWorkers,
-            List<SupervisorInfo> supervisors) {
-        for (ResourceWorkerSlot worker : assignedWorkers) {
-            if (worker.getHostname() != null) {
-                for (SupervisorInfo supervisor : supervisors) {
-                    if (NetWorkUtils.equals(supervisor.getHostName(),
-                            worker.getHostname())
-                            && supervisor.getAvailableWorkerPorts().size() > 0) {
-                        putWorkerToSupervisor(supervisor, worker);
-                        break;
-                    }
-                }
-            }
-        }
-        supervisors = getResAvailSupervisors(supervisors);
-        Collections.sort(supervisors, new Comparator<SupervisorInfo>() {
-
-            @Override
-            public int compare(SupervisorInfo o1, SupervisorInfo o2) {
-                // TODO Auto-generated method stub
-                return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2
-                        .getAvailableWorkerPorts().size());
-            }
-
-        });
-        putWorkerToSupervisor(assignedWorkers, supervisors);
-    }
-
-    private void putWorkerToSupervisor(SupervisorInfo supervisor,
-            ResourceWorkerSlot worker) {
-        int port = worker.getPort();
-        if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
-            port = supervisor.getAvailableWorkerPorts().iterator().next();
-        }
-        worker.setPort(port);
-        supervisor.getAvailableWorkerPorts().remove(port);
-        worker.setNodeId(supervisor.getSupervisorId());
-    }
-
-    private void putWorkerToSupervisor(
-            List<ResourceWorkerSlot> assignedWorkers,
-            List<SupervisorInfo> supervisors) {
-        int allUsedPorts = 0;
-        for (SupervisorInfo supervisor : supervisors) {
-            int supervisorUsedPorts = supervisor.getWorkerPorts().size()
-                    - supervisor.getAvailableWorkerPorts().size();
-            allUsedPorts = allUsedPorts + supervisorUsedPorts;
-        }
-        // per supervisor should be allocated ports in theory
-        int theoryAveragePorts =
-                (allUsedPorts + assignedWorkers.size()) / supervisors.size()
-                        + 1;
-        // supervisor which use more than theoryAveragePorts ports will be
-        // pushed overLoadSupervisors
-        List<SupervisorInfo> overLoadSupervisors =
-                new ArrayList<SupervisorInfo>();
-        int key = 0;
-        Iterator<ResourceWorkerSlot> iterator = assignedWorkers.iterator();
-        while (iterator.hasNext()) {
-            if (supervisors.size() == 0)
-                break;
-            if (key >= supervisors.size())
-                key = 0;
-            SupervisorInfo supervisor = supervisors.get(key);
-            int supervisorUsedPorts = supervisor.getWorkerPorts().size()
-                    - supervisor.getAvailableWorkerPorts().size();
-            if (supervisorUsedPorts < theoryAveragePorts) {
-                ResourceWorkerSlot worker = iterator.next();
-                if (worker.getNodeId() != null)
-                    continue;
-                worker.setHostname(supervisor.getHostName());
-                worker.setNodeId(supervisor.getSupervisorId());
-                worker.setPort(
-                        supervisor.getAvailableWorkerPorts().iterator().next());
-                supervisor.getAvailableWorkerPorts().remove(worker.getPort());
-                if (supervisor.getAvailableWorkerPorts().size() == 0)
-                    supervisors.remove(supervisor);
-                key++;
-            } else {
-                overLoadSupervisors.add(supervisor);
-                supervisors.remove(supervisor);
-            }
-        }
-        // rest assignedWorkers will be allocate supervisor by deal
-        Collections.sort(overLoadSupervisors, new Comparator<SupervisorInfo>() {
-
-            @Override
-            public int compare(SupervisorInfo o1, SupervisorInfo o2) {
-                // TODO Auto-generated method stub
-                return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(),
-                        o2.getAvailableWorkerPorts().size());
-            }
-
-        });
-        key = 0;
-        while (iterator.hasNext()) {
-            if (overLoadSupervisors.size() == 0)
-                break;
-            if (key >= overLoadSupervisors.size())
-                key = 0;
-            ResourceWorkerSlot worker = iterator.next();
-            if (worker.getNodeId() != null)
-                continue;
-            SupervisorInfo supervisor = overLoadSupervisors.get(key);
-            worker.setHostname(supervisor.getHostName());
-            worker.setNodeId(supervisor.getSupervisorId());
-            worker.setPort(
-                    supervisor.getAvailableWorkerPorts().iterator().next());
-            supervisor.getAvailableWorkerPorts().remove(worker.getPort());
-            if (supervisor.getAvailableWorkerPorts().size() == 0)
-                overLoadSupervisors.remove(supervisor);
-            key++;
-        }
-    }
-
-    private void getRightWorkers(DefaultTopologyAssignContext context,
-            Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers,
-            int workersNum, Collection<ResourceWorkerSlot> workers) {
-        Set<Integer> assigned = new HashSet<Integer>();
-        List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
-        if (workers == null)
-            return;
-        for (ResourceWorkerSlot worker : workers) {
-            boolean right = true;
-            Set<Integer> tasks = worker.getTasks();
-            if (tasks == null)
-                continue;
-            for (Integer task : tasks) {
-                if (!needAssign.contains(task) || assigned.contains(task)) {
-                    right = false;
-                    break;
-                }
-            }
-            if (right) {
-                assigned.addAll(tasks);
-                users.add(worker);
-            }
-        }
-        if (users.size() + assignedWorkers.size() > workersNum) {
-            return;
-        }
-
-        if (users.size() + assignedWorkers.size() == workersNum
-                && assigned.size() != needAssign.size()) {
-            return;
-        }
-        assignedWorkers.addAll(users);
-        needAssign.removeAll(assigned);
-    }
-
-    private int getWorkersNum(DefaultTopologyAssignContext context,
-            int workersNum) {
-        Map<String, SupervisorInfo> supervisors = context.getCluster();
-        List<SupervisorInfo> isolationSupervisors =
-                this.getIsolationSupervisors(context);
-        int slotNum = 0;
-
-        if (isolationSupervisors.size() != 0) {
-            for (SupervisorInfo superivsor : isolationSupervisors) {
-                slotNum = slotNum + superivsor.getAvailableWorkerPorts().size();
-            }
-            return Math.min(slotNum, workersNum);
-        }
-        for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
-            slotNum = slotNum + entry.getValue().getAvailableWorkerPorts().size();
-        }
-        return Math.min(slotNum, workersNum);
-    }
-
-    /**
-     * @param context
-     * @param workers
-     * @return
-     */
-    private List<ResourceWorkerSlot> getUserDefineWorkers(
-            DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
-        List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
-        if (workers == null)
-            return ret;
-        Map<String, List<Integer>> componentToTask =
-                (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context
-                        .getComponentTasks()).clone();
-        if (context.getAssignType() != context.ASSIGN_TYPE_NEW) {
-            checkUserDefineWorkers(context, workers,
-                    context.getTaskToComponent());
-        }
-        for (WorkerAssignment worker : workers) {
-            ResourceWorkerSlot workerSlot =
-                    new ResourceWorkerSlot(worker, componentToTask);
-            if (workerSlot.getTasks().size() != 0) {
-                ret.add(workerSlot);
-            }
-        }
-        return ret;
-    }
-
-    private void checkUserDefineWorkers(DefaultTopologyAssignContext context,
-            List<WorkerAssignment> workers, Map<Integer, String> taskToComponent) {
-        Set<ResourceWorkerSlot> unstoppedWorkers =
-                context.getUnstoppedWorkers();
-        List<WorkerAssignment> re = new ArrayList<WorkerAssignment>();
-        for (WorkerAssignment worker : workers) {
-            for (ResourceWorkerSlot unstopped : unstoppedWorkers) {
-                if (unstopped
-                        .compareToUserDefineWorker(worker, taskToComponent))
-                    re.add(worker);
-            }
-        }
-        workers.removeAll(re);
-
-    }
-
-    private List<SupervisorInfo> getResAvailSupervisors(
-            Map<String, SupervisorInfo> supervisors) {
-        List<SupervisorInfo> availableSupervisors =
-                new ArrayList<SupervisorInfo>();
-        for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
-            SupervisorInfo supervisor = entry.getValue();
-            if (supervisor.getAvailableWorkerPorts().size() > 0)
-                availableSupervisors.add(entry.getValue());
-        }
-        return availableSupervisors;
-    }
-
-    private List<SupervisorInfo> getResAvailSupervisors(
-            List<SupervisorInfo> supervisors) {
-        List<SupervisorInfo> availableSupervisors =
-                new ArrayList<SupervisorInfo>();
-        for (SupervisorInfo supervisor : supervisors) {
-            if (supervisor.getAvailableWorkerPorts().size() > 0)
-                availableSupervisors.add(supervisor);
-        }
-        return availableSupervisors;
-    }
-
-    private List<SupervisorInfo> getIsolationSupervisors(
-            DefaultTopologyAssignContext context) {
-        List<String> isolationHosts =
-                (List<String>) context.getStormConf().get(
-                        Config.ISOLATION_SCHEDULER_MACHINES);
-        LOG.info("Isolation machines: " + isolationHosts);
-        if (isolationHosts == null)
-            return new ArrayList<SupervisorInfo>();
-        List<SupervisorInfo> isolationSupervisors =
-                new ArrayList<SupervisorInfo>();
-        for (Entry<String, SupervisorInfo> entry : context.getCluster()
-                .entrySet()) {
-            if (containTargetHost(isolationHosts, entry.getValue()
-                    .getHostName())) {
-                isolationSupervisors.add(entry.getValue());
-            }
-        }
-        return isolationSupervisors;
-    }
-
-    private boolean containTargetHost(Collection<String> hosts, String target) {
-        for (String host : hosts) {
-            if (NetWorkUtils.equals(host, target) == true) {
-                return true;
-            }
-        }
-        return false;
-    }
+	public static Logger LOG = LoggerFactory.getLogger(WorkerScheduler.class);
+
+	private static WorkerScheduler instance;
+
+	private WorkerScheduler() {
+
+	}
+
+	public static WorkerScheduler getInstance() {
+		if (instance == null) {
+			instance = new WorkerScheduler();
+		}
+		return instance;
+	}
+
+	public List<ResourceWorkerSlot> getAvailableWorkers(
+			DefaultTopologyAssignContext context, Set<Integer> needAssign,
+			int allocWorkerNum) {
+		int workersNum = getAvailableWorkersNum(context);
+		if (workersNum < allocWorkerNum) {
+			throw new FailedAssignTopologyException(
+					"there's no enough worker. allocWorkerNum="
+							+ allocWorkerNum + ", availableWorkerNum="
+							+ workersNum);
+		}
+		workersNum = allocWorkerNum;
+		List<ResourceWorkerSlot> assignedWorkers = new ArrayList<ResourceWorkerSlot>();
+		// userdefine assignments, but dont't try to use custom scheduling for
+		// TM bolts now.
+		getRightWorkers(
+				context,
+				needAssign,
+				assignedWorkers,
+				workersNum,
+				getUserDefineWorkers(context, ConfigExtension
+						.getUserDefineAssignment(context.getStormConf())));
+
+		// old assignments
+		if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
+			getRightWorkers(context, needAssign, assignedWorkers, workersNum,
+					context.getOldWorkers());
+		} else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
+				&& context.isReassign() == false) {
+			int cnt = 0;
+			for (ResourceWorkerSlot worker : context.getOldWorkers()) {
+				if (cnt < workersNum) {
+					ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
+					resFreeWorker.setPort(worker.getPort());
+					resFreeWorker.setHostname(worker.getHostname());
+					resFreeWorker.setNodeId(worker.getNodeId());
+					assignedWorkers.add(resFreeWorker);
+					cnt++;
+				} else {
+					break;
+				}
+			}
+		}
+		// calculate rest TM bolts
+		int workersForSingleTM = 0;
+		if (context.getAssignSingleWorkerForTM()) {
+			for (Integer taskId : needAssign) {
+				String componentName = context.getTaskToComponent().get(taskId);
+				if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
+					workersForSingleTM++;
+				}
+			}
+		}
+
+		LOG.info("Get workers from user define and old assignments: "
+				+ assignedWorkers);
+
+		int restWokerNum = workersNum - assignedWorkers.size();
+		if (restWokerNum < 0)
+			throw new FailedAssignTopologyException(
+					"Too much workers are needed for user define or old assignments. workersNum="
+							+ workersNum + ", assignedWokersNum="
+							+ assignedWorkers.size());
+
+		for (int i = 0; i < restWokerNum; i++) {
+			assignedWorkers.add(new ResourceWorkerSlot());
+		}
+		List<SupervisorInfo> isolationSupervisors = this
+				.getIsolationSupervisors(context);
+		if (isolationSupervisors.size() != 0) {
+			putAllWorkerToSupervisor(assignedWorkers,
+					getResAvailSupervisors(isolationSupervisors));
+		} else {
+			putAllWorkerToSupervisor(assignedWorkers,
+					getResAvailSupervisors(context.getCluster()));
+		}
+		this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
+		LOG.info("Assigned workers=" + assignedWorkers);
+		return assignedWorkers;
+	}
+
+	private void setAllWorkerMemAndCpu(Map conf,
+			List<ResourceWorkerSlot> assignedWorkers) {
+		long defaultSize = ConfigExtension.getMemSizePerWorker(conf);
+		int defaultCpu = ConfigExtension.getCpuSlotPerWorker(conf);
+		for (ResourceWorkerSlot worker : assignedWorkers) {
+			if (worker.getMemSize() <= 0)
+				worker.setMemSize(defaultSize);
+			if (worker.getCpu() <= 0)
+				worker.setCpu(defaultCpu);
+		}
+	}
+
+	private void putAllWorkerToSupervisor(
+			List<ResourceWorkerSlot> assignedWorkers,
+			List<SupervisorInfo> supervisors) {
+		for (ResourceWorkerSlot worker : assignedWorkers) {
+			if (worker.getHostname() != null) {
+				for (SupervisorInfo supervisor : supervisors) {
+					if (NetWorkUtils.equals(supervisor.getHostName(),
+							worker.getHostname())
+							&& supervisor.getAvailableWorkerPorts().size() > 0) {
+						putWorkerToSupervisor(supervisor, worker);
+						break;
+					}
+				}
+			}
+		}
+		supervisors = getResAvailSupervisors(supervisors);
+		Collections.sort(supervisors, new Comparator<SupervisorInfo>() {
+
+			@Override
+			public int compare(SupervisorInfo o1, SupervisorInfo o2) {
+				// TODO Auto-generated method stub
+				return -NumberUtils.compare(
+						o1.getAvailableWorkerPorts().size(), o2
+								.getAvailableWorkerPorts().size());
+			}
+
+		});
+		putWorkerToSupervisor(assignedWorkers, supervisors);
+	}
+
+	private void putWorkerToSupervisor(SupervisorInfo supervisor,
+			ResourceWorkerSlot worker) {
+		int port = worker.getPort();
+		if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
+			port = supervisor.getAvailableWorkerPorts().iterator().next();
+		}
+		worker.setPort(port);
+		supervisor.getAvailableWorkerPorts().remove(port);
+		worker.setNodeId(supervisor.getSupervisorId());
+	}
+
+	private void putWorkerToSupervisor(
+			List<ResourceWorkerSlot> assignedWorkers,
+			List<SupervisorInfo> supervisors) {
+		int allUsedPorts = 0;
+		for (SupervisorInfo supervisor : supervisors) {
+			int supervisorUsedPorts = supervisor.getWorkerPorts().size()
+					- supervisor.getAvailableWorkerPorts().size();
+			allUsedPorts = allUsedPorts + supervisorUsedPorts;
+		}
+		// per supervisor should be allocated ports in theory
+		int theoryAveragePorts = (allUsedPorts + assignedWorkers.size())
+				/ supervisors.size() + 1;
+		// supervisor which use more than theoryAveragePorts ports will be
+		// pushed overLoadSupervisors
+		List<SupervisorInfo> overLoadSupervisors = new ArrayList<SupervisorInfo>();
+		int key = 0;
+		Iterator<ResourceWorkerSlot> iterator = assignedWorkers.iterator();
+		while (iterator.hasNext()) {
+			if (supervisors.size() == 0)
+				break;
+			if (key >= supervisors.size())
+				key = 0;
+			SupervisorInfo supervisor = supervisors.get(key);
+			int supervisorUsedPorts = supervisor.getWorkerPorts().size()
+					- supervisor.getAvailableWorkerPorts().size();
+			if (supervisorUsedPorts < theoryAveragePorts) {
+				ResourceWorkerSlot worker = iterator.next();
+				if (worker.getNodeId() != null)
+					continue;
+				worker.setHostname(supervisor.getHostName());
+				worker.setNodeId(supervisor.getSupervisorId());
+				worker.setPort(supervisor.getAvailableWorkerPorts().iterator()
+						.next());
+				supervisor.getAvailableWorkerPorts().remove(worker.getPort());
+				if (supervisor.getAvailableWorkerPorts().size() == 0)
+					supervisors.remove(supervisor);
+				key++;
+			} else {
+				overLoadSupervisors.add(supervisor);
+				supervisors.remove(supervisor);
+			}
+		}
+		// rest assignedWorkers will be allocate supervisor by deal
+		Collections.sort(overLoadSupervisors, new Comparator<SupervisorInfo>() {
+
+			@Override
+			public int compare(SupervisorInfo o1, SupervisorInfo o2) {
+				// TODO Auto-generated method stub
+				return -NumberUtils.compare(
+						o1.getAvailableWorkerPorts().size(), o2
+								.getAvailableWorkerPorts().size());
+			}
+
+		});
+		key = 0;
+		while (iterator.hasNext()) {
+			if (overLoadSupervisors.size() == 0)
+				break;
+			if (key >= overLoadSupervisors.size())
+				key = 0;
+			ResourceWorkerSlot worker = iterator.next();
+			if (worker.getNodeId() != null)
+				continue;
+			SupervisorInfo supervisor = overLoadSupervisors.get(key);
+			worker.setHostname(supervisor.getHostName());
+			worker.setNodeId(supervisor.getSupervisorId());
+			worker.setPort(supervisor.getAvailableWorkerPorts().iterator()
+					.next());
+			supervisor.getAvailableWorkerPorts().remove(worker.getPort());
+			if (supervisor.getAvailableWorkerPorts().size() == 0)
+				overLoadSupervisors.remove(supervisor);
+			key++;
+		}
+	}
+
+	private void getRightWorkers(DefaultTopologyAssignContext context,
+			Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers,
+			int workersNum, Collection<ResourceWorkerSlot> workers) {
+		Set<Integer> assigned = new HashSet<Integer>();
+		List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
+		if (workers == null)
+			return;
+		for (ResourceWorkerSlot worker : workers) {
+			boolean right = true;
+			Set<Integer> tasks = worker.getTasks();
+			if (tasks == null)
+				continue;
+			for (Integer task : tasks) {
+				if (!needAssign.contains(task) || assigned.contains(task)) {
+					right = false;
+					break;
+				}
+			}
+			if (right) {
+				assigned.addAll(tasks);
+				users.add(worker);
+			}
+		}
+		if (users.size() + assignedWorkers.size() > workersNum) {
+			LOG.warn(
+					"There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}",
+					users, assignedWorkers, workersNum);
+			return;
+		}
+
+		assignedWorkers.addAll(users);
+		needAssign.removeAll(assigned);
+	}
+
+	private int getAvailableWorkersNum(DefaultTopologyAssignContext context) {
+		Map<String, SupervisorInfo> supervisors = context.getCluster();
+		List<SupervisorInfo> isolationSupervisors = this
+				.getIsolationSupervisors(context);
+		int slotNum = 0;
+
+		if (isolationSupervisors.size() != 0) {
+			for (SupervisorInfo superivsor : isolationSupervisors) {
+				slotNum = slotNum + superivsor.getAvailableWorkerPorts().size();
+			}
+		} else {
+			for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
+				slotNum = slotNum
+						+ entry.getValue().getAvailableWorkerPorts().size();
+			}
+		}
+		return slotNum;
+	}
+
+	/**
+	 * @param context
+	 * @param workers
+	 * @return
+	 */
+	private List<ResourceWorkerSlot> getUserDefineWorkers(
+			DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
+		List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
+		if (workers == null)
+			return ret;
+		Map<String, List<Integer>> componentToTask = (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context
+				.getComponentTasks()).clone();
+		if (context.getAssignType() != context.ASSIGN_TYPE_NEW) {
+			checkUserDefineWorkers(context, workers,
+					context.getTaskToComponent());
+		}
+		for (WorkerAssignment worker : workers) {
+			ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker,
+					componentToTask);
+			if (workerSlot.getTasks().size() != 0) {
+				ret.add(workerSlot);
+			}
+		}
+		return ret;
+	}
+
+	private void checkUserDefineWorkers(DefaultTopologyAssignContext context,
+			List<WorkerAssignment> workers, Map<Integer, String> taskToComponent) {
+		Set<ResourceWorkerSlot> unstoppedWorkers = context.getUnstoppedWorkers();
+		List<WorkerAssignment> re = new ArrayList<WorkerAssignment>();
+		for (WorkerAssignment worker : workers) {
+			for (ResourceWorkerSlot unstopped : unstoppedWorkers) {
+				if (unstopped.compareToUserDefineWorker(worker, taskToComponent))
+					re.add(worker);
+			}
+		}
+		workers.removeAll(re);
+	}
+
+	private List<SupervisorInfo> getResAvailSupervisors(
+			Map<String, SupervisorInfo> supervisors) {
+		List<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>();
+		for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
+			SupervisorInfo supervisor = entry.getValue();
+			if (supervisor.getAvailableWorkerPorts().size() > 0)
+				availableSupervisors.add(entry.getValue());
+		}
+		return availableSupervisors;
+	}
+
+	private List<SupervisorInfo> getResAvailSupervisors(
+			List<SupervisorInfo> supervisors) {
+		List<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>();
+		for (SupervisorInfo supervisor : supervisors) {
+			if (supervisor.getAvailableWorkerPorts().size() > 0)
+				availableSupervisors.add(supervisor);
+		}
+		return availableSupervisors;
+	}
+
+	private List<SupervisorInfo> getIsolationSupervisors(
+			DefaultTopologyAssignContext context) {
+		List<String> isolationHosts = (List<String>) context.getStormConf()
+				.get(Config.ISOLATION_SCHEDULER_MACHINES);
+		LOG.info("Isolation machines: " + isolationHosts);
+		if (isolationHosts == null)
+			return new ArrayList<SupervisorInfo>();
+		List<SupervisorInfo> isolationSupervisors = new ArrayList<SupervisorInfo>();
+		for (Entry<String, SupervisorInfo> entry : context.getCluster().entrySet()) {
+			if (containTargetHost(isolationHosts, entry.getValue().getHostName())) {
+				isolationSupervisors.add(entry.getValue());
+			}
+		}
+		return isolationSupervisors;
+	}
+
+	private boolean containTargetHost(Collection<String> hosts, String target) {
+		for (String host : hosts) {
+			if (NetWorkUtils.equals(host, target)) {
+				return true;
+			}
+		}
+		return false;
+	}
 }

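The two assignment loops above share one round-robin pattern: sort the supervisors by free worker ports, walk a cursor over the list, hand each still-unassigned worker the next free port, and drop a supervisor once its ports run out. Below is a minimal, self-contained sketch of that pattern; SlotHolder is a hypothetical stand-in for SupervisorInfo, not JStorm API, and the bookkeeping is simplified to the essentials.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

public class RoundRobinAssignSketch {
    // Hypothetical stand-in for SupervisorInfo: an id plus its free worker ports.
    static class SlotHolder {
        final String id;
        final Deque<Integer> freePorts;
        SlotHolder(String id, Integer... ports) {
            this.id = id;
            this.freePorts = new ArrayDeque<Integer>(Arrays.asList(ports));
        }
    }

    public static void main(String[] args) {
        List<SlotHolder> holders = new ArrayList<SlotHolder>(Arrays.asList(
                new SlotHolder("sup-a", 6800, 6801),
                new SlotHolder("sup-b", 6800)));
        // Most free ports first, mirroring the comparator in the patch.
        Collections.sort(holders, new Comparator<SlotHolder>() {
            @Override
            public int compare(SlotHolder o1, SlotHolder o2) {
                return o2.freePorts.size() - o1.freePorts.size();
            }
        });
        int key = 0; // round-robin cursor over the supervisor list
        for (int worker = 0; worker < 3 && !holders.isEmpty(); worker++) {
            if (key >= holders.size())
                key = 0; // wrap the cursor
            SlotHolder holder = holders.get(key);
            int port = holder.freePorts.poll(); // take the next free port
            System.out.println("worker-" + worker + " -> " + holder.id + ":" + port);
            if (holder.freePorts.isEmpty())
                holders.remove(key); // exhausted: drop it, cursor already points at the next
            else
                key++; // advance so workers spread evenly
        }
    }
}
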
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
index 6481c5e..b0fdc92 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
@@ -17,31 +17,20 @@
  */
 package com.alibaba.jstorm.task;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
 import backtype.storm.messaging.IContext;
 import backtype.storm.serialization.KryoTupleSerializer;
 import backtype.storm.spout.ISpout;
 import backtype.storm.task.IBolt;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.Utils;
 import backtype.storm.utils.WorkerClassLoader;
 import clojure.lang.Atom;
-
 import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormZkClusterState;
 import com.alibaba.jstorm.daemon.worker.WorkerData;
 import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
 import com.alibaba.jstorm.task.comm.TaskSendTargets;
@@ -55,29 +44,27 @@ import com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors;
 import com.alibaba.jstorm.task.execute.spout.SingleThreadSpoutExecutors;
 import com.alibaba.jstorm.task.execute.spout.SpoutExecutors;
 import com.alibaba.jstorm.task.group.MkGrouper;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
-import com.alibaba.jstorm.task.heartbeat.TaskStats;
 import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.NetWorkUtils;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Task instance
  * 
  * @author yannian/Longda
- * 
  */
-public class Task {
-
+public class Task implements Runnable {
     private final static Logger LOG = LoggerFactory.getLogger(Task.class);
 
     private Map<Object, Object> stormConf;
 
     private TopologyContext topologyContext;
     private TopologyContext userContext;
-    private String topologyid;
     private IContext context;
 
     private TaskTransfer taskTransfer;
@@ -86,8 +73,9 @@ public class Task {
     private Map<Integer, DisruptorQueue> deserializeQueues;
     private AsyncLoopDefaultKill workHalt;
 
-    private Integer taskid;
-    private String componentid;
+    private String topologyId;
+    private Integer taskId;
+    private String componentId;
     private volatile TaskStatus taskStatus;
     private Atom openOrPrepareWasCalled;
     // running time counter
@@ -97,63 +85,46 @@ public class Task {
     private Object taskObj;
     private TaskBaseMetric taskStats;
     private WorkerData workerData;
-    private String componentType; // "spout" or "bolt"
 
     private TaskSendTargets taskSendTargets;
+    private TaskReportErrorAndDie reportErrorDie;
 
     private boolean isTaskBatchTuple;
+    private TaskShutdownDameon taskShutdownDameon;
 
     @SuppressWarnings("rawtypes")
     public Task(WorkerData workerData, int taskId) throws Exception {
         openOrPrepareWasCalled = new Atom(Boolean.valueOf(false));
 
         this.workerData = workerData;
-        this.topologyContext =
-                workerData.getContextMaker().makeTopologyContext(
-                        workerData.getSysTopology(), taskId,
-                        openOrPrepareWasCalled);
-        this.userContext =
-                workerData.getContextMaker().makeTopologyContext(
-                        workerData.getRawTopology(), taskId,
-                        openOrPrepareWasCalled);
-        this.taskid = taskId;
-        this.componentid = topologyContext.getThisComponentId();
-        this.stormConf =
-                Common.component_conf(workerData.getStormConf(),
-                        topologyContext, componentid);
+        this.topologyContext = workerData.getContextMaker().makeTopologyContext(workerData.getSysTopology(), taskId, openOrPrepareWasCalled);
+        this.userContext = workerData.getContextMaker().makeTopologyContext(workerData.getRawTopology(), taskId, openOrPrepareWasCalled);
+        this.taskId = taskId;
+        this.componentId = topologyContext.getThisComponentId();
+        this.stormConf = Common.component_conf(workerData.getStormConf(), topologyContext, componentId);
 
         this.taskStatus = new TaskStatus();
-        this.taskStats = new TaskBaseMetric(taskId);
 
         this.innerTaskTransfer = workerData.getInnerTaskTransfer();
         this.deserializeQueues = workerData.getDeserializeQueues();
-        this.topologyid = workerData.getTopologyId();
+        this.topologyId = workerData.getTopologyId();
         this.context = workerData.getContext();
         this.workHalt = workerData.getWorkHalt();
-        this.zkCluster =
-                new StormZkClusterState(workerData.getZkClusterstate());
+        this.zkCluster = workerData.getZkCluster();
+        this.taskStats = new TaskBaseMetric(topologyId, componentId, taskId,
+                ConfigExtension.isEnableMetrics(workerData.getStormConf()));
 
-        LOG.info("Begin to deserialize taskObj " + componentid + ":" + taskid);
+        LOG.info("Begin to deserialize taskObj " + componentId + ":" + this.taskId);
 
         WorkerClassLoader.switchThreadContext();
         // get real task object -- spout/bolt/spoutspec
-        this.taskObj =
-                Common.get_task_object(topologyContext.getRawTopology(),
-                        componentid, WorkerClassLoader.getInstance());
+        this.taskObj = Common.get_task_object(topologyContext.getRawTopology(), componentId, WorkerClassLoader.getInstance());
         WorkerClassLoader.restoreThreadContext();
 
         isTaskBatchTuple = ConfigExtension.isTaskBatchTuple(stormConf);
         LOG.info("Transfer/receive in batch mode :" + isTaskBatchTuple);
 
-        LOG.info("Loading task " + componentid + ":" + taskid);
-    }
-
-    private void setComponentType() {
-        if (taskObj instanceof IBolt) {
-            componentType = "bolt";
-        } else if (taskObj instanceof ISpout) {
-            componentType = "spout";
-        }
+        LOG.info("Loading task " + componentId + ":" + this.taskId);
     }
 
     private TaskSendTargets makeSendTargets() {
@@ -161,44 +132,20 @@ public class Task {
 
         // get current task's output
         // <Stream_id,<component, Grouping>>
-        Map<String, Map<String, MkGrouper>> streamComponentGrouper =
-                Common.outbound_components(topologyContext, workerData);
+        Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(topologyContext, workerData);
 
-        return new TaskSendTargets(stormConf, component,
-                streamComponentGrouper, topologyContext, taskStats);
+        return new TaskSendTargets(stormConf, component, streamComponentGrouper, topologyContext, taskStats);
     }
 
     private void updateSendTargets() {
         if (taskSendTargets != null) {
-            Map<String, Map<String, MkGrouper>> streamComponentGrouper =
-                    Common.outbound_components(topologyContext, workerData);
+            Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(topologyContext, workerData);
             taskSendTargets.updateStreamCompGrouper(streamComponentGrouper);
         } else {
             LOG.error("taskSendTargets is null when trying to update it.");
         }
     }
 
-    private TaskTransfer mkTaskSending(WorkerData workerData) {
-
-        // sending tuple's serializer
-        KryoTupleSerializer serializer =
-                new KryoTupleSerializer(workerData.getStormConf(),
-                        topologyContext);
-
-        String taskName = JStormServerUtils.getName(componentid, taskid);
-        // Task sending all tuples through this Object
-        TaskTransfer taskTransfer;
-        if (isTaskBatchTuple)
-            taskTransfer =
-                    new TaskBatchTransfer(this, taskName, serializer,
-                            taskStatus, workerData);
-        else
-            taskTransfer =
-                    new TaskTransfer(this, taskName, serializer, taskStatus,
-                            workerData);
-        return taskTransfer;
-    }
-
     public TaskSendTargets echoToSystemBolt() {
         // send "startup" tuple to system bolt
         List<Object> msg = new ArrayList<Object>();
@@ -206,9 +153,7 @@ public class Task {
 
         // create task receive object
         TaskSendTargets sendTargets = makeSendTargets();
-        UnanchoredSend.send(topologyContext, sendTargets, taskTransfer,
-                Common.SYSTEM_STREAM_ID, msg);
-
+        UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Common.SYSTEM_STREAM_ID, msg);
         return sendTargets;
     }
 
@@ -217,102 +162,90 @@ public class Task {
         if (isOnePending == true) {
             return true;
         }
-
         return ConfigExtension.isSpoutSingleThread(conf);
     }
 
-    public RunnableCallback mk_executors(TaskSendTargets sendTargets,
-            ITaskReportErr report_error) {
+    public BaseExecutors mkExecutor() {
+        BaseExecutors baseExecutor = null;
 
         if (taskObj instanceof IBolt) {
-            return new BoltExecutors(this, (IBolt) taskObj, taskTransfer,
-                    innerTaskTransfer, stormConf, sendTargets, taskStatus,
-                    topologyContext, userContext, taskStats, report_error);
+            baseExecutor = new BoltExecutors(this);
         } else if (taskObj instanceof ISpout) {
             if (isSingleThread(stormConf) == true) {
-                return new SingleThreadSpoutExecutors(this, (ISpout) taskObj,
-                        taskTransfer, innerTaskTransfer, stormConf,
-                        sendTargets, taskStatus, topologyContext, userContext,
-                        taskStats, report_error);
+                baseExecutor = new SingleThreadSpoutExecutors(this);
             } else {
-                return new MultipleThreadSpoutExecutors(this, (ISpout) taskObj,
-                        taskTransfer, innerTaskTransfer, stormConf,
-                        sendTargets, taskStatus, topologyContext, userContext,
-                        taskStats, report_error);
+                baseExecutor = new MultipleThreadSpoutExecutors(this);
             }
         }
-
-        return null;
+
+        return baseExecutor;
     }
 
     /**
      * create executor to receive tuples and run bolt/spout execute function
-     * 
-     * @param puller
-     * @param sendTargets
-     * @return
      */
-    private RunnableCallback mkExecutor(TaskSendTargets sendTargets) {
+    private RunnableCallback prepareExecutor() {
         // create report error callback,
         // in fact it is storm_cluster.report-task-error
-        ITaskReportErr reportError =
-                new TaskReportError(zkCluster, topologyid, taskid);
+        ITaskReportErr reportError = new TaskReportError(zkCluster, topologyId, taskId);
 
         // report error and halt worker
-        TaskReportErrorAndDie reportErrorDie =
-                new TaskReportErrorAndDie(reportError, workHalt);
+        reportErrorDie = new TaskReportErrorAndDie(reportError, workHalt);
+
+        final BaseExecutors baseExecutor = mkExecutor();
 
-        return mk_executors(sendTargets, reportErrorDie);
+        return baseExecutor;
     }
 
     public TaskReceiver mkTaskReceiver() {
-        String taskName = JStormServerUtils.getName(componentid, taskid);
-        TaskReceiver taskReceiver;
+        String taskName = JStormServerUtils.getName(componentId, taskId);
         if (isTaskBatchTuple)
-            taskReceiver =
-                    new TaskBatchReceiver(this, taskid, stormConf,
-                            topologyContext, innerTaskTransfer, taskStatus,
-                            taskName);
+            taskReceiver = new TaskBatchReceiver(this, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
         else
-            taskReceiver =
-                    new TaskReceiver(this, taskid, stormConf, topologyContext,
-                            innerTaskTransfer, taskStatus, taskName);
-        deserializeQueues.put(taskid, taskReceiver.getDeserializeQueue());
+            taskReceiver = new TaskReceiver(this, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
+        deserializeQueues.put(taskId, taskReceiver.getDeserializeQueue());
         return taskReceiver;
     }
 
     public TaskShutdownDameon execute() throws Exception {
-        setComponentType();
 
         taskSendTargets = echoToSystemBolt();
 
         // create thread to get tuple from zeroMQ,
         // and pass the tuple to bolt/spout
         taskTransfer = mkTaskSending(workerData);
-        RunnableCallback baseExecutor = mkExecutor(taskSendTargets);
-        AsyncLoopThread executor_threads =
-                new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY,
-                        true);
+        RunnableCallback baseExecutor = prepareExecutor();
+        AsyncLoopThread executor_threads = new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY, true);
         taskReceiver = mkTaskReceiver();
 
         List<AsyncLoopThread> allThreads = new ArrayList<AsyncLoopThread>();
         allThreads.add(executor_threads);
 
-        TaskHeartbeatRunable.registerTaskStats(taskid, new TaskStats(
-                componentType, taskStats));
-        LOG.info("Finished loading task " + componentid + ":" + taskid);
+        LOG.info("Finished loading task " + componentId + ":" + taskId);
 
-        return getShutdown(allThreads, taskReceiver.getDeserializeQueue(),
+        taskShutdownDameon = getShutdown(allThreads, taskReceiver.getDeserializeQueue(),
                 baseExecutor);
+        return taskShutdownDameon;
     }
 
-    public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads,
-            DisruptorQueue deserializeQueue, RunnableCallback baseExecutor) {
+    private TaskTransfer mkTaskSending(WorkerData workerData) {
+        // sending tuple's serializer
+        KryoTupleSerializer serializer = new KryoTupleSerializer(workerData.getStormConf(), topologyContext);
 
+        String taskName = JStormServerUtils.getName(componentId, taskId);
+        // Task sending all tuples through this Object
+        TaskTransfer taskTransfer;
+        if (isTaskBatchTuple)
+            taskTransfer = new TaskBatchTransfer(this, taskName, serializer, taskStatus, workerData);
+        else
+            taskTransfer = new TaskTransfer(this, taskName, serializer, taskStatus, workerData);
+        return taskTransfer;
+    }
+
+    public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads, DisruptorQueue deserializeQueue, RunnableCallback baseExecutor) {
         AsyncLoopThread ackerThread = null;
         if (baseExecutor instanceof SpoutExecutors) {
-            ackerThread =
-                    ((SpoutExecutors) baseExecutor).getAckerRunnableThread();
+            ackerThread = ((SpoutExecutors) baseExecutor).getAckerRunnableThread();
 
             if (ackerThread != null) {
                 allThreads.add(ackerThread);
@@ -324,24 +257,30 @@ public class Task {
         AsyncLoopThread serializeThread = taskTransfer.getSerializeThread();
         allThreads.add(serializeThread);
 
-        TaskShutdownDameon shutdown =
-                new TaskShutdownDameon(taskStatus, topologyid, taskid,
-                        allThreads, zkCluster, taskObj);
+        TaskShutdownDameon shutdown = new TaskShutdownDameon(taskStatus, topologyId, taskId, allThreads, zkCluster, taskObj, this);
 
         return shutdown;
     }
 
-    public static TaskShutdownDameon mk_task(WorkerData workerData, int taskId)
-            throws Exception {
+    public TaskShutdownDameon getTaskShutdownDameon() {
+        return taskShutdownDameon;
+    }
 
-        Task t = new Task(workerData, taskId);
+    public void run() {
+        try {
+            taskShutdownDameon = this.execute();
+        } catch (Throwable e) {
+            LOG.error("Failed to init task", e);
+        }
+    }
 
+    public static TaskShutdownDameon mk_task(WorkerData workerData, int taskId) throws Exception {
+        Task t = new Task(workerData, taskId);
         return t.execute();
     }
 
     /**
-     * Update the data which can be changed dynamically e.g. when scale-out of a
-     * task parallelism
+     * Update data that can change dynamically, e.g. on scale-out of task parallelism
      */
     public void updateTaskData() {
         // Only update the local task list of topologyContext here. Because
@@ -359,12 +298,94 @@ public class Task {
     public long getWorkerAssignmentTs() {
         return workerData.getAssignmentTs();
     }
-    
+
     public AssignmentType getWorkerAssignmentType() {
         return workerData.getAssignmentType();
     }
 
     public void unregisterDeserializeQueue() {
-        deserializeQueues.remove(taskid);
+        deserializeQueues.remove(taskId);
+    }
+
+    public String getComponentId() {
+        return componentId;
+    }
+
+    public Integer getTaskId() {
+        return taskId;
+    }
+
+    public DisruptorQueue getExecuteQueue() {
+        return innerTaskTransfer.get(taskId);
+    }
+
+    public DisruptorQueue getDeserializeQueue() {
+        return deserializeQueues.get(taskId);
     }
+
+    public Map<Object, Object> getStormConf() {
+        return stormConf;
+    }
+
+    public TopologyContext getTopologyContext() {
+        return topologyContext;
+    }
+
+    public TopologyContext getUserContext() {
+        return userContext;
+    }
+
+    public TaskTransfer getTaskTransfer() {
+        return taskTransfer;
+    }
+
+    public TaskReceiver getTaskReceiver() {
+        return taskReceiver;
+    }
+
+    public Map<Integer, DisruptorQueue> getInnerTaskTransfer() {
+        return innerTaskTransfer;
+    }
+
+    public Map<Integer, DisruptorQueue> getDeserializeQueues() {
+        return deserializeQueues;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public TaskStatus getTaskStatus() {
+        return taskStatus;
+    }
+
+    public StormClusterState getZkCluster() {
+        return zkCluster;
+    }
+
+    public Object getTaskObj() {
+        return taskObj;
+    }
+
+    public TaskBaseMetric getTaskStats() {
+        return taskStats;
+    }
+
+    public WorkerData getWorkerData() {
+        return workerData;
+    }
+
+    public TaskSendTargets getTaskSendTargets() {
+        return taskSendTargets;
+    }
+
+    public TaskReportErrorAndDie getReportErrorDie() {
+        return reportErrorDie;
+    }
+
+    public boolean isTaskBatchTuple() {
+        return isTaskBatchTuple;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java
index 4c9eb0b..84c6151 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java
@@ -17,116 +17,110 @@
  */
 package com.alibaba.jstorm.task;
 
-import java.io.Serializable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.common.metric.MetricRegistry;
-import com.alibaba.jstorm.common.metric.window.Metric;
+import com.alibaba.jstorm.common.metric.AsmMetric;
 import com.alibaba.jstorm.metric.JStormMetrics;
 import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class TaskBaseMetric implements Serializable {
-	private static final Logger LOG = LoggerFactory.getLogger(TaskBaseMetric.class);
+    private static final Logger logger = LoggerFactory.getLogger(TaskBaseMetric.class);
+
     private static final long serialVersionUID = -7157987126460293444L;
-    protected MetricRegistry metrics;
+    private String topologyId;
+    private String componentId;
     private int taskId;
+    private boolean enableMetrics;
 
-    public TaskBaseMetric(int taskId) {
-        metrics = JStormMetrics.registerTask(taskId);
+    /**
+     * Local metric cache, keyed by taskId + streamId + name, to avoid repeated metric name concatenation and registry lookups.
+     */
+    private static final ConcurrentMap<String, AsmMetric> metricCache = new ConcurrentHashMap<String, AsmMetric>();
+
+    public TaskBaseMetric(String topologyId, String componentId, int taskId, boolean enableMetrics) {
+        this.topologyId = topologyId;
+        this.componentId = componentId;
         this.taskId = taskId;
+        this.enableMetrics = enableMetrics;
+        logger.info("init task base metric, tp id:{}, comp id:{}, task id:{}", topologyId, componentId, taskId);
     }
 
-    public void update(String name, Number value, int type) {
-        Metric metric = metrics.getMetric(name);
-        if (metric == null) {
-            metric = JStormMetrics.Builder.mkInstance(type);
-            try {
-            	/**
-            	 * Here use one hack method to handle competition register metric
-            	 * if duplicated metric, just skip it.
-            	 * 
-            	 * this will improve performance
-            	 */
-            	JStormMetrics.registerTaskMetric(metric, taskId, name);
-            }catch(Exception e) {
-            	LOG.warn("Duplicated metrics of {}, taskId:{}", name, taskId);
-            	return ;
+    public void update(final String streamId, final String name, final Number value, final MetricType metricType,
+                       boolean mergeTopology) {
+        String key = taskId + streamId + name;
+        AsmMetric existingMetric = metricCache.get(key);
+        if (existingMetric == null) {
+            String fullName = MetricUtils.streamMetricName(topologyId, componentId, taskId, streamId, name, metricType);
+            existingMetric = JStormMetrics.getStreamMetric(fullName);
+            if (existingMetric == null) {
+                existingMetric = AsmMetric.Builder.build(metricType);
+                JStormMetrics.registerStreamMetric(fullName, existingMetric, mergeTopology);
             }
-            
+            metricCache.putIfAbsent(key, existingMetric);
         }
-        metric.update(value);
+
+        existingMetric.update(value);
+    }
+
+    public void update(final String streamId, final String name, final Number value, final MetricType metricType) {
+        update(streamId, name, value, metricType, true);
     }
 
     public void send_tuple(String stream, int num_out_tasks) {
-        if (num_out_tasks <= 0) {
-            return;
+        if (enableMetrics && num_out_tasks > 0) {
+            update(stream, MetricDef.EMMITTED_NUM, num_out_tasks, MetricType.COUNTER);
+            update(stream, MetricDef.SEND_TPS, num_out_tasks, MetricType.METER);
         }
-
-        String emmitedName =
-                MetricRegistry.name(MetricDef.EMMITTED_NUM, stream);
-        update(emmitedName, Double.valueOf(num_out_tasks),
-                JStormMetrics.Builder.COUNTER);
-
-        String sendTpsName = MetricRegistry.name(MetricDef.SEND_TPS, stream);
-        update(sendTpsName, Double.valueOf(num_out_tasks),
-                JStormMetrics.Builder.METER);
     }
 
     public void recv_tuple(String component, String stream) {
-
-        String name =
-                MetricRegistry.name(MetricDef.RECV_TPS, component, stream);
-        update(name, Double.valueOf(1), JStormMetrics.Builder.METER);
-
+        if (enableMetrics) {
+            update(stream, AsmMetric.mkName(component, MetricDef.RECV_TPS), 1, MetricType.METER);
+//            update(stream, MetricDef.RECV_TPS, 1, MetricType.METER);
+        }
     }
 
-    public void bolt_acked_tuple(String component, String stream,
-            Double latency_ms) {
+    public void bolt_acked_tuple(String component, String stream, Long latency, Long lifeCycle) {
+        if (enableMetrics) {
+//            update(stream, AsmMetric.mkName(component, MetricDef.ACKED_NUM), 1, MetricType.COUNTER);
+//            update(stream, AsmMetric.mkName(component, MetricDef.PROCESS_LATENCY), latency_ms, MetricType.HISTOGRAM);
+            update(stream, MetricDef.ACKED_NUM, 1, MetricType.COUNTER);
+            update(stream, MetricDef.PROCESS_LATENCY, latency, MetricType.HISTOGRAM, false);
 
-        if (latency_ms == null) {
-            return;
+            if (lifeCycle > 0) {
+                update(stream, AsmMetric.mkName(component, MetricDef.TUPLE_LIEF_CYCLE), lifeCycle, MetricType.HISTOGRAM, false);
+            }
         }
-
-        String ackNumName =
-                MetricRegistry.name(MetricDef.ACKED_NUM, component, stream);
-        update(ackNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER);
-
-        String processName =
-                MetricRegistry.name(MetricDef.PROCESS_LATENCY, component,
-                        stream);
-        update(processName, latency_ms,
-                JStormMetrics.Builder.HISTOGRAM);
     }
 
     public void bolt_failed_tuple(String component, String stream) {
-
-        String failNumName =
-                MetricRegistry.name(MetricDef.FAILED_NUM, component, stream);
-        update(failNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER);
+        if (enableMetrics) {
+            //update(stream, AsmMetric.mkName(component, MetricDef.FAILED_NUM), 1, MetricType.COUNTER);
+            update(stream, MetricDef.FAILED_NUM, 1, MetricType.COUNTER);
+        }
     }
 
-    public void spout_acked_tuple(String stream, long st) {
-
-        String ackNumName =
-                MetricRegistry.name(MetricDef.ACKED_NUM,
-                        Common.ACKER_COMPONENT_ID, stream);
-        update(ackNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER);
-
-        String processName =
-                MetricRegistry.name(MetricDef.PROCESS_LATENCY,
-                        Common.ACKER_COMPONENT_ID, stream);
-        update(processName, Double.valueOf(st), JStormMetrics.Builder.HISTOGRAM);
+    public void spout_acked_tuple(String stream, long st, Long lifeCycle) {
+        if (enableMetrics) {
+            update(stream, MetricDef.ACKED_NUM, 1, MetricType.COUNTER);
+            update(stream, MetricDef.PROCESS_LATENCY, st, MetricType.HISTOGRAM, true);
 
+            if (lifeCycle > 0) {
+                update(stream, AsmMetric.mkName(Common.ACKER_COMPONENT_ID, MetricDef.TUPLE_LIEF_CYCLE), lifeCycle, MetricType.HISTOGRAM, false);
+            }
+        }
     }
 
     public void spout_failed_tuple(String stream) {
-        String failNumName =
-                MetricRegistry.name(MetricDef.FAILED_NUM,
-                        Common.ACKER_COMPONENT_ID, stream);
-        update(failNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER);
-
+        if (enableMetrics) {
+            update(stream, MetricDef.FAILED_NUM, 1, MetricType.COUNTER);
+        }
     }
 }
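
The rewritten TaskBaseMetric.update() is a cache-then-register pattern over a ConcurrentMap: look the metric up under a cheap concatenated key, and only on a miss build the full metric name, register it, and cache it. A minimal sketch of that pattern, assuming a toy Counter in place of AsmMetric; it uses putIfAbsent's return value so racing callers converge on a single instance.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class MetricCacheSketch {
    // Hypothetical stand-in for AsmMetric: just a thread-safe counter.
    static class Counter {
        final AtomicLong value = new AtomicLong();
        void update(long delta) { value.addAndGet(delta); }
    }

    private static final ConcurrentMap<String, Counter> CACHE =
            new ConcurrentHashMap<String, Counter>();

    public static void update(int taskId, String streamId, String name, long delta) {
        String key = taskId + streamId + name; // cheap key, as in the patch
        Counter metric = CACHE.get(key);
        if (metric == null) {
            // Miss: this is where the real code builds the full metric name
            // and registers with JStormMetrics before caching.
            Counter fresh = new Counter();
            // putIfAbsent returns the previous mapping, so if another thread
            // raced us here, both callers end up updating the same counter.
            Counter raced = CACHE.putIfAbsent(key, fresh);
            metric = (raced != null) ? raced : fresh;
        }
        metric.update(delta);
    }

    public static void main(String[] args) {
        update(1, "default", "Emitted", 5);
        update(1, "default", "Emitted", 3);
        System.out.println(CACHE.get("1defaultEmitted").value.get()); // prints 8
    }
}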