You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/09 01:43:37 UTC
[3/5] storm git commit: STORM-2503: Have a consistent ordering of
executors
STORM-2503: Have a consistent ordering of executors
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0a39ad8a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0a39ad8a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0a39ad8a
Branch: refs/heads/1.1.x-branch
Commit: 0a39ad8a5d49fc085ec7f95e49ae218fc8ec5059
Parents: 0943bb6
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jun 23 10:34:21 2017 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Jul 9 10:31:29 2017 +0900
----------------------------------------------------------------------
.../scheduling/DefaultResourceAwareStrategy.java | 19 ++++++++++++++-----
.../TestDefaultResourceAwareStrategy.java | 4 ++--
2 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0a39ad8a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 5f81a72..50f0830 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -71,7 +71,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
LOG.warn("No available nodes to schedule tasks on!");
return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
}
- Collection<ExecutorDetails> unassignedExecutors = new HashSet<ExecutorDetails>(_cluster.getUnassignedExecutors(td));
+ Collection<ExecutorDetails> unassignedExecutors = new HashSet<>(_cluster.getUnassignedExecutors(td));
Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
@@ -527,7 +527,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
* @return a sorted set of components
*/
private Set<Component> sortComponents(final Map<String, Component> componentMap) {
- Set<Component> sortedComponents = new TreeSet<Component>(new Comparator<Component>() {
+ Set<Component> sortedComponents = new TreeSet<>(new Comparator<Component>() {
@Override
public int compare(Component o1, Component o2) {
int connections1 = 0;
@@ -580,6 +580,13 @@ public class DefaultResourceAwareStrategy implements IStrategy {
return sortedComponents;
}
+ private static Comparator<ExecutorDetails> ORDER_EXEC_BY_IDS = new Comparator<ExecutorDetails>() {
+ @Override
+ public int compare(ExecutorDetails a, ExecutorDetails b) {
+ return Integer.compare(a.getStartTask(), b.getStartTask());
+ }
+ };
+
/**
* Order executors based on how many in and out connections it will potentially need to make, in descending order.
* First order components by the number of in and out connections it will have. Then iterate through the sorted list of components.
@@ -597,7 +604,9 @@ public class DefaultResourceAwareStrategy implements IStrategy {
Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
for (Component component : componentMap.values()) {
compToExecsToSchedule.put(component.id, new LinkedList<ExecutorDetails>());
- for (ExecutorDetails exec : component.execs) {
+ List<ExecutorDetails> sortedExecs = new ArrayList<>(component.execs);
+ Collections.sort(sortedExecs, ORDER_EXEC_BY_IDS);
+ for (ExecutorDetails exec : sortedExecs) {
if (unassignedExecutors.contains(exec)) {
compToExecsToSchedule.get(component.id).add(exec);
}
@@ -608,14 +617,14 @@ public class DefaultResourceAwareStrategy implements IStrategy {
sortedComponents.addAll(componentMap.values());
for (Component currComp : sortedComponents) {
- Map<String, Component> neighbors = new HashMap<String, Component>();
+ Map<String, Component> neighbors = new HashMap<>();
for (String compId : (List<String>) ListUtils.union(currComp.children, currComp.parents)) {
neighbors.put(compId, componentMap.get(compId));
}
Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.id);
- boolean flag = false;
+ boolean flag;
do {
flag = false;
if (!currCompExesToSched.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/0a39ad8a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index c17932e..537a89e 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -116,11 +116,11 @@ public class TestDefaultResourceAwareStrategy {
expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(0, 0)))); //Spout
expectedScheduling.add(new HashSet<>(Arrays.asList(
new ExecutorDetails(2, 2), //bolt-1
- new ExecutorDetails(3, 3), //bolt-2
+ new ExecutorDetails(4, 4), //bolt-2
new ExecutorDetails(6, 6)))); //bolt-3
expectedScheduling.add(new HashSet<>(Arrays.asList(
new ExecutorDetails(1, 1), //bolt-1
- new ExecutorDetails(4, 4), //bolt-2
+ new ExecutorDetails(3, 3), //bolt-2
new ExecutorDetails(5, 5)))); //bolt-3
HashSet<HashSet<ExecutorDetails>> foundScheduling = new HashSet<>();
SchedulerAssignment assignment = cluster.getAssignmentById("testTopology-id");