You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/17 19:58:05 UTC
git commit: Force free a slot in bad-state
Repository: incubator-storm
Updated Branches:
refs/heads/security 92e3a5742 -> ab7784e49
Force free a slot in bad-state
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ab7784e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ab7784e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ab7784e4
Branch: refs/heads/security
Commit: ab7784e49d251ca4da967c6ec6bc340cc7f940aa
Parents: 92e3a57
Author: Kishor Patil <pa...@yahoo.com>
Authored: Tue Jun 17 15:19:00 2014 +0000
Committer: Kishor Patil <pa...@yahoo.com>
Committed: Tue Jun 17 15:19:00 2014 +0000
----------------------------------------------------------------------
.../storm/scheduler/multitenant/Node.java | 21 +++++++++---
.../scheduler/multitenant_scheduler_test.clj | 35 ++++++++++++++++++++
2 files changed, 51 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ab7784e4/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
index 2bc2cee..1c601ca 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
@@ -163,8 +163,9 @@ public class Node {
* @param ws the slot to free
* @param cluster the cluster to update
*/
- public void free(WorkerSlot ws, Cluster cluster) {
+ public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
if (_freeSlots.contains(ws)) return;
+ boolean wasFound = false;
for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
Set<WorkerSlot> slots = entry.getValue();
if (slots.remove(ws)) {
@@ -172,11 +173,21 @@ public class Node {
if (_isAlive) {
_freeSlots.add(ws);
}
- return;
+ wasFound = true;
+ }
+ }
+ if(!wasFound)
+ {
+ if(forceFree)
+ {
+ LOG.info("Forcefully freeing the " + ws);
+ cluster.freeSlot(ws);
+ _freeSlots.add(ws);
+ } else {
+ throw new IllegalArgumentException("Tried to free a slot that was not" +
+ " part of this node " + _nodeId);
}
}
- throw new IllegalArgumentException("Tried to free a slot that was not" +
- " part of this node " + _nodeId);
}
/**
@@ -301,7 +312,7 @@ public class Node {
}
if (node.assignInternal(ws, topId, true)) {
LOG.warn("Bad scheduling state, "+ws+" assigned multiple workers, unassigning everything...");
- node.free(ws, cluster);
+ node.free(ws, cluster, true);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ab7784e4/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
index 4e79240..b3cdb13 100644
--- a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
+++ b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
@@ -679,6 +679,41 @@
(is (= "Scheduled Isolated on 5 Nodes" (.get (.getStatusMap cluster) "topology3")))
))
+(deftest test-force-free-slot-in-bad-state
+ (let [supers (gen-supervisors 1)
+ topology1 (TopologyDetails. "topology1"
+ {TOPOLOGY-NAME "topology-name-1"
+ TOPOLOGY-SUBMITTER-USER "userC"}
+ (StormTopology.)
+ 4
+ (mk-ed-map [["spout1" 0 5]
+ ["bolt1" 5 10]
+ ["bolt2" 10 15]
+ ["bolt3" 15 20]]))
+ existing-assignments {
+ "topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 5) (WorkerSlot. "super0" 1)
+ (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20)
+ (ExecutorDetails. 10 15) (WorkerSlot. "super0" 1)
+ (ExecutorDetails. 15 20) (WorkerSlot. "super0" 1)})
+ }
+ cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments)
+ node-map (Node/getAllNodesFrom cluster)
+ topologies (Topologies. (to-top-map [topology1]))
+ conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
+ scheduler (MultitenantScheduler.)]
+ (.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
+ (.prepare scheduler conf)
+ (.schedule scheduler topologies cluster)
+ (let [assignment (.getAssignmentById cluster "topology1")
+ assigned-slots (.getSlots assignment)
+ executors (.getExecutors assignment)]
+ (log-message "Executors are:" executors)
+ ;; 4 slots on 1 machine, all executors assigned
+ (is (= 4 (.size assigned-slots)))
+ (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+ )
+ (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+ ))
(deftest test-multitenant-scheduler-bad-starting-state
(let [supers (gen-supervisors 10)