You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/16 19:21:45 UTC
[05/10] storm git commit: fix nimbus test failure
fix nimbus test failure
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/defcb960
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/defcb960
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/defcb960
Branch: refs/heads/master
Commit: defcb9601d8f4d287fa4f1d6de7ee43d8183b137
Parents: 72d409c
Author: Xin Wang <be...@163.com>
Authored: Sun Feb 21 17:20:09 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Feb 21 17:20:09 2016 +0800
----------------------------------------------------------------------
.../apache/storm/scheduler/EvenScheduler.java | 29 +++++++++++------
.../clj/org/apache/storm/scheduler_test.clj | 34 +++++---------------
2 files changed, 28 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/defcb960/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
index 2e8565b..d91e187 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
@@ -22,10 +22,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
@@ -38,16 +36,29 @@ import com.google.common.collect.Sets;
public class EvenScheduler implements IScheduler {
private static final Logger LOG = LoggerFactory.getLogger(EvenScheduler.class);
- public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots, Cluster cluster) {
+ public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots) {
+ //For example, given a three-node cluster (supervisor1, supervisor2, supervisor3):
+ //slots before sort:
+ //supervisor1:6700, supervisor1:6701,
+ //supervisor2:6700, supervisor2:6701, supervisor2:6702,
+ //supervisor3:6700, supervisor3:6703, supervisor3:6702, supervisor3:6701
+ //slots after sort (round-robin across nodes, nodes with more free slots first, ports ascending within a node):
+ //supervisor3:6700, supervisor2:6700, supervisor1:6700,
+ //supervisor3:6701, supervisor2:6701, supervisor1:6701,
+ //supervisor3:6702, supervisor2:6702,
+ //supervisor3:6703
+
if (availableSlots != null && availableSlots.size() > 0) {
// group by node
Map<String, List<WorkerSlot>> slotGroups = new TreeMap<String, List<WorkerSlot>>();
for (WorkerSlot slot : availableSlots) {
- String host = cluster.getHost(slot.getNodeId());
- List<WorkerSlot> slots = slotGroups.get(host);
- if (slots == null) {
- slots = new ArrayList<WorkerSlot>();
- slotGroups.put(host, slots);
+ String node = slot.getNodeId();
+ List<WorkerSlot> slots = null;
+ if(slotGroups.containsKey(node)){
+ slots = slotGroups.get(node);
+ }else{
+ slots = new ArrayList<WorkerSlot>();
+ slotGroups.put(node, slots);
}
slots.add(slot);
}
@@ -93,7 +104,7 @@ public class EvenScheduler implements IScheduler {
Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());
- List<WorkerSlot> sortedList = sortSlots(availableSlots, cluster);
+ List<WorkerSlot> sortedList = sortSlots(availableSlots);
if (sortedList == null || sortedList.size() < (totalSlotsToUse - aliveAssigned.size())) {
LOG.error("Available slots are not enough for topology: {}", topology.getName());
return new HashMap<ExecutorDetails, WorkerSlot>();
http://git-wip-us.apache.org/repos/asf/storm/blob/defcb960/storm-core/test/clj/org/apache/storm/scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
index b14af71..0d74daf 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
@@ -261,34 +261,16 @@
))
(deftest test-sort-slots
- (let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 6700 6701)))
- supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 6700 6701 6702)))
- supervisor3 (SupervisorDetails. "supervisor3" "192.168.0.3" (list ) (map int (list 6700 6701 6702 6703)))
- assignment1 (SchedulerAssignmentImpl. "topology1" nil)
- assignment2 (SchedulerAssignmentImpl. "topology2" nil)
- supervisor1-slot0 (WorkerSlot. "supervisor1" 6700)
- supervisor1-slot1 (WorkerSlot. "supervisor1" 6701)
- supervisor2-slot0 (WorkerSlot. "supervisor2" 6700)
- supervisor2-slot1 (WorkerSlot. "supervisor2" 6701)
- supervisor2-slot2 (WorkerSlot. "supervisor2" 6702)
- supervisor3-slot0 (WorkerSlot. "supervisor3" 6700)
- supervisor3-slot1 (WorkerSlot. "supervisor3" 6701)
- supervisor3-slot2 (WorkerSlot. "supervisor3" 6702)
- supervisor3-slot3 (WorkerSlot. "supervisor3" 6703)
- cluster (Cluster. (nimbus/standalone-nimbus)
- {"supervisor1" supervisor1 "supervisor2" supervisor2 "supervisor3" supervisor3}
- {"topology1" assignment1 "topology2" assignment2}
- nil)]
;; test supervisor2 has more free slots
(is (= "[supervisor2:6700, supervisor1:6700, supervisor2:6701, supervisor1:6701, supervisor2:6702]"
- (.toString (EvenScheduler/sortSlots [supervisor1-slot0 supervisor1-slot1
- supervisor2-slot0 supervisor2-slot1 supervisor2-slot2
- ] cluster))))
+ (.toString (EvenScheduler/sortSlots [(WorkerSlot. "supervisor1" 6700) (WorkerSlot. "supervisor1" 6701)
+ (WorkerSlot. "supervisor2" 6700) (WorkerSlot. "supervisor2" 6701) (WorkerSlot. "supervisor2" 6702)
+ ]))))
;; test supervisor3 has more free slots
(is (= "[supervisor3:6700, supervisor2:6700, supervisor1:6700, supervisor3:6701, supervisor2:6701, supervisor1:6701, supervisor3:6702, supervisor2:6702, supervisor3:6703]"
- (.toString (EvenScheduler/sortSlots [supervisor1-slot0 supervisor1-slot1
- supervisor2-slot0 supervisor2-slot1 supervisor2-slot2
- supervisor3-slot0 supervisor3-slot3 supervisor3-slot2 supervisor3-slot1
- ] cluster))))
- ))
+ (.toString (EvenScheduler/sortSlots [(WorkerSlot. "supervisor1" 6700) (WorkerSlot. "supervisor1" 6701)
+ (WorkerSlot. "supervisor2" 6700) (WorkerSlot. "supervisor2" 6701) (WorkerSlot. "supervisor2" 6702)
+ (WorkerSlot. "supervisor3" 6700) (WorkerSlot. "supervisor3" 6703) (WorkerSlot. "supervisor3" 6702) (WorkerSlot. "supervisor3" 6701)
+ ]))))
+ )