You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/09 01:43:37 UTC

[3/5] storm git commit: STORM-2503: Have a consistent ordering of executors

STORM-2503: Have a consistent ordering of executors


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0a39ad8a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0a39ad8a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0a39ad8a

Branch: refs/heads/1.1.x-branch
Commit: 0a39ad8a5d49fc085ec7f95e49ae218fc8ec5059
Parents: 0943bb6
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jun 23 10:34:21 2017 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Jul 9 10:31:29 2017 +0900

----------------------------------------------------------------------
 .../scheduling/DefaultResourceAwareStrategy.java | 19 ++++++++++++++-----
 .../TestDefaultResourceAwareStrategy.java        |  4 ++--
 2 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0a39ad8a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 5f81a72..50f0830 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -71,7 +71,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             LOG.warn("No available nodes to schedule tasks on!");
             return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
         }
-        Collection<ExecutorDetails> unassignedExecutors = new HashSet<ExecutorDetails>(_cluster.getUnassignedExecutors(td));
+        Collection<ExecutorDetails> unassignedExecutors = new HashSet<>(_cluster.getUnassignedExecutors(td));
         Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
         LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
         Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
@@ -527,7 +527,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
      * @return a sorted set of components
      */
     private Set<Component> sortComponents(final Map<String, Component> componentMap) {
-        Set<Component> sortedComponents = new TreeSet<Component>(new Comparator<Component>() {
+        Set<Component> sortedComponents = new TreeSet<>(new Comparator<Component>() {
             @Override
             public int compare(Component o1, Component o2) {
                 int connections1 = 0;
@@ -580,6 +580,13 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         return sortedComponents;
     }
 
+    private static Comparator<ExecutorDetails> ORDER_EXEC_BY_IDS = new Comparator<ExecutorDetails>() {
+        @Override
+        public int compare(ExecutorDetails a, ExecutorDetails b) {
+            return Integer.compare(a.getStartTask(), b.getStartTask());
+        }
+    };
+
     /**
      * Order executors based on how many in and out connections it will potentially need to make, in descending order.
      * First order components by the number of in and out connections it will have.  Then iterate through the sorted list of components.
@@ -597,7 +604,9 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
         for (Component component : componentMap.values()) {
             compToExecsToSchedule.put(component.id, new LinkedList<ExecutorDetails>());
-            for (ExecutorDetails exec : component.execs) {
+            List<ExecutorDetails> sortedExecs = new ArrayList<>(component.execs);
+            Collections.sort(sortedExecs, ORDER_EXEC_BY_IDS);
+            for (ExecutorDetails exec : sortedExecs) {
                 if (unassignedExecutors.contains(exec)) {
                     compToExecsToSchedule.get(component.id).add(exec);
                 }
@@ -608,14 +617,14 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         sortedComponents.addAll(componentMap.values());
 
         for (Component currComp : sortedComponents) {
-            Map<String, Component> neighbors = new HashMap<String, Component>();
+            Map<String, Component> neighbors = new HashMap<>();
             for (String compId : (List<String>) ListUtils.union(currComp.children, currComp.parents)) {
                 neighbors.put(compId, componentMap.get(compId));
             }
             Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
             Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.id);
 
-            boolean flag = false;
+            boolean flag;
             do {
                 flag = false;
                 if (!currCompExesToSched.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/0a39ad8a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index c17932e..537a89e 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -116,11 +116,11 @@ public class TestDefaultResourceAwareStrategy {
         expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(0, 0)))); //Spout
         expectedScheduling.add(new HashSet<>(Arrays.asList(
             new ExecutorDetails(2, 2), //bolt-1
-            new ExecutorDetails(3, 3), //bolt-2
+            new ExecutorDetails(4, 4), //bolt-2
             new ExecutorDetails(6, 6)))); //bolt-3
         expectedScheduling.add(new HashSet<>(Arrays.asList(
             new ExecutorDetails(1, 1), //bolt-1
-            new ExecutorDetails(4, 4), //bolt-2
+            new ExecutorDetails(3, 3), //bolt-2
             new ExecutorDetails(5, 5)))); //bolt-3
         HashSet<HashSet<ExecutorDetails>> foundScheduling = new HashSet<>();
         SchedulerAssignment assignment = cluster.getAssignmentById("testTopology-id");