You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/16 19:21:45 UTC

[05/10] storm git commit: fix nimbus test failure

fix nimbus test failure

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/defcb960
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/defcb960
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/defcb960

Branch: refs/heads/master
Commit: defcb9601d8f4d287fa4f1d6de7ee43d8183b137
Parents: 72d409c
Author: Xin Wang <be...@163.com>
Authored: Sun Feb 21 17:20:09 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Feb 21 17:20:09 2016 +0800

----------------------------------------------------------------------
 .../apache/storm/scheduler/EvenScheduler.java   | 29 +++++++++++------
 .../clj/org/apache/storm/scheduler_test.clj     | 34 +++++---------------
 2 files changed, 28 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/defcb960/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
index 2e8565b..d91e187 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
@@ -22,10 +22,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -38,16 +36,29 @@ import com.google.common.collect.Sets;
 public class EvenScheduler implements IScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(EvenScheduler.class);
 
-    public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots, Cluster cluster) {
+    public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots) {
+        //For example, given a three-node cluster (supervisor1, supervisor2, supervisor3):
+        //slots before sort:
+        //supervisor1:6700, supervisor1:6701,
+        //supervisor2:6700, supervisor2:6701, supervisor2:6702,
+        //supervisor3:6700, supervisor3:6703, supervisor3:6702, supervisor3:6701
+        //slots after sort (interleaved across nodes, node with the most slots first, ports ascending):
+        //supervisor3:6700, supervisor2:6700, supervisor1:6700,
+        //supervisor3:6701, supervisor2:6701, supervisor1:6701,
+        //supervisor3:6702, supervisor2:6702,
+        //supervisor3:6703
+
         if (availableSlots != null && availableSlots.size() > 0) {
             // group by node
             Map<String, List<WorkerSlot>> slotGroups = new TreeMap<String, List<WorkerSlot>>();
             for (WorkerSlot slot : availableSlots) {
-                String host = cluster.getHost(slot.getNodeId());
-                List<WorkerSlot> slots = slotGroups.get(host);
-                if (slots == null) {
-                    slots = new ArrayList<WorkerSlot>();
-                    slotGroups.put(host, slots);
+                String node = slot.getNodeId();
+                List<WorkerSlot> slots = null;
+                if(slotGroups.containsKey(node)){
+                   slots = slotGroups.get(node);
+                }else{
+                   slots = new ArrayList<WorkerSlot>();
+                   slotGroups.put(node, slots);
                 }
                 slots.add(slot);
             }
@@ -93,7 +104,7 @@ public class EvenScheduler implements IScheduler {
         Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
         int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());
 
-        List<WorkerSlot> sortedList = sortSlots(availableSlots, cluster);
+        List<WorkerSlot> sortedList = sortSlots(availableSlots);
         if (sortedList == null || sortedList.size() < (totalSlotsToUse - aliveAssigned.size())) {
             LOG.error("Available slots are not enough for topology: {}", topology.getName());
             return new HashMap<ExecutorDetails, WorkerSlot>();

http://git-wip-us.apache.org/repos/asf/storm/blob/defcb960/storm-core/test/clj/org/apache/storm/scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
index b14af71..0d74daf 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
@@ -261,34 +261,16 @@
     ))
 
 (deftest test-sort-slots
-  (let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 6700 6701)))
-        supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 6700 6701 6702)))
-        supervisor3 (SupervisorDetails. "supervisor3" "192.168.0.3" (list ) (map int (list 6700 6701 6702 6703)))
-        assignment1 (SchedulerAssignmentImpl. "topology1" nil)
-        assignment2 (SchedulerAssignmentImpl. "topology2" nil)
-        supervisor1-slot0 (WorkerSlot. "supervisor1" 6700)
-        supervisor1-slot1 (WorkerSlot. "supervisor1" 6701)
-        supervisor2-slot0 (WorkerSlot. "supervisor2" 6700)
-        supervisor2-slot1 (WorkerSlot. "supervisor2" 6701)
-        supervisor2-slot2 (WorkerSlot. "supervisor2" 6702)
-        supervisor3-slot0 (WorkerSlot. "supervisor3" 6700)
-        supervisor3-slot1 (WorkerSlot. "supervisor3" 6701)
-        supervisor3-slot2 (WorkerSlot. "supervisor3" 6702)
-        supervisor3-slot3 (WorkerSlot. "supervisor3" 6703)
-        cluster (Cluster. (nimbus/standalone-nimbus)
-                          {"supervisor1" supervisor1 "supervisor2" supervisor2 "supervisor3" supervisor3}
-                          {"topology1" assignment1 "topology2" assignment2}
-                  nil)]
   ;; test supervisor2 has more free slots
   (is (= "[supervisor2:6700, supervisor1:6700, supervisor2:6701, supervisor1:6701, supervisor2:6702]"
-         (.toString (EvenScheduler/sortSlots [supervisor1-slot0 supervisor1-slot1
-                      supervisor2-slot0 supervisor2-slot1 supervisor2-slot2
-                      ] cluster))))
+         (.toString (EvenScheduler/sortSlots [(WorkerSlot. "supervisor1" 6700) (WorkerSlot. "supervisor1" 6701)
+                      (WorkerSlot. "supervisor2" 6700) (WorkerSlot. "supervisor2" 6701) (WorkerSlot. "supervisor2" 6702)
+                      ]))))
   ;; test supervisor3 has more free slots
   (is (= "[supervisor3:6700, supervisor2:6700, supervisor1:6700, supervisor3:6701, supervisor2:6701, supervisor1:6701, supervisor3:6702, supervisor2:6702, supervisor3:6703]"
-         (.toString (EvenScheduler/sortSlots [supervisor1-slot0 supervisor1-slot1
-                      supervisor2-slot0 supervisor2-slot1 supervisor2-slot2
-                      supervisor3-slot0 supervisor3-slot3 supervisor3-slot2 supervisor3-slot1
-                      ] cluster))))
-    ))
+         (.toString (EvenScheduler/sortSlots [(WorkerSlot. "supervisor1" 6700) (WorkerSlot. "supervisor1" 6701)
+                      (WorkerSlot. "supervisor2" 6700) (WorkerSlot. "supervisor2" 6701) (WorkerSlot. "supervisor2" 6702)
+                      (WorkerSlot. "supervisor3" 6700) (WorkerSlot. "supervisor3" 6703) (WorkerSlot. "supervisor3" 6702) (WorkerSlot. "supervisor3" 6701)
+                      ]))))
+    )