You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:44 UTC
[06/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java
index 8170ae2..496c2fc 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/ComponentNumSelector.java
@@ -40,12 +40,8 @@ public class ComponentNumSelector extends AbstractSelector {
@Override
public int compare(ResourceWorkerSlot o1, ResourceWorkerSlot o2) {
// TODO Auto-generated method stub
- int o1Num =
- context.getComponentNumOnSupervisor(o1.getNodeId(),
- name);
- int o2Num =
- context.getComponentNumOnSupervisor(o2.getNodeId(),
- name);
+ int o1Num = context.getComponentNumOnSupervisor(o1.getNodeId(), name);
+ int o2Num = context.getComponentNumOnSupervisor(o2.getNodeId(), name);
if (o1Num == o2Num)
return 0;
return o1Num > o2Num ? 1 : -1;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java
index 49eb447..f7f4f5b 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/InputComponentNumSelector.java
@@ -40,12 +40,8 @@ public class InputComponentNumSelector extends AbstractSelector {
@Override
public int compare(ResourceWorkerSlot o1, ResourceWorkerSlot o2) {
// TODO Auto-generated method stub
- int o1Num =
- context.getInputComponentNumOnSupervisor(
- o1.getNodeId(), name);
- int o2Num =
- context.getInputComponentNumOnSupervisor(
- o2.getNodeId(), name);
+ int o1Num = context.getInputComponentNumOnSupervisor(o1.getNodeId(), name);
+ int o2Num = context.getInputComponentNumOnSupervisor(o2.getNodeId(), name);
if (o1Num == o2Num)
return 0;
return o1Num > o2Num ? -1 : 1;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java
index adc8b29..6ef5736 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/Selector.java
@@ -22,6 +22,5 @@ import java.util.List;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
public interface Selector {
- public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result,
- String name);
+ public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result, String name);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java
index f01ee9a..8643d6a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/WorkerComparator.java
@@ -21,8 +21,7 @@ import java.util.Comparator;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
-public abstract class WorkerComparator implements
- Comparator<ResourceWorkerSlot> {
+public abstract class WorkerComparator implements Comparator<ResourceWorkerSlot> {
protected String name;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java
index f81d072..33f762a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskAssignContext.java
@@ -24,33 +24,39 @@ import java.util.Map.Entry;
import java.util.Set;
public class TaskAssignContext {
-
+ private final Map<Integer, String> taskToComponent;
+
private final Map<String, List<ResourceWorkerSlot>> supervisorToWorker;
private final Map<String, Set<String>> relationship;
// Map<worker, Map<component name, assigned task num in this worker>
- private final Map<ResourceWorkerSlot, Map<String, Integer>> workerToComponentNum =
- new HashMap<ResourceWorkerSlot, Map<String, Integer>>();
+ private final Map<ResourceWorkerSlot, Map<String, Integer>> workerToComponentNum = new HashMap<ResourceWorkerSlot, Map<String, Integer>>();
// Map<available worker, assigned task num in this worker>
- private final Map<ResourceWorkerSlot, Integer> workerToTaskNum =
- new HashMap<ResourceWorkerSlot, Integer>();
+ private final Map<ResourceWorkerSlot, Integer> workerToTaskNum = new HashMap<ResourceWorkerSlot, Integer>();
- private final Map<String, ResourceWorkerSlot> HostPortToWorkerMap =
- new HashMap<String, ResourceWorkerSlot>();
+ private final Map<String, ResourceWorkerSlot> HostPortToWorkerMap = new HashMap<String, ResourceWorkerSlot>();
- public TaskAssignContext(
- Map<String, List<ResourceWorkerSlot>> supervisorToWorker,
- Map<String, Set<String>> relationship) {
+ public TaskAssignContext(Map<String, List<ResourceWorkerSlot>> supervisorToWorker, Map<String, Set<String>> relationship, Map<Integer, String> taskToComponent) {
+ this.taskToComponent = taskToComponent;
this.supervisorToWorker = supervisorToWorker;
this.relationship = relationship;
- for (Entry<String, List<ResourceWorkerSlot>> entry : supervisorToWorker
- .entrySet()) {
+ for (Entry<String, List<ResourceWorkerSlot>> entry : supervisorToWorker.entrySet()) {
for (ResourceWorkerSlot worker : entry.getValue()) {
- workerToTaskNum.put(worker, 0);
+ workerToTaskNum.put(worker, (worker.getTasks() != null ? worker.getTasks().size() : 0));
HostPortToWorkerMap.put(worker.getHostPort(), worker);
+
+ if (worker.getTasks() != null) {
+ Map<String, Integer> componentToNum = new HashMap<String, Integer>();
+ for (Integer taskId : worker.getTasks()) {
+ String componentId = taskToComponent.get(taskId);
+ int num = componentToNum.get(componentId) == null ? 0 : componentToNum.get(componentId);
+ componentToNum.put(componentId, ++num);
+ }
+ workerToComponentNum.put(worker, componentToNum);
+ }
}
}
}
@@ -115,8 +121,7 @@ public class TaskAssignContext {
return result;
}
- public int getInputComponentNumOnWorker(ResourceWorkerSlot worker,
- String name) {
+ public int getInputComponentNumOnWorker(ResourceWorkerSlot worker, String name) {
int result = 0;
for (String component : relationship.get(name))
result = result + this.getComponentNumOnWorker(worker, component);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java
index 7131463..a359972 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/TaskScheduler.java
@@ -41,20 +41,15 @@ public class TaskScheduler {
public static Logger LOG = LoggerFactory.getLogger(TaskScheduler.class);
- public static final String ACKER_NAME = "__acker";
-
private final TaskAssignContext taskContext;
- private List<ResourceWorkerSlot> assignments =
- new ArrayList<ResourceWorkerSlot>();
+ private List<ResourceWorkerSlot> assignments = new ArrayList<ResourceWorkerSlot>();
private int workerNum;
/**
- * For balance purpose, default scheduler is trying to assign the same
- * number of tasks to a worker. e.g. There are 4 tasks and 3 available
- * workers. Each worker will be assigned one task first. And then one worker
- * is chosen for the last one.
+ * For balancing purposes, the default scheduler tries to assign the same number of tasks to each worker, e.g. with 4 tasks and 3 available workers, each
+ * worker is assigned one task first, and then one worker is chosen for the last one.
*/
private int avgTaskNum;
private int leftTaskNum;
@@ -69,48 +64,91 @@ public class TaskScheduler {
private Selector totalTaskNumSelector;
- public TaskScheduler(DefaultTopologyAssignContext context,
- Set<Integer> tasks, List<ResourceWorkerSlot> workers) {
+ public TaskScheduler(DefaultTopologyAssignContext context, Set<Integer> tasks, List<ResourceWorkerSlot> workers) {
this.tasks = tasks;
- LOG.info("Tasks " + tasks + " is going to be assigned in workers "
- + workers);
+ LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers);
this.context = context;
this.taskContext =
- new TaskAssignContext(this.buildSupervisorToWorker(workers),
- Common.buildSpoutOutoputAndBoltInputMap(context));
+ new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent());
this.componentSelector = new ComponentNumSelector(taskContext);
- this.inputComponentSelector =
- new InputComponentNumSelector(taskContext);
+ this.inputComponentSelector = new InputComponentNumSelector(taskContext);
this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext);
if (tasks.size() == 0)
return;
- setTaskNum(tasks.size(), workerNum);
+ if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){
+ // Warning: this does not handle an HA topology master (TM) yet.
+ if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) {
+ assignForTopologyMaster();
+ }
+ }
+
+ int taskNum = tasks.size();
+ Map<ResourceWorkerSlot, Integer> workerSlotIntegerMap = taskContext.getWorkerToTaskNum();
+ Set<ResourceWorkerSlot> preAssignWorkers = new HashSet<ResourceWorkerSlot>();
+ for (Entry<ResourceWorkerSlot, Integer> worker : workerSlotIntegerMap.entrySet()) {
+ if (worker.getValue() > 0) {
+ taskNum += worker.getValue();
+ preAssignWorkers.add(worker.getKey());
+ }
+ }
+ setTaskNum(taskNum, workerNum);
+
+ // Check the assignment status of pre-assigned workers, e.g. user-defined or old-assignment workers.
+ // Remove the workers which have already been assigned enough tasks.
+ for (ResourceWorkerSlot worker : preAssignWorkers) {
+ Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
+ if (doneWorkers != null) {
+ for (ResourceWorkerSlot doneWorker : doneWorkers) {
+ taskNum -= doneWorker.getTasks().size();
+ workerNum--;
+ }
+ }
+ }
+ setTaskNum(taskNum, workerNum);
// For Scale-out case, the old assignment should be kept.
- if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
- && context.isReassign() == false) {
- keepAssignment(context.getOldAssignment().getWorkers());
+ if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) {
+ keepAssignment(taskNum, context.getOldAssignment().getWorkers());
}
}
- private void keepAssignment(Set<ResourceWorkerSlot> keepAssignments) {
+ private void keepAssignment(int taskNum, Set<ResourceWorkerSlot> keepAssignments) {
Set<Integer> keepTasks = new HashSet<Integer>();
+ ResourceWorkerSlot tmWorker = null;
for (ResourceWorkerSlot worker : keepAssignments) {
+ if (worker.getTasks().contains(context.getTopologyMasterTaskId()))
+ tmWorker = worker;
for (Integer taskId : worker.getTasks()) {
if (tasks.contains(taskId)) {
- ResourceWorkerSlot contextWorker =
- taskContext.getWorker(worker);
+ ResourceWorkerSlot contextWorker = taskContext.getWorker(worker);
if (contextWorker != null) {
- String componentName =
- context.getTaskToComponent().get(taskId);
- updateAssignedTasksOfWorker(taskId, contextWorker);
- updateComponentsNumOfWorker(componentName,
- contextWorker);
- keepTasks.add(taskId);
+ if (tmWorker != null && tmWorker.getTasks().contains(taskId) && context.getAssignSingleWorkerForTM() ) {
+ if (context.getTopologyMasterTaskId() == taskId){
+ updateAssignedTasksOfWorker(taskId, contextWorker);
+ taskContext.getWorkerToTaskNum().remove(contextWorker);
+ contextWorker.getTasks().clear();
+ contextWorker.getTasks().add(taskId);
+ assignments.add(contextWorker);
+ tasks.remove(taskId);
+ taskNum--;
+ workerNum--;
+ LOG.info("assignForTopologyMaster: " + contextWorker);
+ }
+ }else {
+ String componentName = context.getTaskToComponent().get(taskId);
+ updateAssignedTasksOfWorker(taskId, contextWorker);
+ updateComponentsNumOfWorker(componentName, contextWorker);
+ keepTasks.add(taskId);
+ }
}
}
}
}
+ if ( tmWorker != null){
+ setTaskNum(taskNum, workerNum);
+ keepAssignments.remove(tmWorker);
+ }
+
// Try to find the workers which have been assigned too much tasks
// If found, remove the workers from worker resource pool and update
@@ -118,11 +156,9 @@ public class TaskScheduler {
int doneAssignedTaskNum = 0;
while (true) {
boolean found = false;
- Set<ResourceWorkerSlot> doneAssignedWorkers =
- new HashSet<ResourceWorkerSlot>();
+ Set<ResourceWorkerSlot> doneAssignedWorkers = new HashSet<ResourceWorkerSlot>();
for (ResourceWorkerSlot worker : keepAssignments) {
- ResourceWorkerSlot contextWorker =
- taskContext.getWorker(worker);
+ ResourceWorkerSlot contextWorker = taskContext.getWorker(worker);
if (contextWorker != null && isTaskFullForWorker(contextWorker)) {
found = true;
workerNum--;
@@ -135,7 +171,8 @@ public class TaskScheduler {
}
if (found) {
- setTaskNum((tasks.size() - doneAssignedTaskNum), workerNum);
+ taskNum -= doneAssignedTaskNum;
+ setTaskNum(taskNum, workerNum);
keepAssignments.removeAll(doneAssignedWorkers);
} else {
break;
@@ -150,45 +187,89 @@ public class TaskScheduler {
Set<Integer> tasks = worker.getTasks();
if (tasks != null) {
- if ((leftTaskNum == 0 && tasks.size() >= avgTaskNum)
- || (leftTaskNum > 0 && tasks.size() >= (avgTaskNum + 1))) {
+ if ((leftTaskNum <= 0 && tasks.size() >= avgTaskNum) || (leftTaskNum > 0 && tasks.size() >= (avgTaskNum + 1))) {
ret = true;
}
}
return ret;
}
+ private Set<ResourceWorkerSlot> getRestAssignedWorkers() {
+ Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
+ for (ResourceWorkerSlot worker : taskContext.getWorkerToTaskNum().keySet()) {
+ if (worker.getTasks() != null && worker.getTasks().size() > 0) {
+ ret.add(worker);
+ }
+ }
+ return ret;
+ }
+
public List<ResourceWorkerSlot> assign() {
- if (tasks.size() == 0)
+ if (tasks.size() == 0) {
+ assignments.addAll(getRestAssignedWorkers());
return assignments;
+ }
// Firstly, assign workers to the components which are configured
// by "task.on.differ.node"
Set<Integer> assignedTasks = assignForDifferNodeTask();
- // Assign for the tasks except acker
+ // Assign for the tasks except system tasks
tasks.removeAll(assignedTasks);
- Set<Integer> ackers = new HashSet<Integer>();
+ Map<Integer, String> systemTasks = new HashMap<Integer, String>();
for (Integer task : tasks) {
String name = context.getTaskToComponent().get(task);
- if (name.equals(TaskScheduler.ACKER_NAME)) {
- ackers.add(task);
+ if (Common.isSystemComponent(name)) {
+ systemTasks.put(task, name);
continue;
}
assignForTask(name, task);
}
-
- // At last, make the assignment for acker
- for (Integer task : ackers) {
- assignForTask(TaskScheduler.ACKER_NAME, task);
+
+ /*
+ * Finally, make the assignment for the system components, e.g. acker, topology master, ...
+ */
+ for (Entry<Integer, String> entry : systemTasks.entrySet()) {
+ assignForTask(entry.getValue(), entry.getKey());
}
+
+ assignments.addAll(getRestAssignedWorkers());
return assignments;
}
+ private void assignForTopologyMaster() {
+ int taskId = context.getTopologyMasterTaskId();
+
+ // Try to find a task-free worker on the supervisor that has the most workers,
+ // to avoid unbalancing the later assignment of the other workers.
+ ResourceWorkerSlot workerAssigned = null;
+ int workerNumOfSuperv = 0;
+ for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){
+ List<ResourceWorkerSlot> workers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId());
+ if (workers != null && workers.size() > workerNumOfSuperv) {
+ for (ResourceWorkerSlot worker : workers) {
+ Set<Integer> tasks = worker.getTasks();
+ if (tasks == null || tasks.size() == 0) {
+ workerAssigned = worker;
+ workerNumOfSuperv = workers.size();
+ break;
+ }
+ }
+ }
+ }
+
+ if (workerAssigned == null)
+ throw new FailedAssignTopologyException("there's no enough workers for the assignment of topology master");
+ updateAssignedTasksOfWorker(taskId, workerAssigned);
+ taskContext.getWorkerToTaskNum().remove(workerAssigned);
+ assignments.add(workerAssigned);
+ tasks.remove(taskId);
+ workerNum--;
+ LOG.info("assignForTopologyMaster, assignments=" + assignments);
+ }
+
private void assignForTask(String name, Integer task) {
- ResourceWorkerSlot worker =
- chooseWorker(name, new ArrayList<ResourceWorkerSlot>(
- taskContext.getWorkerToTaskNum().keySet()));
+ ResourceWorkerSlot worker = chooseWorker(name, new ArrayList<ResourceWorkerSlot>(taskContext.getWorkerToTaskNum().keySet()));
pushTaskToWorker(task, name, worker);
}
@@ -201,98 +282,97 @@ public class TaskScheduler {
}
for (Integer task : ret) {
String name = context.getTaskToComponent().get(task);
- ResourceWorkerSlot worker =
- chooseWorker(name, getDifferNodeTaskWokers(name));
+ ResourceWorkerSlot worker = chooseWorker(name, getDifferNodeTaskWokers(name));
+ LOG.info("Due to task.on.differ.node, push task-{} to {}:{}", task, worker.getHostname(), worker.getPort());
pushTaskToWorker(task, name, worker);
}
return ret;
}
- private Map<String, List<ResourceWorkerSlot>> buildSupervisorToWorker(
- List<ResourceWorkerSlot> workers) {
- Map<String, List<ResourceWorkerSlot>> supervisorToWorker =
- new HashMap<String, List<ResourceWorkerSlot>>();
+ private Map<String, List<ResourceWorkerSlot>> buildSupervisorToWorker(List<ResourceWorkerSlot> workers) {
+ Map<String, List<ResourceWorkerSlot>> supervisorToWorker = new HashMap<String, List<ResourceWorkerSlot>>();
for (ResourceWorkerSlot worker : workers) {
- if (worker.getTasks() == null || worker.getTasks().size() == 0) {
- List<ResourceWorkerSlot> supervisor =
- supervisorToWorker.get(worker.getNodeId());
- if (supervisor == null) {
- supervisor = new ArrayList<ResourceWorkerSlot>();
- supervisorToWorker.put(worker.getNodeId(), supervisor);
- }
- supervisor.add(worker);
- } else {
- assignments.add(worker);
+ List<ResourceWorkerSlot> supervisor = supervisorToWorker.get(worker.getNodeId());
+ if (supervisor == null) {
+ supervisor = new ArrayList<ResourceWorkerSlot>();
+ supervisorToWorker.put(worker.getNodeId(), supervisor);
}
+ supervisor.add(worker);
}
- this.workerNum = workers.size() - assignments.size();
+ this.workerNum = workers.size();
return supervisorToWorker;
}
- private ResourceWorkerSlot chooseWorker(String name,
- List<ResourceWorkerSlot> workers) {
- List<ResourceWorkerSlot> result =
- componentSelector.select(workers, name);
+ private ResourceWorkerSlot chooseWorker(String name, List<ResourceWorkerSlot> workers) {
+ List<ResourceWorkerSlot> result = componentSelector.select(workers, name);
result = totalTaskNumSelector.select(result, name);
- if (name.equals(TaskScheduler.ACKER_NAME))
+ if (Common.isSystemComponent(name))
return result.iterator().next();
result = inputComponentSelector.select(result, name);
return result.iterator().next();
}
- private void pushTaskToWorker(Integer task, String name,
- ResourceWorkerSlot worker) {
+ private void pushTaskToWorker(Integer task, String name, ResourceWorkerSlot worker) {
LOG.debug("Push task-" + task + " to worker-" + worker.getPort());
int taskNum = updateAssignedTasksOfWorker(task, worker);
+ removeWorkerFromSrcPool(taskNum, worker);
+
+ updateComponentsNumOfWorker(name, worker);
+ }
+
+ private int updateAssignedTasksOfWorker(Integer task, ResourceWorkerSlot worker) {
+ int ret = 0;
+ Set<Integer> tasks = worker.getTasks();
+ if (tasks == null) {
+ tasks = new HashSet<Integer>();
+ worker.setTasks(tasks);
+ }
+ tasks.add(task);
+
+ ret = taskContext.getWorkerToTaskNum().get(worker);
+ taskContext.getWorkerToTaskNum().put(worker, ++ret);
+ return ret;
+ }
+
+ /*
+ * Remove the worker from the source worker pool if it has been assigned enough tasks.
+ */
+ private Set<ResourceWorkerSlot> removeWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) {
+ Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
+
if (leftTaskNum <= 0) {
- if (taskNum == avgTaskNum) {
+ if (taskNum >= avgTaskNum) {
taskContext.getWorkerToTaskNum().remove(worker);
assignments.add(worker);
+ ret.add(worker);
}
} else {
- if (taskNum == (avgTaskNum + 1)) {
+ if (taskNum > avgTaskNum ) {
taskContext.getWorkerToTaskNum().remove(worker);
- leftTaskNum--;
+ leftTaskNum = leftTaskNum -(taskNum -avgTaskNum);
assignments.add(worker);
+ ret.add(worker);
}
if (leftTaskNum <= 0) {
- List<ResourceWorkerSlot> needDelete =
- new ArrayList<ResourceWorkerSlot>();
- for (Entry<ResourceWorkerSlot, Integer> entry : taskContext
- .getWorkerToTaskNum().entrySet()) {
+ List<ResourceWorkerSlot> needDelete = new ArrayList<ResourceWorkerSlot>();
+ for (Entry<ResourceWorkerSlot, Integer> entry : taskContext.getWorkerToTaskNum().entrySet()) {
if (entry.getValue() == avgTaskNum)
needDelete.add(entry.getKey());
}
for (ResourceWorkerSlot workerToDelete : needDelete) {
taskContext.getWorkerToTaskNum().remove(workerToDelete);
assignments.add(workerToDelete);
+ ret.add(workerToDelete);
}
}
}
- updateComponentsNumOfWorker(name, worker);
- }
-
- private int updateAssignedTasksOfWorker(Integer task,
- ResourceWorkerSlot worker) {
- int ret = 0;
- Set<Integer> tasks = worker.getTasks();
- if (tasks == null) {
- tasks = new HashSet<Integer>();
- worker.setTasks(tasks);
- }
- tasks.add(task);
-
- ret = taskContext.getWorkerToTaskNum().get(worker);
- taskContext.getWorkerToTaskNum().put(worker, ++ret);
return ret;
}
- private void updateComponentsNumOfWorker(String name,
- ResourceWorkerSlot worker) {
- Map<String, Integer> components =
- taskContext.getWorkerToComponentNum().get(worker);
+ private void updateComponentsNumOfWorker(String name, ResourceWorkerSlot worker) {
+ Map<String, Integer> components = taskContext.getWorkerToComponentNum().get(worker);
if (components == null) {
components = new HashMap<String, Integer>();
taskContext.getWorkerToComponentNum().put(worker, components);
@@ -308,11 +388,9 @@ public class TaskScheduler {
if (taskNum >= 0 && workerNum > 0) {
this.avgTaskNum = taskNum / workerNum;
this.leftTaskNum = taskNum % workerNum;
- LOG.debug("avgTaskNum=" + avgTaskNum + ", leftTaskNum="
- + leftTaskNum);
+ LOG.debug("avgTaskNum=" + avgTaskNum + ", leftTaskNum=" + leftTaskNum);
} else {
- LOG.debug("Illegal parameters, taskNum=" + taskNum + ", workerNum="
- + workerNum);
+ LOG.debug("Illegal parameters, taskNum=" + taskNum + ", workerNum=" + workerNum);
}
}
@@ -320,15 +398,12 @@ public class TaskScheduler {
List<ResourceWorkerSlot> workers = new ArrayList<ResourceWorkerSlot>();
workers.addAll(taskContext.getWorkerToTaskNum().keySet());
- for (Entry<String, List<ResourceWorkerSlot>> entry : taskContext
- .getSupervisorToWorker().entrySet()) {
+ for (Entry<String, List<ResourceWorkerSlot>> entry : taskContext.getSupervisorToWorker().entrySet()) {
if (taskContext.getComponentNumOnSupervisor(entry.getKey(), name) != 0)
workers.removeAll(entry.getValue());
}
if (workers.size() == 0)
- throw new FailedAssignTopologyException(
- "there's no enough supervisor for making component: "
- + name + " 's tasks on different node");
+ throw new FailedAssignTopologyException("there's no enough supervisor for making component: " + name + " 's tasks on different node");
return workers;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java
index c85d723..08c4730 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/WorkerScheduler.java
@@ -28,6 +28,7 @@ import backtype.storm.Config;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.client.WorkerAssignment;
+import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.TopologyAssignContext;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
@@ -35,353 +36,369 @@ import com.alibaba.jstorm.utils.NetWorkUtils;
public class WorkerScheduler {
- public static Logger LOG = LoggerFactory.getLogger(WorkerScheduler.class);
-
- private static WorkerScheduler instance;
-
- private WorkerScheduler() {
-
- }
-
- public static WorkerScheduler getInstance() {
- if (instance == null) {
- instance = new WorkerScheduler();
- }
- return instance;
- }
-
- public List<ResourceWorkerSlot> getAvailableWorkers(
- DefaultTopologyAssignContext context, Set<Integer> needAssign,
- int num) {
- int workersNum = getWorkersNum(context, num);
- if (workersNum == 0) {
- throw new FailedAssignTopologyException("there's no enough worker");
- }
- List<ResourceWorkerSlot> assignedWorkers =
- new ArrayList<ResourceWorkerSlot>();
- // userdefine assignments
- getRightWorkers(
- context,
- needAssign,
- assignedWorkers,
- workersNum,
- getUserDefineWorkers(context, ConfigExtension
- .getUserDefineAssignment(context.getStormConf())));
- // old assignments
- if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
- getRightWorkers(context, needAssign, assignedWorkers, workersNum,
- context.getOldWorkers());
- } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
- && context.isReassign() == false) {
- int cnt = 0;
- for (ResourceWorkerSlot worker : context.getOldWorkers()) {
- if (cnt < workersNum) {
- ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
- resFreeWorker.setPort(worker.getPort());
- resFreeWorker.setHostname(worker.getHostname());
- resFreeWorker.setNodeId(worker.getNodeId());
- assignedWorkers.add(resFreeWorker);
- cnt++;
- } else {
- break;
- }
- }
- }
- int defaultWorkerNum =
- Math.min(workersNum - assignedWorkers.size(), needAssign.size());
- LOG.info("Get workers from user define and old assignments: "
- + assignedWorkers);
- for (int i = 0; i < defaultWorkerNum; i++) {
- assignedWorkers.add(new ResourceWorkerSlot());
- }
- List<SupervisorInfo> isolationSupervisors =
- this.getIsolationSupervisors(context);
- if (isolationSupervisors.size() != 0) {
- putAllWorkerToSupervisor(assignedWorkers,
- getResAvailSupervisors(isolationSupervisors));
- } else {
- putAllWorkerToSupervisor(assignedWorkers,
- getResAvailSupervisors(context.getCluster()));
- }
- this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
- LOG.info("Assigned workers=" + assignedWorkers);
- return assignedWorkers;
- }
-
- private void setAllWorkerMemAndCpu(Map conf,
- List<ResourceWorkerSlot> assignedWorkers) {
- long defaultSize = ConfigExtension.getMemSizePerWorker(conf);
- int defaultCpu = ConfigExtension.getCpuSlotPerWorker(conf);
- for (ResourceWorkerSlot worker : assignedWorkers) {
- if (worker.getMemSize() <= 0)
- worker.setMemSize(defaultSize);
- if (worker.getCpu() <= 0)
- worker.setCpu(defaultCpu);
- }
- }
-
- private void putAllWorkerToSupervisor(
- List<ResourceWorkerSlot> assignedWorkers,
- List<SupervisorInfo> supervisors) {
- for (ResourceWorkerSlot worker : assignedWorkers) {
- if (worker.getHostname() != null) {
- for (SupervisorInfo supervisor : supervisors) {
- if (NetWorkUtils.equals(supervisor.getHostName(),
- worker.getHostname())
- && supervisor.getAvailableWorkerPorts().size() > 0) {
- putWorkerToSupervisor(supervisor, worker);
- break;
- }
- }
- }
- }
- supervisors = getResAvailSupervisors(supervisors);
- Collections.sort(supervisors, new Comparator<SupervisorInfo>() {
-
- @Override
- public int compare(SupervisorInfo o1, SupervisorInfo o2) {
- // TODO Auto-generated method stub
- return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2
- .getAvailableWorkerPorts().size());
- }
-
- });
- putWorkerToSupervisor(assignedWorkers, supervisors);
- }
-
- private void putWorkerToSupervisor(SupervisorInfo supervisor,
- ResourceWorkerSlot worker) {
- int port = worker.getPort();
- if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
- port = supervisor.getAvailableWorkerPorts().iterator().next();
- }
- worker.setPort(port);
- supervisor.getAvailableWorkerPorts().remove(port);
- worker.setNodeId(supervisor.getSupervisorId());
- }
-
- private void putWorkerToSupervisor(
- List<ResourceWorkerSlot> assignedWorkers,
- List<SupervisorInfo> supervisors) {
- int allUsedPorts = 0;
- for (SupervisorInfo supervisor : supervisors) {
- int supervisorUsedPorts = supervisor.getWorkerPorts().size()
- - supervisor.getAvailableWorkerPorts().size();
- allUsedPorts = allUsedPorts + supervisorUsedPorts;
- }
- // per supervisor should be allocated ports in theory
- int theoryAveragePorts =
- (allUsedPorts + assignedWorkers.size()) / supervisors.size()
- + 1;
- // supervisor which use more than theoryAveragePorts ports will be
- // pushed overLoadSupervisors
- List<SupervisorInfo> overLoadSupervisors =
- new ArrayList<SupervisorInfo>();
- int key = 0;
- Iterator<ResourceWorkerSlot> iterator = assignedWorkers.iterator();
- while (iterator.hasNext()) {
- if (supervisors.size() == 0)
- break;
- if (key >= supervisors.size())
- key = 0;
- SupervisorInfo supervisor = supervisors.get(key);
- int supervisorUsedPorts = supervisor.getWorkerPorts().size()
- - supervisor.getAvailableWorkerPorts().size();
- if (supervisorUsedPorts < theoryAveragePorts) {
- ResourceWorkerSlot worker = iterator.next();
- if (worker.getNodeId() != null)
- continue;
- worker.setHostname(supervisor.getHostName());
- worker.setNodeId(supervisor.getSupervisorId());
- worker.setPort(
- supervisor.getAvailableWorkerPorts().iterator().next());
- supervisor.getAvailableWorkerPorts().remove(worker.getPort());
- if (supervisor.getAvailableWorkerPorts().size() == 0)
- supervisors.remove(supervisor);
- key++;
- } else {
- overLoadSupervisors.add(supervisor);
- supervisors.remove(supervisor);
- }
- }
- // rest assignedWorkers will be allocate supervisor by deal
- Collections.sort(overLoadSupervisors, new Comparator<SupervisorInfo>() {
-
- @Override
- public int compare(SupervisorInfo o1, SupervisorInfo o2) {
- // TODO Auto-generated method stub
- return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(),
- o2.getAvailableWorkerPorts().size());
- }
-
- });
- key = 0;
- while (iterator.hasNext()) {
- if (overLoadSupervisors.size() == 0)
- break;
- if (key >= overLoadSupervisors.size())
- key = 0;
- ResourceWorkerSlot worker = iterator.next();
- if (worker.getNodeId() != null)
- continue;
- SupervisorInfo supervisor = overLoadSupervisors.get(key);
- worker.setHostname(supervisor.getHostName());
- worker.setNodeId(supervisor.getSupervisorId());
- worker.setPort(
- supervisor.getAvailableWorkerPorts().iterator().next());
- supervisor.getAvailableWorkerPorts().remove(worker.getPort());
- if (supervisor.getAvailableWorkerPorts().size() == 0)
- overLoadSupervisors.remove(supervisor);
- key++;
- }
- }
-
- private void getRightWorkers(DefaultTopologyAssignContext context,
- Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers,
- int workersNum, Collection<ResourceWorkerSlot> workers) {
- Set<Integer> assigned = new HashSet<Integer>();
- List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
- if (workers == null)
- return;
- for (ResourceWorkerSlot worker : workers) {
- boolean right = true;
- Set<Integer> tasks = worker.getTasks();
- if (tasks == null)
- continue;
- for (Integer task : tasks) {
- if (!needAssign.contains(task) || assigned.contains(task)) {
- right = false;
- break;
- }
- }
- if (right) {
- assigned.addAll(tasks);
- users.add(worker);
- }
- }
- if (users.size() + assignedWorkers.size() > workersNum) {
- return;
- }
-
- if (users.size() + assignedWorkers.size() == workersNum
- && assigned.size() != needAssign.size()) {
- return;
- }
- assignedWorkers.addAll(users);
- needAssign.removeAll(assigned);
- }
-
- private int getWorkersNum(DefaultTopologyAssignContext context,
- int workersNum) {
- Map<String, SupervisorInfo> supervisors = context.getCluster();
- List<SupervisorInfo> isolationSupervisors =
- this.getIsolationSupervisors(context);
- int slotNum = 0;
-
- if (isolationSupervisors.size() != 0) {
- for (SupervisorInfo superivsor : isolationSupervisors) {
- slotNum = slotNum + superivsor.getAvailableWorkerPorts().size();
- }
- return Math.min(slotNum, workersNum);
- }
- for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
- slotNum = slotNum + entry.getValue().getAvailableWorkerPorts().size();
- }
- return Math.min(slotNum, workersNum);
- }
-
- /**
- * @param context
- * @param workers
- * @return
- */
- private List<ResourceWorkerSlot> getUserDefineWorkers(
- DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
- List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
- if (workers == null)
- return ret;
- Map<String, List<Integer>> componentToTask =
- (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context
- .getComponentTasks()).clone();
- if (context.getAssignType() != context.ASSIGN_TYPE_NEW) {
- checkUserDefineWorkers(context, workers,
- context.getTaskToComponent());
- }
- for (WorkerAssignment worker : workers) {
- ResourceWorkerSlot workerSlot =
- new ResourceWorkerSlot(worker, componentToTask);
- if (workerSlot.getTasks().size() != 0) {
- ret.add(workerSlot);
- }
- }
- return ret;
- }
-
- private void checkUserDefineWorkers(DefaultTopologyAssignContext context,
- List<WorkerAssignment> workers, Map<Integer, String> taskToComponent) {
- Set<ResourceWorkerSlot> unstoppedWorkers =
- context.getUnstoppedWorkers();
- List<WorkerAssignment> re = new ArrayList<WorkerAssignment>();
- for (WorkerAssignment worker : workers) {
- for (ResourceWorkerSlot unstopped : unstoppedWorkers) {
- if (unstopped
- .compareToUserDefineWorker(worker, taskToComponent))
- re.add(worker);
- }
- }
- workers.removeAll(re);
-
- }
-
- private List<SupervisorInfo> getResAvailSupervisors(
- Map<String, SupervisorInfo> supervisors) {
- List<SupervisorInfo> availableSupervisors =
- new ArrayList<SupervisorInfo>();
- for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
- SupervisorInfo supervisor = entry.getValue();
- if (supervisor.getAvailableWorkerPorts().size() > 0)
- availableSupervisors.add(entry.getValue());
- }
- return availableSupervisors;
- }
-
- private List<SupervisorInfo> getResAvailSupervisors(
- List<SupervisorInfo> supervisors) {
- List<SupervisorInfo> availableSupervisors =
- new ArrayList<SupervisorInfo>();
- for (SupervisorInfo supervisor : supervisors) {
- if (supervisor.getAvailableWorkerPorts().size() > 0)
- availableSupervisors.add(supervisor);
- }
- return availableSupervisors;
- }
-
- private List<SupervisorInfo> getIsolationSupervisors(
- DefaultTopologyAssignContext context) {
- List<String> isolationHosts =
- (List<String>) context.getStormConf().get(
- Config.ISOLATION_SCHEDULER_MACHINES);
- LOG.info("Isolation machines: " + isolationHosts);
- if (isolationHosts == null)
- return new ArrayList<SupervisorInfo>();
- List<SupervisorInfo> isolationSupervisors =
- new ArrayList<SupervisorInfo>();
- for (Entry<String, SupervisorInfo> entry : context.getCluster()
- .entrySet()) {
- if (containTargetHost(isolationHosts, entry.getValue()
- .getHostName())) {
- isolationSupervisors.add(entry.getValue());
- }
- }
- return isolationSupervisors;
- }
-
- private boolean containTargetHost(Collection<String> hosts, String target) {
- for (String host : hosts) {
- if (NetWorkUtils.equals(host, target) == true) {
- return true;
- }
- }
- return false;
- }
+ public static Logger LOG = LoggerFactory.getLogger(WorkerScheduler.class);
+
+ private static WorkerScheduler instance;
+
+ private WorkerScheduler() {
+
+ }
+
+ public static WorkerScheduler getInstance() {
+ if (instance == null) {
+ instance = new WorkerScheduler();
+ }
+ return instance;
+ }
+
+ public List<ResourceWorkerSlot> getAvailableWorkers(
+ DefaultTopologyAssignContext context, Set<Integer> needAssign,
+ int allocWorkerNum) {
+ int workersNum = getAvailableWorkersNum(context);
+ if (workersNum < allocWorkerNum) {
+ throw new FailedAssignTopologyException(
+ "there's no enough worker. allocWorkerNum="
+ + allocWorkerNum + ", availableWorkerNum="
+ + workersNum);
+ }
+ workersNum = allocWorkerNum;
+ List<ResourceWorkerSlot> assignedWorkers = new ArrayList<ResourceWorkerSlot>();
+ // user-defined assignments, but don't try to use custom scheduling for
+ // TM bolts now.
+ getRightWorkers(
+ context,
+ needAssign,
+ assignedWorkers,
+ workersNum,
+ getUserDefineWorkers(context, ConfigExtension
+ .getUserDefineAssignment(context.getStormConf())));
+
+ // old assignments
+ if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
+ getRightWorkers(context, needAssign, assignedWorkers, workersNum,
+ context.getOldWorkers());
+ } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
+ && context.isReassign() == false) {
+ int cnt = 0;
+ for (ResourceWorkerSlot worker : context.getOldWorkers()) {
+ if (cnt < workersNum) {
+ ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
+ resFreeWorker.setPort(worker.getPort());
+ resFreeWorker.setHostname(worker.getHostname());
+ resFreeWorker.setNodeId(worker.getNodeId());
+ assignedWorkers.add(resFreeWorker);
+ cnt++;
+ } else {
+ break;
+ }
+ }
+ }
+ // count the TM tasks still left in needAssign (NOTE(review): workersForSingleTM is not read again in this method)
+ int workersForSingleTM = 0;
+ if (context.getAssignSingleWorkerForTM()) {
+ for (Integer taskId : needAssign) {
+ String componentName = context.getTaskToComponent().get(taskId);
+ if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
+ workersForSingleTM++;
+ }
+ }
+ }
+
+ LOG.info("Get workers from user define and old assignments: "
+ + assignedWorkers);
+
+ int restWokerNum = workersNum - assignedWorkers.size();
+ if (restWokerNum < 0)
+ throw new FailedAssignTopologyException(
+ "Too much workers are needed for user define or old assignments. workersNum="
+ + workersNum + ", assignedWokersNum="
+ + assignedWorkers.size());
+
+ for (int i = 0; i < restWokerNum; i++) {
+ assignedWorkers.add(new ResourceWorkerSlot());
+ }
+ List<SupervisorInfo> isolationSupervisors = this
+ .getIsolationSupervisors(context);
+ if (isolationSupervisors.size() != 0) {
+ putAllWorkerToSupervisor(assignedWorkers,
+ getResAvailSupervisors(isolationSupervisors));
+ } else {
+ putAllWorkerToSupervisor(assignedWorkers,
+ getResAvailSupervisors(context.getCluster()));
+ }
+ this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
+ LOG.info("Assigned workers=" + assignedWorkers);
+ return assignedWorkers;
+ }
+
+ private void setAllWorkerMemAndCpu(Map conf,
+ List<ResourceWorkerSlot> assignedWorkers) {
+ long defaultSize = ConfigExtension.getMemSizePerWorker(conf);
+ int defaultCpu = ConfigExtension.getCpuSlotPerWorker(conf);
+ for (ResourceWorkerSlot worker : assignedWorkers) {
+ if (worker.getMemSize() <= 0)
+ worker.setMemSize(defaultSize);
+ if (worker.getCpu() <= 0)
+ worker.setCpu(defaultCpu);
+ }
+ }
+
+ private void putAllWorkerToSupervisor(
+ List<ResourceWorkerSlot> assignedWorkers,
+ List<SupervisorInfo> supervisors) {
+ for (ResourceWorkerSlot worker : assignedWorkers) {
+ if (worker.getHostname() != null) {
+ for (SupervisorInfo supervisor : supervisors) {
+ if (NetWorkUtils.equals(supervisor.getHostName(),
+ worker.getHostname())
+ && supervisor.getAvailableWorkerPorts().size() > 0) {
+ putWorkerToSupervisor(supervisor, worker);
+ break;
+ }
+ }
+ }
+ }
+ supervisors = getResAvailSupervisors(supervisors);
+ Collections.sort(supervisors, new Comparator<SupervisorInfo>() {
+
+ @Override
+ public int compare(SupervisorInfo o1, SupervisorInfo o2) {
+ // negated compare: supervisors with more available worker ports sort first
+ return -NumberUtils.compare(
+ o1.getAvailableWorkerPorts().size(), o2
+ .getAvailableWorkerPorts().size());
+ }
+
+ });
+ putWorkerToSupervisor(assignedWorkers, supervisors);
+ }
+
+ private void putWorkerToSupervisor(SupervisorInfo supervisor,
+ ResourceWorkerSlot worker) {
+ int port = worker.getPort();
+ if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
+ port = supervisor.getAvailableWorkerPorts().iterator().next();
+ }
+ worker.setPort(port);
+ supervisor.getAvailableWorkerPorts().remove(port);
+ worker.setNodeId(supervisor.getSupervisorId());
+ }
+
+ private void putWorkerToSupervisor(
+ List<ResourceWorkerSlot> assignedWorkers,
+ List<SupervisorInfo> supervisors) {
+ int allUsedPorts = 0;
+ for (SupervisorInfo supervisor : supervisors) {
+ int supervisorUsedPorts = supervisor.getWorkerPorts().size()
+ - supervisor.getAvailableWorkerPorts().size();
+ allUsedPorts = allUsedPorts + supervisorUsedPorts;
+ }
+ // theoretical per-supervisor port usage: integer average of (used + to-be-assigned) ports, plus one
+ int theoryAveragePorts = (allUsedPorts + assignedWorkers.size())
+ / supervisors.size() + 1;
+ // a supervisor already using theoryAveragePorts or more ports is moved
+ // from supervisors to overLoadSupervisors
+ List<SupervisorInfo> overLoadSupervisors = new ArrayList<SupervisorInfo>();
+ int key = 0;
+ Iterator<ResourceWorkerSlot> iterator = assignedWorkers.iterator();
+ while (iterator.hasNext()) {
+ if (supervisors.size() == 0)
+ break;
+ if (key >= supervisors.size())
+ key = 0;
+ SupervisorInfo supervisor = supervisors.get(key);
+ int supervisorUsedPorts = supervisor.getWorkerPorts().size()
+ - supervisor.getAvailableWorkerPorts().size();
+ if (supervisorUsedPorts < theoryAveragePorts) {
+ ResourceWorkerSlot worker = iterator.next();
+ if (worker.getNodeId() != null)
+ continue;
+ worker.setHostname(supervisor.getHostName());
+ worker.setNodeId(supervisor.getSupervisorId());
+ worker.setPort(supervisor.getAvailableWorkerPorts().iterator()
+ .next());
+ supervisor.getAvailableWorkerPorts().remove(worker.getPort());
+ if (supervisor.getAvailableWorkerPorts().size() == 0)
+ supervisors.remove(supervisor);
+ key++;
+ } else {
+ overLoadSupervisors.add(supervisor);
+ supervisors.remove(supervisor);
+ }
+ }
+ // the remaining assignedWorkers are dealt out to overLoadSupervisors in turn
+ Collections.sort(overLoadSupervisors, new Comparator<SupervisorInfo>() {
+
+ @Override
+ public int compare(SupervisorInfo o1, SupervisorInfo o2) {
+ // negated compare: supervisors with more available worker ports sort first
+ return -NumberUtils.compare(
+ o1.getAvailableWorkerPorts().size(), o2
+ .getAvailableWorkerPorts().size());
+ }
+
+ });
+ key = 0;
+ while (iterator.hasNext()) {
+ if (overLoadSupervisors.size() == 0)
+ break;
+ if (key >= overLoadSupervisors.size())
+ key = 0;
+ ResourceWorkerSlot worker = iterator.next();
+ if (worker.getNodeId() != null)
+ continue;
+ SupervisorInfo supervisor = overLoadSupervisors.get(key);
+ worker.setHostname(supervisor.getHostName());
+ worker.setNodeId(supervisor.getSupervisorId());
+ worker.setPort(supervisor.getAvailableWorkerPorts().iterator()
+ .next());
+ supervisor.getAvailableWorkerPorts().remove(worker.getPort());
+ if (supervisor.getAvailableWorkerPorts().size() == 0)
+ overLoadSupervisors.remove(supervisor);
+ key++;
+ }
+ }
+
+ private void getRightWorkers(DefaultTopologyAssignContext context,
+ Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers,
+ int workersNum, Collection<ResourceWorkerSlot> workers) {
+ Set<Integer> assigned = new HashSet<Integer>();
+ List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
+ if (workers == null)
+ return;
+ for (ResourceWorkerSlot worker : workers) {
+ boolean right = true;
+ Set<Integer> tasks = worker.getTasks();
+ if (tasks == null)
+ continue;
+ for (Integer task : tasks) {
+ if (!needAssign.contains(task) || assigned.contains(task)) {
+ right = false;
+ break;
+ }
+ }
+ if (right) {
+ assigned.addAll(tasks);
+ users.add(worker);
+ }
+ }
+ if (users.size() + assignedWorkers.size() > workersNum) {
+ LOG.warn(
+ "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}",
+ users, assignedWorkers, workersNum);
+ return;
+ }
+
+ assignedWorkers.addAll(users);
+ needAssign.removeAll(assigned);
+ }
+
+ private int getAvailableWorkersNum(DefaultTopologyAssignContext context) {
+ Map<String, SupervisorInfo> supervisors = context.getCluster();
+ List<SupervisorInfo> isolationSupervisors = this
+ .getIsolationSupervisors(context);
+ int slotNum = 0;
+
+ if (isolationSupervisors.size() != 0) {
+ for (SupervisorInfo superivsor : isolationSupervisors) {
+ slotNum = slotNum + superivsor.getAvailableWorkerPorts().size();
+ }
+ } else {
+ for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
+ slotNum = slotNum
+ + entry.getValue().getAvailableWorkerPorts().size();
+ }
+ }
+ return slotNum;
+ }
+
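+ /**
+ * @param context assignment context supplying the component tasks, the assign type and the task-to-component map
+ * @param workers user-defined worker assignments; may be null
+ * @return one ResourceWorkerSlot per assignment that ends up with at least one task; an empty list if workers is null
+ */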
+ private List<ResourceWorkerSlot> getUserDefineWorkers(
+ DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
+ List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
+ if (workers == null)
+ return ret;
+ Map<String, List<Integer>> componentToTask = (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context
+ .getComponentTasks()).clone();
+ if (context.getAssignType() != context.ASSIGN_TYPE_NEW) {
+ checkUserDefineWorkers(context, workers,
+ context.getTaskToComponent());
+ }
+ for (WorkerAssignment worker : workers) {
+ ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker,
+ componentToTask);
+ if (workerSlot.getTasks().size() != 0) {
+ ret.add(workerSlot);
+ }
+ }
+ return ret;
+ }
+
+ private void checkUserDefineWorkers(DefaultTopologyAssignContext context,
+ List<WorkerAssignment> workers, Map<Integer, String> taskToComponent) {
+ Set<ResourceWorkerSlot> unstoppedWorkers = context
+ .getUnstoppedWorkers();
+ List<WorkerAssignment> re = new ArrayList<WorkerAssignment>();
+ for (WorkerAssignment worker : workers) {
+ for (ResourceWorkerSlot unstopped : unstoppedWorkers) {
+ if (unstopped
+ .compareToUserDefineWorker(worker, taskToComponent))
+ re.add(worker);
+ }
+ }
+ workers.removeAll(re);
+
+ }
+
+ private List<SupervisorInfo> getResAvailSupervisors(
+ Map<String, SupervisorInfo> supervisors) {
+ List<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>();
+ for (Entry<String, SupervisorInfo> entry : supervisors.entrySet()) {
+ SupervisorInfo supervisor = entry.getValue();
+ if (supervisor.getAvailableWorkerPorts().size() > 0)
+ availableSupervisors.add(entry.getValue());
+ }
+ return availableSupervisors;
+ }
+
+ private List<SupervisorInfo> getResAvailSupervisors(
+ List<SupervisorInfo> supervisors) {
+ List<SupervisorInfo> availableSupervisors = new ArrayList<SupervisorInfo>();
+ for (SupervisorInfo supervisor : supervisors) {
+ if (supervisor.getAvailableWorkerPorts().size() > 0)
+ availableSupervisors.add(supervisor);
+ }
+ return availableSupervisors;
+ }
+
+ private List<SupervisorInfo> getIsolationSupervisors(
+ DefaultTopologyAssignContext context) {
+ List<String> isolationHosts = (List<String>) context.getStormConf()
+ .get(Config.ISOLATION_SCHEDULER_MACHINES);
+ LOG.info("Isolation machines: " + isolationHosts);
+ if (isolationHosts == null)
+ return new ArrayList<SupervisorInfo>();
+ List<SupervisorInfo> isolationSupervisors = new ArrayList<SupervisorInfo>();
+ for (Entry<String, SupervisorInfo> entry : context.getCluster()
+ .entrySet()) {
+ if (containTargetHost(isolationHosts, entry.getValue()
+ .getHostName())) {
+ isolationSupervisors.add(entry.getValue());
+ }
+ }
+ return isolationSupervisors;
+ }
+
+ private boolean containTargetHost(Collection<String> hosts, String target) {
+ for (String host : hosts) {
+ if (NetWorkUtils.equals(host, target) == true) {
+ return true;
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
index 6481c5e..b0fdc92 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/Task.java
@@ -17,31 +17,20 @@
*/
package com.alibaba.jstorm.task;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
import backtype.storm.messaging.IContext;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.spout.ISpout;
import backtype.storm.task.IBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import clojure.lang.Atom;
-
import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormZkClusterState;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
@@ -55,29 +44,27 @@ import com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SingleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SpoutExecutors;
import com.alibaba.jstorm.task.group.MkGrouper;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
-import com.alibaba.jstorm.task.heartbeat.TaskStats;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.NetWorkUtils;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
/**
* Task instance
*
* @author yannian/Longda
- *
*/
-public class Task {
-
+public class Task implements Runnable{
private final static Logger LOG = LoggerFactory.getLogger(Task.class);
private Map<Object, Object> stormConf;
private TopologyContext topologyContext;
private TopologyContext userContext;
- private String topologyid;
private IContext context;
private TaskTransfer taskTransfer;
@@ -86,8 +73,9 @@ public class Task {
private Map<Integer, DisruptorQueue> deserializeQueues;
private AsyncLoopDefaultKill workHalt;
- private Integer taskid;
- private String componentid;
+ private String topologyId;
+ private Integer taskId;
+ private String componentId;
private volatile TaskStatus taskStatus;
private Atom openOrPrepareWasCalled;
// running time counter
@@ -97,63 +85,46 @@ public class Task {
private Object taskObj;
private TaskBaseMetric taskStats;
private WorkerData workerData;
- private String componentType; // "spout" or "bolt"
private TaskSendTargets taskSendTargets;
+ private TaskReportErrorAndDie reportErrorDie;
private boolean isTaskBatchTuple;
+ private TaskShutdownDameon taskShutdownDameon;
@SuppressWarnings("rawtypes")
public Task(WorkerData workerData, int taskId) throws Exception {
openOrPrepareWasCalled = new Atom(Boolean.valueOf(false));
this.workerData = workerData;
- this.topologyContext =
- workerData.getContextMaker().makeTopologyContext(
- workerData.getSysTopology(), taskId,
- openOrPrepareWasCalled);
- this.userContext =
- workerData.getContextMaker().makeTopologyContext(
- workerData.getRawTopology(), taskId,
- openOrPrepareWasCalled);
- this.taskid = taskId;
- this.componentid = topologyContext.getThisComponentId();
- this.stormConf =
- Common.component_conf(workerData.getStormConf(),
- topologyContext, componentid);
+ this.topologyContext = workerData.getContextMaker().makeTopologyContext(workerData.getSysTopology(), taskId, openOrPrepareWasCalled);
+ this.userContext = workerData.getContextMaker().makeTopologyContext(workerData.getRawTopology(), taskId, openOrPrepareWasCalled);
+ this.taskId = taskId;
+ this.componentId = topologyContext.getThisComponentId();
+ this.stormConf = Common.component_conf(workerData.getStormConf(), topologyContext, componentId);
this.taskStatus = new TaskStatus();
- this.taskStats = new TaskBaseMetric(taskId);
this.innerTaskTransfer = workerData.getInnerTaskTransfer();
this.deserializeQueues = workerData.getDeserializeQueues();
- this.topologyid = workerData.getTopologyId();
+ this.topologyId = workerData.getTopologyId();
this.context = workerData.getContext();
this.workHalt = workerData.getWorkHalt();
- this.zkCluster =
- new StormZkClusterState(workerData.getZkClusterstate());
+ this.zkCluster =workerData.getZkCluster();
+ this.taskStats = new TaskBaseMetric(topologyId, componentId, taskId,
+ ConfigExtension.isEnableMetrics(workerData.getStormConf()));
- LOG.info("Begin to deserialize taskObj " + componentid + ":" + taskid);
+ LOG.info("Begin to deserialize taskObj " + componentId + ":" + this.taskId);
WorkerClassLoader.switchThreadContext();
// get real task object -- spout/bolt/spoutspec
- this.taskObj =
- Common.get_task_object(topologyContext.getRawTopology(),
- componentid, WorkerClassLoader.getInstance());
+ this.taskObj = Common.get_task_object(topologyContext.getRawTopology(), componentId, WorkerClassLoader.getInstance());
WorkerClassLoader.restoreThreadContext();
isTaskBatchTuple = ConfigExtension.isTaskBatchTuple(stormConf);
LOG.info("Transfer/receive in batch mode :" + isTaskBatchTuple);
- LOG.info("Loading task " + componentid + ":" + taskid);
- }
-
- private void setComponentType() {
- if (taskObj instanceof IBolt) {
- componentType = "bolt";
- } else if (taskObj instanceof ISpout) {
- componentType = "spout";
- }
+ LOG.info("Loading task " + componentId + ":" + this.taskId);
}
private TaskSendTargets makeSendTargets() {
@@ -161,44 +132,20 @@ public class Task {
// get current task's output
// <Stream_id,<component, Grouping>>
- Map<String, Map<String, MkGrouper>> streamComponentGrouper =
- Common.outbound_components(topologyContext, workerData);
+ Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(topologyContext, workerData);
- return new TaskSendTargets(stormConf, component,
- streamComponentGrouper, topologyContext, taskStats);
+ return new TaskSendTargets(stormConf, component, streamComponentGrouper, topologyContext, taskStats);
}
private void updateSendTargets() {
if (taskSendTargets != null) {
- Map<String, Map<String, MkGrouper>> streamComponentGrouper =
- Common.outbound_components(topologyContext, workerData);
+ Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(topologyContext, workerData);
taskSendTargets.updateStreamCompGrouper(streamComponentGrouper);
} else {
LOG.error("taskSendTargets is null when trying to update it.");
}
}
- private TaskTransfer mkTaskSending(WorkerData workerData) {
-
- // sending tuple's serializer
- KryoTupleSerializer serializer =
- new KryoTupleSerializer(workerData.getStormConf(),
- topologyContext);
-
- String taskName = JStormServerUtils.getName(componentid, taskid);
- // Task sending all tuples through this Object
- TaskTransfer taskTransfer;
- if (isTaskBatchTuple)
- taskTransfer =
- new TaskBatchTransfer(this, taskName, serializer,
- taskStatus, workerData);
- else
- taskTransfer =
- new TaskTransfer(this, taskName, serializer, taskStatus,
- workerData);
- return taskTransfer;
- }
-
public TaskSendTargets echoToSystemBolt() {
// send "startup" tuple to system bolt
List<Object> msg = new ArrayList<Object>();
@@ -206,9 +153,7 @@ public class Task {
// create task receive object
TaskSendTargets sendTargets = makeSendTargets();
- UnanchoredSend.send(topologyContext, sendTargets, taskTransfer,
- Common.SYSTEM_STREAM_ID, msg);
-
+ UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Common.SYSTEM_STREAM_ID, msg);
return sendTargets;
}
@@ -217,102 +162,90 @@ public class Task {
if (isOnePending == true) {
return true;
}
-
return ConfigExtension.isSpoutSingleThread(conf);
}
- public RunnableCallback mk_executors(TaskSendTargets sendTargets,
- ITaskReportErr report_error) {
+ public BaseExecutors mkExecutor() {
+ BaseExecutors baseExecutor = null;
if (taskObj instanceof IBolt) {
- return new BoltExecutors(this, (IBolt) taskObj, taskTransfer,
- innerTaskTransfer, stormConf, sendTargets, taskStatus,
- topologyContext, userContext, taskStats, report_error);
+ baseExecutor = new BoltExecutors(this);
} else if (taskObj instanceof ISpout) {
if (isSingleThread(stormConf) == true) {
- return new SingleThreadSpoutExecutors(this, (ISpout) taskObj,
- taskTransfer, innerTaskTransfer, stormConf,
- sendTargets, taskStatus, topologyContext, userContext,
- taskStats, report_error);
+ baseExecutor = new SingleThreadSpoutExecutors(this);
} else {
- return new MultipleThreadSpoutExecutors(this, (ISpout) taskObj,
- taskTransfer, innerTaskTransfer, stormConf,
- sendTargets, taskStatus, topologyContext, userContext,
- taskStats, report_error);
+ baseExecutor = new MultipleThreadSpoutExecutors(this);
}
}
-
- return null;
+
+ return baseExecutor;
}
/**
* create executor to receive tuples and run bolt/spout execute function
- *
- * @param puller
- * @param sendTargets
- * @return
*/
- private RunnableCallback mkExecutor(TaskSendTargets sendTargets) {
+ private RunnableCallback prepareExecutor() {
// create report error callback,
// in fact it is storm_cluster.report-task-error
- ITaskReportErr reportError =
- new TaskReportError(zkCluster, topologyid, taskid);
+ ITaskReportErr reportError = new TaskReportError(zkCluster, topologyId, taskId);
// report error and halt worker
- TaskReportErrorAndDie reportErrorDie =
- new TaskReportErrorAndDie(reportError, workHalt);
+ reportErrorDie = new TaskReportErrorAndDie(reportError, workHalt);
+
+ final BaseExecutors baseExecutor = mkExecutor();
- return mk_executors(sendTargets, reportErrorDie);
+ return baseExecutor;
}
public TaskReceiver mkTaskReceiver() {
- String taskName = JStormServerUtils.getName(componentid, taskid);
- TaskReceiver taskReceiver;
+ String taskName = JStormServerUtils.getName(componentId, taskId);
if (isTaskBatchTuple)
- taskReceiver =
- new TaskBatchReceiver(this, taskid, stormConf,
- topologyContext, innerTaskTransfer, taskStatus,
- taskName);
+ taskReceiver = new TaskBatchReceiver(this, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
else
- taskReceiver =
- new TaskReceiver(this, taskid, stormConf, topologyContext,
- innerTaskTransfer, taskStatus, taskName);
- deserializeQueues.put(taskid, taskReceiver.getDeserializeQueue());
+ taskReceiver = new TaskReceiver(this, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
+ deserializeQueues.put(taskId, taskReceiver.getDeserializeQueue());
return taskReceiver;
}
public TaskShutdownDameon execute() throws Exception {
- setComponentType();
taskSendTargets = echoToSystemBolt();
// create thread to get tuple from zeroMQ,
// and pass the tuple to bolt/spout
taskTransfer = mkTaskSending(workerData);
- RunnableCallback baseExecutor = mkExecutor(taskSendTargets);
- AsyncLoopThread executor_threads =
- new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY,
- true);
+ RunnableCallback baseExecutor = prepareExecutor();
+ AsyncLoopThread executor_threads = new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY, true);
taskReceiver = mkTaskReceiver();
List<AsyncLoopThread> allThreads = new ArrayList<AsyncLoopThread>();
allThreads.add(executor_threads);
- TaskHeartbeatRunable.registerTaskStats(taskid, new TaskStats(
- componentType, taskStats));
- LOG.info("Finished loading task " + componentid + ":" + taskid);
+ LOG.info("Finished loading task " + componentId + ":" + taskId);
- return getShutdown(allThreads, taskReceiver.getDeserializeQueue(),
+ taskShutdownDameon = getShutdown(allThreads, taskReceiver.getDeserializeQueue(),
baseExecutor);
+ return taskShutdownDameon;
}
- public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads,
- DisruptorQueue deserializeQueue, RunnableCallback baseExecutor) {
+ private TaskTransfer mkTaskSending(WorkerData workerData) {
+ // sending tuple's serializer
+ KryoTupleSerializer serializer = new KryoTupleSerializer(workerData.getStormConf(), topologyContext);
+ String taskName = JStormServerUtils.getName(componentId, taskId);
+ // Task sending all tuples through this Object
+ TaskTransfer taskTransfer;
+ if (isTaskBatchTuple)
+ taskTransfer = new TaskBatchTransfer(this, taskName, serializer, taskStatus, workerData);
+ else
+ taskTransfer = new TaskTransfer(this, taskName, serializer, taskStatus, workerData);
+ return taskTransfer;
+ }
+
+ public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads, DisruptorQueue deserializeQueue, RunnableCallback baseExecutor) {
AsyncLoopThread ackerThread = null;
if (baseExecutor instanceof SpoutExecutors) {
- ackerThread =
- ((SpoutExecutors) baseExecutor).getAckerRunnableThread();
+ ackerThread = ((SpoutExecutors) baseExecutor).getAckerRunnableThread();
if (ackerThread != null) {
allThreads.add(ackerThread);
@@ -324,24 +257,30 @@ public class Task {
AsyncLoopThread serializeThread = taskTransfer.getSerializeThread();
allThreads.add(serializeThread);
- TaskShutdownDameon shutdown =
- new TaskShutdownDameon(taskStatus, topologyid, taskid,
- allThreads, zkCluster, taskObj);
+ TaskShutdownDameon shutdown = new TaskShutdownDameon(taskStatus, topologyId, taskId, allThreads, zkCluster, taskObj, this);
return shutdown;
}
- public static TaskShutdownDameon mk_task(WorkerData workerData, int taskId)
- throws Exception {
+ public TaskShutdownDameon getTaskShutdownDameon(){
+ return taskShutdownDameon;
+ }
- Task t = new Task(workerData, taskId);
+ public void run(){
+ try {
+ taskShutdownDameon=this.execute();
+ }catch (Throwable e){
+ LOG.error("init task take error", e);
+ }
+ }
+ public static TaskShutdownDameon mk_task(WorkerData workerData, int taskId) throws Exception {
+ Task t = new Task(workerData, taskId);
return t.execute();
}
/**
- * Update the data which can be changed dynamically e.g. when scale-out of a
- * task parallelism
+ * Update the data which can be changed dynamically e.g. when scale-out of a task parallelism
*/
public void updateTaskData() {
// Only update the local task list of topologyContext here. Because
@@ -359,12 +298,94 @@ public class Task {
public long getWorkerAssignmentTs() {
return workerData.getAssignmentTs();
}
-
+
public AssignmentType getWorkerAssignmentType() {
return workerData.getAssignmentType();
}
public void unregisterDeserializeQueue() {
- deserializeQueues.remove(taskid);
+ deserializeQueues.remove(taskId);
+ }
+
+ public String getComponentId() {
+ return componentId;
+ }
+
+ public Integer getTaskId() {
+ return taskId;
+ }
+
+ public DisruptorQueue getExecuteQueue() {
+ return innerTaskTransfer.get(taskId);
+ }
+
+ public DisruptorQueue getDeserializeQueue() {
+ return deserializeQueues.get(taskId);
}
+
+ public Map<Object, Object> getStormConf() {
+ return stormConf;
+ }
+
+ public TopologyContext getTopologyContext() {
+ return topologyContext;
+ }
+
+ public TopologyContext getUserContext() {
+ return userContext;
+ }
+
+ public TaskTransfer getTaskTransfer() {
+ return taskTransfer;
+ }
+
+ public TaskReceiver getTaskReceiver() {
+ return taskReceiver;
+ }
+
+ public Map<Integer, DisruptorQueue> getInnerTaskTransfer() {
+ return innerTaskTransfer;
+ }
+
+ public Map<Integer, DisruptorQueue> getDeserializeQueues() {
+ return deserializeQueues;
+ }
+
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ public TaskStatus getTaskStatus() {
+ return taskStatus;
+ }
+
+ public StormClusterState getZkCluster() {
+ return zkCluster;
+ }
+
+ public Object getTaskObj() {
+ return taskObj;
+ }
+
+ public TaskBaseMetric getTaskStats() {
+ return taskStats;
+ }
+
+ public WorkerData getWorkerData() {
+ return workerData;
+ }
+
+ public TaskSendTargets getTaskSendTargets() {
+ return taskSendTargets;
+ }
+
+ public TaskReportErrorAndDie getReportErrorDie() {
+ return reportErrorDie;
+ }
+
+ public boolean isTaskBatchTuple() {
+ return isTaskBatchTuple;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java
index 4c9eb0b..84c6151 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/TaskBaseMetric.java
@@ -17,116 +17,110 @@
*/
package com.alibaba.jstorm.task;
-import java.io.Serializable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.common.metric.MetricRegistry;
-import com.alibaba.jstorm.common.metric.window.Metric;
+import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class TaskBaseMetric implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(TaskBaseMetric.class);
+ private static final Logger logger = LoggerFactory.getLogger(JStormMetrics.class);
+
private static final long serialVersionUID = -7157987126460293444L;
- protected MetricRegistry metrics;
+ private String topologyId;
+ private String componentId;
private int taskId;
+ private boolean enableMetrics;
- public TaskBaseMetric(int taskId) {
- metrics = JStormMetrics.registerTask(taskId);
+ /**
+ * local metric name cache to avoid frequent metric name concatenation streamId + name ==> full metric name
+ */
+ private static final ConcurrentMap<String, AsmMetric> metricCache = new ConcurrentHashMap<String, AsmMetric>();
+
+ public TaskBaseMetric(String topologyId, String componentId, int taskId, boolean enableMetrics) {
+ this.topologyId = topologyId;
+ this.componentId = componentId;
this.taskId = taskId;
+ this.enableMetrics = enableMetrics;
+ logger.info("init task base metric, tp id:{}, comp id:{}, task id:{}", topologyId, componentId, taskId);
}
- public void update(String name, Number value, int type) {
- Metric metric = metrics.getMetric(name);
- if (metric == null) {
- metric = JStormMetrics.Builder.mkInstance(type);
- try {
- /**
- * Here use one hack method to handle competition register metric
- * if duplicated metric, just skip it.
- *
- * this will improve performance
- */
- JStormMetrics.registerTaskMetric(metric, taskId, name);
- }catch(Exception e) {
- LOG.warn("Duplicated metrics of {}, taskId:{}", name, taskId);
- return ;
+ public void update(final String streamId, final String name, final Number value, final MetricType metricType,
+ boolean mergeTopology) {
+ String key = taskId + streamId + name;
+ AsmMetric existingMetric = metricCache.get(key);
+ if (existingMetric == null) {
+ String fullName = MetricUtils.streamMetricName(topologyId, componentId, taskId, streamId, name, metricType);
+ existingMetric = JStormMetrics.getStreamMetric(fullName);
+ if (existingMetric == null) {
+ existingMetric = AsmMetric.Builder.build(metricType);
+ JStormMetrics.registerStreamMetric(fullName, existingMetric, mergeTopology);
}
-
+ metricCache.putIfAbsent(key, existingMetric);
}
- metric.update(value);
+
+ existingMetric.update(value);
+ }
+
+ public void update(final String streamId, final String name, final Number value, final MetricType metricType) {
+ update(streamId, name, value, metricType, true);
}
public void send_tuple(String stream, int num_out_tasks) {
- if (num_out_tasks <= 0) {
- return;
+ if (enableMetrics && num_out_tasks > 0) {
+ update(stream, MetricDef.EMMITTED_NUM, num_out_tasks, MetricType.COUNTER);
+ update(stream, MetricDef.SEND_TPS, num_out_tasks, MetricType.METER);
}
-
- String emmitedName =
- MetricRegistry.name(MetricDef.EMMITTED_NUM, stream);
- update(emmitedName, Double.valueOf(num_out_tasks),
- JStormMetrics.Builder.COUNTER);
-
- String sendTpsName = MetricRegistry.name(MetricDef.SEND_TPS, stream);
- update(sendTpsName, Double.valueOf(num_out_tasks),
- JStormMetrics.Builder.METER);
}
public void recv_tuple(String component, String stream) {
-
- String name =
- MetricRegistry.name(MetricDef.RECV_TPS, component, stream);
- update(name, Double.valueOf(1), JStormMetrics.Builder.METER);
-
+ if (enableMetrics) {
+ update(stream, AsmMetric.mkName(component, MetricDef.RECV_TPS), 1, MetricType.METER);
+// update(stream, MetricDef.RECV_TPS, 1, MetricType.METER);
+ }
}
- public void bolt_acked_tuple(String component, String stream,
- Double latency_ms) {
+ public void bolt_acked_tuple(String component, String stream, Long latency, Long lifeCycle) {
+ if (enableMetrics) {
+// update(stream, AsmMetric.mkName(component, MetricDef.ACKED_NUM), 1, MetricType.COUNTER);
+// update(stream, AsmMetric.mkName(component, MetricDef.PROCESS_LATENCY), latency_ms, MetricType.HISTOGRAM);
+ update(stream, MetricDef.ACKED_NUM, 1, MetricType.COUNTER);
+ update(stream, MetricDef.PROCESS_LATENCY, latency, MetricType.HISTOGRAM, false);
- if (latency_ms == null) {
- return;
+ if (lifeCycle > 0) {
+ update(stream, AsmMetric.mkName(component, MetricDef.TUPLE_LIEF_CYCLE), lifeCycle, MetricType.HISTOGRAM, false);
+ }
}
-
- String ackNumName =
- MetricRegistry.name(MetricDef.ACKED_NUM, component, stream);
- update(ackNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER);
-
- String processName =
- MetricRegistry.name(MetricDef.PROCESS_LATENCY, component,
- stream);
- update(processName, latency_ms,
- JStormMetrics.Builder.HISTOGRAM);
}
public void bolt_failed_tuple(String component, String stream) {
-
- String failNumName =
- MetricRegistry.name(MetricDef.FAILED_NUM, component, stream);
- update(failNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER);
+ if (enableMetrics) {
+ //update(stream, AsmMetric.mkName(component, MetricDef.FAILED_NUM), 1, MetricType.COUNTER);
+ update(stream, MetricDef.FAILED_NUM, 1, MetricType.COUNTER);
+ }
}
- public void spout_acked_tuple(String stream, long st) {
-
- String ackNumName =
- MetricRegistry.name(MetricDef.ACKED_NUM,
- Common.ACKER_COMPONENT_ID, stream);
- update(ackNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER);
-
- String processName =
- MetricRegistry.name(MetricDef.PROCESS_LATENCY,
- Common.ACKER_COMPONENT_ID, stream);
- update(processName, Double.valueOf(st), JStormMetrics.Builder.HISTOGRAM);
+ public void spout_acked_tuple(String stream, long st, Long lifeCycle) {
+ if (enableMetrics) {
+ update(stream, MetricDef.ACKED_NUM, 1, MetricType.COUNTER);
+ update(stream, MetricDef.PROCESS_LATENCY, st, MetricType.HISTOGRAM, true);
+ if (lifeCycle > 0) {
+ update(stream, AsmMetric.mkName(Common.ACKER_COMPONENT_ID, MetricDef.TUPLE_LIEF_CYCLE), lifeCycle, MetricType.HISTOGRAM, false);
+ }
+ }
}
public void spout_failed_tuple(String stream) {
- String failNumName =
- MetricRegistry.name(MetricDef.FAILED_NUM,
- Common.ACKER_COMPONENT_ID, stream);
- update(failNumName, Double.valueOf(1), JStormMetrics.Builder.COUNTER);
-
+ if (enableMetrics) {
+ update(stream, MetricDef.FAILED_NUM, 1, MetricType.COUNTER);
+ }
}
}