Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:12 UTC

[34/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java
index dc7eded..3a50a3f 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java
@@ -40,307 +40,297 @@ import backtype.storm.scheduler.WorkerSlot;
  * A pool of machines that can be used to run isolated topologies
  */
 public class IsolatedPool extends NodePool {
-  private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class);
-  private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>();
-  private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
-  private HashSet<String> _isolated = new HashSet<String>();
-  private int _maxNodes;
-  private int _usedNodes;
+    private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class);
+    private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>();
+    private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
+    private HashSet<String> _isolated = new HashSet<String>();
+    private int _maxNodes;
+    private int _usedNodes;
 
-  public IsolatedPool(int maxNodes) {
-    _maxNodes = maxNodes;
-    _usedNodes = 0;
-  }
-
-  @Override
-  public void addTopology(TopologyDetails td) {
-    String topId = td.getId();
-    LOG.debug("Adding in Topology {}", topId);
-    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
-    Set<Node> assignedNodes = new HashSet<Node>();
-    if (assignment != null) {
-      for (WorkerSlot ws: assignment.getSlots()) {
-        Node n = _nodeIdToNode.get(ws.getNodeId());
-        assignedNodes.add(n);
-      }
-    }
-    _usedNodes += assignedNodes.size();
-    _topologyIdToNodes.put(topId, assignedNodes);
-    _tds.put(topId, td);
-    if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
-      _isolated.add(topId);
+    public IsolatedPool(int maxNodes) {
+        _maxNodes = maxNodes;
+        _usedNodes = 0;
     }
-  }
 
-  @Override
-  public boolean canAdd(TopologyDetails td) {
-    //Only add topologies that are not sharing nodes with other topologies
-    String topId = td.getId();
-    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
-    if (assignment != null) {
-      for (WorkerSlot ws: assignment.getSlots()) {
-        Node n = _nodeIdToNode.get(ws.getNodeId());
-        if (n.getRunningTopologies().size() > 1) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-  
-  @Override
-  public void scheduleAsNeeded(NodePool ... lesserPools) {
-    for (String topId : _topologyIdToNodes.keySet()) {
-      TopologyDetails td = _tds.get(topId);
-      if (_cluster.needsScheduling(td)) {
-        LOG.debug("Scheduling topology {}",topId);
-        Set<Node> allNodes = _topologyIdToNodes.get(topId);
-        Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
-        int slotsToUse = 0;
-        if (nodesRequested == null) {
-          slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
-        } else {
-          slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools, 
-              nodesRequested.intValue());
+    @Override
+    public void addTopology(TopologyDetails td) {
+        String topId = td.getId();
+        LOG.debug("Adding in Topology {}", topId);
+        SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
+        Set<Node> assignedNodes = new HashSet<Node>();
+        if (assignment != null) {
+            for (WorkerSlot ws : assignment.getSlots()) {
+                Node n = _nodeIdToNode.get(ws.getNodeId());
+                assignedNodes.add(n);
+            }
         }
-        //No slots to schedule for some reason, so skip it.
-        if (slotsToUse <= 0) {
-          continue;
+        _usedNodes += assignedNodes.size();
+        _topologyIdToNodes.put(topId, assignedNodes);
+        _tds.put(topId, td);
+        if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
+            _isolated.add(topId);
         }
-        
-        RoundRobinSlotScheduler slotSched = 
-          new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
-        
-        LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes);
-        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+    }
 
-        LOG.debug("Nodes sorted by free space {}", sortedNodes);
-        while (true) {
-          Node n = sortedNodes.remove();
-          if (!slotSched.assignSlotTo(n)) {
-            break;
-          }
-          int freeSlots = n.totalSlotsFree();
-          for (int i = 0; i < sortedNodes.size(); i++) {
-            if (freeSlots >= sortedNodes.get(i).totalSlotsFree()) {
-              sortedNodes.add(i, n);
-              n = null;
-              break;
+    @Override
+    public boolean canAdd(TopologyDetails td) {
+        // Only add topologies that are not sharing nodes with other topologies
+        String topId = td.getId();
+        SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
+        if (assignment != null) {
+            for (WorkerSlot ws : assignment.getSlots()) {
+                Node n = _nodeIdToNode.get(ws.getNodeId());
+                if (n.getRunningTopologies().size() > 1) {
+                    return false;
+                }
             }
-          }
-          if (n != null) {
-            sortedNodes.add(n);
-          }
         }
-      }
-      Set<Node> found = _topologyIdToNodes.get(topId);
-      int nc = found == null ? 0 : found.size();
-      _cluster.setStatus(topId,"Scheduled Isolated on "+nc+" Nodes");
+        return true;
     }
-  }
-  
-  /**
-   * Get the nodes needed to schedule an isolated topology.
-   * @param td the topology to be scheduled
-   * @param allNodes the nodes already scheduled for this topology.
-   * This will be updated to include new nodes if needed. 
-   * @param lesserPools node pools we can steal nodes from
-   * @return the number of additional slots that should be used for scheduling.
-   */
-  private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes,
-      NodePool[] lesserPools, int nodesRequested) {
-    String topId = td.getId();
-    LOG.debug("Topology {} is isolated", topId);
-    int nodesFromUsAvailable = nodesAvailable();
-    int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools);
 
-    int nodesUsed = _topologyIdToNodes.get(topId).size();
-    int nodesNeeded = nodesRequested - nodesUsed;
-    LOG.debug("Nodes... requested {} used {} available from us {} " +
-        "avail from other {} needed {}", new Object[] {nodesRequested, 
-        nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable,
-        nodesNeeded});
-    if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
-      _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. "
-        + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) 
-        + " more nodes needed to run topology.");
-      return 0;
-    }
+    @Override
+    public void scheduleAsNeeded(NodePool... lesserPools) {
+        for (String topId : _topologyIdToNodes.keySet()) {
+            TopologyDetails td = _tds.get(topId);
+            if (_cluster.needsScheduling(td)) {
+                LOG.debug("Scheduling topology {}", topId);
+                Set<Node> allNodes = _topologyIdToNodes.get(topId);
+                Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
+                int slotsToUse = 0;
+                if (nodesRequested == null) {
+                    slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
+                } else {
+                    slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools, nodesRequested.intValue());
+                }
+                // No slots to schedule for some reason, so skip it.
+                if (slotsToUse <= 0) {
+                    continue;
+                }
 
-    //In order to avoid going over _maxNodes I may need to steal from
-    // myself even though other pools have free nodes. so figure out how
-    // much each group should provide
-    int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, 
-        nodesFromOthersAvailable), nodesNeeded);
-    int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers; 
-    LOG.debug("Nodes... needed from us {} needed from others {}", 
-        nodesNeededFromUs, nodesNeededFromOthers);
+                RoundRobinSlotScheduler slotSched = new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
 
-    if (nodesNeededFromUs > nodesFromUsAvailable) {
-      _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology");
-      return 0;
+                LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes);
+                Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+
+                LOG.debug("Nodes sorted by free space {}", sortedNodes);
+                while (true) {
+                    Node n = sortedNodes.remove();
+                    if (!slotSched.assignSlotTo(n)) {
+                        break;
+                    }
+                    int freeSlots = n.totalSlotsFree();
+                    for (int i = 0; i < sortedNodes.size(); i++) {
+                        if (freeSlots >= sortedNodes.get(i).totalSlotsFree()) {
+                            sortedNodes.add(i, n);
+                            n = null;
+                            break;
+                        }
+                    }
+                    if (n != null) {
+                        sortedNodes.add(n);
+                    }
+                }
+            }
+            Set<Node> found = _topologyIdToNodes.get(topId);
+            int nc = found == null ? 0 : found.size();
+            _cluster.setStatus(topId, "Scheduled Isolated on " + nc + " Nodes");
+        }
     }
 
-    //Get the nodes
-    Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools);
-    _usedNodes += found.size();
-    allNodes.addAll(found);
-    Collection<Node> foundMore = takeNodes(nodesNeededFromUs);
-    _usedNodes += foundMore.size();
-    allNodes.addAll(foundMore);
+    /**
+     * Get the nodes needed to schedule an isolated topology.
+     * 
+     * @param td the topology to be scheduled
+     * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed.
+     * @param lesserPools node pools we can steal nodes from
+     * @return the number of additional slots that should be used for scheduling.
+     */
+    private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools, int nodesRequested) {
+        String topId = td.getId();
+        LOG.debug("Topology {} is isolated", topId);
+        int nodesFromUsAvailable = nodesAvailable();
+        int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools);
 
-    int totalTasks = td.getExecutors().size();
-    int origRequest = td.getNumWorkers();
-    int slotsRequested = Math.min(totalTasks, origRequest);
-    int slotsUsed = Node.countSlotsUsed(allNodes);
-    int slotsFree = Node.countFreeSlotsAlive(allNodes);
-    int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree);
-    if (slotsToUse <= 0) {
-      _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology.");
-    }
-    return slotsToUse;
-  }
-  
-  /**
-   * Get the nodes needed to schedule a non-isolated topology.
-   * @param td the topology to be scheduled
-   * @param allNodes the nodes already scheduled for this topology.
-   * This will be updated to include new nodes if needed. 
-   * @param lesserPools node pools we can steal nodes from
-   * @return the number of additional slots that should be used for scheduling.
-   */
-  private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes,
-      NodePool[] lesserPools) {
-    String topId = td.getId();
-    LOG.debug("Topology {} is not isolated",topId);
-    int totalTasks = td.getExecutors().size();
-    int origRequest = td.getNumWorkers();
-    int slotsRequested = Math.min(totalTasks, origRequest);
-    int slotsUsed = Node.countSlotsUsed(topId, allNodes);
-    int slotsFree = Node.countFreeSlotsAlive(allNodes);
-    //Check to see if we have enough slots before trying to get them
-    int slotsAvailable = 0;
-    if (slotsRequested > slotsFree) {
-      slotsAvailable = NodePool.slotsAvailable(lesserPools);
-    }
-    int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
-    LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", 
-        new Object[] {slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse});
-    if (slotsToUse <= 0) {
-      _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
-      return 0;
+        int nodesUsed = _topologyIdToNodes.get(topId).size();
+        int nodesNeeded = nodesRequested - nodesUsed;
+        LOG.debug("Nodes... requested {} used {} available from us {} " + "avail from other {} needed {}", new Object[] { nodesRequested, nodesUsed,
+                nodesFromUsAvailable, nodesFromOthersAvailable, nodesNeeded });
+        if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
+            _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. "
+                    + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
+            return 0;
+        }
+
+        // In order to avoid going over _maxNodes I may need to steal from
+        // myself even though other pools have free nodes. so figure out how
+        // much each group should provide
+        int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, nodesFromOthersAvailable), nodesNeeded);
+        int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers;
+        LOG.debug("Nodes... needed from us {} needed from others {}", nodesNeededFromUs, nodesNeededFromOthers);
+
+        if (nodesNeededFromUs > nodesFromUsAvailable) {
+            _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology");
+            return 0;
+        }
+
+        // Get the nodes
+        Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools);
+        _usedNodes += found.size();
+        allNodes.addAll(found);
+        Collection<Node> foundMore = takeNodes(nodesNeededFromUs);
+        _usedNodes += foundMore.size();
+        allNodes.addAll(foundMore);
+
+        int totalTasks = td.getExecutors().size();
+        int origRequest = td.getNumWorkers();
+        int slotsRequested = Math.min(totalTasks, origRequest);
+        int slotsUsed = Node.countSlotsUsed(allNodes);
+        int slotsFree = Node.countFreeSlotsAlive(allNodes);
+        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree);
+        if (slotsToUse <= 0) {
+            _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology.");
+        }
+        return slotsToUse;
     }
-    int slotsNeeded = slotsToUse - slotsFree;
-    int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
-    LOG.debug("Nodes... new {} used {} max {}",
-        new Object[]{numNewNodes, _usedNodes, _maxNodes});
-    if ((numNewNodes + _usedNodes) > _maxNodes) {
-      _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " +
-      (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
-      return 0;
+
+    /**
+     * Get the nodes needed to schedule a non-isolated topology.
+     * 
+     * @param td the topology to be scheduled
+     * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed.
+     * @param lesserPools node pools we can steal nodes from
+     * @return the number of additional slots that should be used for scheduling.
+     */
+    private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools) {
+        String topId = td.getId();
+        LOG.debug("Topology {} is not isolated", topId);
+        int totalTasks = td.getExecutors().size();
+        int origRequest = td.getNumWorkers();
+        int slotsRequested = Math.min(totalTasks, origRequest);
+        int slotsUsed = Node.countSlotsUsed(topId, allNodes);
+        int slotsFree = Node.countFreeSlotsAlive(allNodes);
+        // Check to see if we have enough slots before trying to get them
+        int slotsAvailable = 0;
+        if (slotsRequested > slotsFree) {
+            slotsAvailable = NodePool.slotsAvailable(lesserPools);
+        }
+        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
+        LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", new Object[] { slotsRequested, slotsUsed, slotsFree, slotsAvailable,
+                slotsToUse });
+        if (slotsToUse <= 0) {
+            _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
+            return 0;
+        }
+        int slotsNeeded = slotsToUse - slotsFree;
+        int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
+        LOG.debug("Nodes... new {} used {} max {}", new Object[] { numNewNodes, _usedNodes, _maxNodes });
+        if ((numNewNodes + _usedNodes) > _maxNodes) {
+            _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " + (numNewNodes - (_maxNodes - _usedNodes))
+                    + " more nodes needed to run topology.");
+            return 0;
+        }
+
+        Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
+        _usedNodes += found.size();
+        allNodes.addAll(found);
+        return slotsToUse;
     }
-    
-    Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
-    _usedNodes += found.size();
-    allNodes.addAll(found);
-    return slotsToUse;
-  }
 
-  @Override
-  public Collection<Node> takeNodes(int nodesNeeded) {
-    LOG.debug("Taking {} from {}", nodesNeeded, this);
-    HashSet<Node> ret = new HashSet<Node>();
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        Iterator<Node> it = entry.getValue().iterator();
-        while (it.hasNext()) {
-          if (nodesNeeded <= 0) {
-            return ret;
-          }
-          Node n = it.next();
-          it.remove();
-          n.freeAllSlots(_cluster);
-          ret.add(n);
-          nodesNeeded--;
-          _usedNodes--;
+    @Override
+    public Collection<Node> takeNodes(int nodesNeeded) {
+        LOG.debug("Taking {} from {}", nodesNeeded, this);
+        HashSet<Node> ret = new HashSet<Node>();
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                Iterator<Node> it = entry.getValue().iterator();
+                while (it.hasNext()) {
+                    if (nodesNeeded <= 0) {
+                        return ret;
+                    }
+                    Node n = it.next();
+                    it.remove();
+                    n.freeAllSlots(_cluster);
+                    ret.add(n);
+                    nodesNeeded--;
+                    _usedNodes--;
+                }
+            }
         }
-      }
+        return ret;
     }
-    return ret;
-  }
-  
-  @Override
-  public int nodesAvailable() {
-    int total = 0;
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        total += entry.getValue().size();
-      }
+
+    @Override
+    public int nodesAvailable() {
+        int total = 0;
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                total += entry.getValue().size();
+            }
+        }
+        return total;
     }
-    return total;
-  }
-  
-  @Override
-  public int slotsAvailable() {
-    int total = 0;
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        total += Node.countTotalSlotsAlive(entry.getValue());
-      }
+
+    @Override
+    public int slotsAvailable() {
+        int total = 0;
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                total += Node.countTotalSlotsAlive(entry.getValue());
+            }
+        }
+        return total;
     }
-    return total;
-  }
 
-  @Override
-  public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        Iterator<Node> it = entry.getValue().iterator();
-        while (it.hasNext()) {
-          Node n = it.next();
-          if (n.isAlive()) {
-            it.remove();
-            _usedNodes--;
-            n.freeAllSlots(_cluster);
-            ret.add(n);
-            slotsNeeded -= n.totalSlots();
-            if (slotsNeeded <= 0) {
-              return ret;
+    @Override
+    public Collection<Node> takeNodesBySlots(int slotsNeeded) {
+        HashSet<Node> ret = new HashSet<Node>();
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                Iterator<Node> it = entry.getValue().iterator();
+                while (it.hasNext()) {
+                    Node n = it.next();
+                    if (n.isAlive()) {
+                        it.remove();
+                        _usedNodes--;
+                        n.freeAllSlots(_cluster);
+                        ret.add(n);
+                        slotsNeeded -= n.totalSlots();
+                        if (slotsNeeded <= 0) {
+                            return ret;
+                        }
+                    }
+                }
             }
-          }
         }
-      }
+        return ret;
     }
-    return ret;
-  }
-  
-  @Override
-  public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
-    int nodesFound = 0;
-    int slotsFound = 0;
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        Iterator<Node> it = entry.getValue().iterator();
-        while (it.hasNext()) {
-          Node n = it.next();
-          if (n.isAlive()) {
-            nodesFound++;
-            int totalSlotsFree = n.totalSlots();
-            slotsFound += totalSlotsFree;
-            slotsNeeded -= totalSlotsFree;
-            if (slotsNeeded <= 0) {
-              return new NodeAndSlotCounts(nodesFound, slotsFound);
+
+    @Override
+    public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
+        int nodesFound = 0;
+        int slotsFound = 0;
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                Iterator<Node> it = entry.getValue().iterator();
+                while (it.hasNext()) {
+                    Node n = it.next();
+                    if (n.isAlive()) {
+                        nodesFound++;
+                        int totalSlotsFree = n.totalSlots();
+                        slotsFound += totalSlotsFree;
+                        slotsNeeded -= totalSlotsFree;
+                        if (slotsNeeded <= 0) {
+                            return new NodeAndSlotCounts(nodesFound, slotsFound);
+                        }
+                    }
+                }
             }
-          }
         }
-      }
+        return new NodeAndSlotCounts(nodesFound, slotsFound);
+    }
+
+    @Override
+    public String toString() {
+        return "IsolatedPool... ";
     }
-    return new NodeAndSlotCounts(nodesFound, slotsFound);
-  }
-  
-  @Override
-  public String toString() {
-    return "IsolatedPool... ";
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
index 320b388..27475d9 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -32,67 +32,66 @@ import backtype.storm.scheduler.TopologyDetails;
 import backtype.storm.utils.Utils;
 
 public class MultitenantScheduler implements IScheduler {
-  private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
-  @SuppressWarnings("rawtypes")
-  private Map _conf;
-  
-  @Override
-  public void prepare(@SuppressWarnings("rawtypes") Map conf) {
-    _conf = conf;
-  }
- 
-  private Map<String, Number> getUserConf() {
-    Map<String, Number> ret = (Map<String, Number>)_conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
-    if (ret == null) {
-      ret = new HashMap<String, Number>();
-    } else {
-      ret = new HashMap<String, Number>(ret); 
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
+    @SuppressWarnings("rawtypes")
+    private Map _conf;
 
-    Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
-    Map<String, Number> tmp = (Map<String, Number>)fromFile.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
-    if (tmp != null) {
-      ret.putAll(tmp);
+    @Override
+    public void prepare(@SuppressWarnings("rawtypes") Map conf) {
+        _conf = conf;
     }
-    return ret;
-  }
 
- 
-  @Override
-  public void schedule(Topologies topologies, Cluster cluster) {
-    LOG.debug("Rerunning scheduling...");
-    Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster);
-    
-    Map<String, Number> userConf = getUserConf();
-    
-    Map<String, IsolatedPool> userPools = new HashMap<String, IsolatedPool>();
-    for (Map.Entry<String, Number> entry : userConf.entrySet()) {
-      userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue()));
-    }
-    DefaultPool defaultPool = new DefaultPool();
-    FreePool freePool = new FreePool();
-    
-    freePool.init(cluster, nodeIdToNode);
-    for (IsolatedPool pool : userPools.values()) {
-      pool.init(cluster, nodeIdToNode);
-    }
-    defaultPool.init(cluster, nodeIdToNode);
-    
-    for (TopologyDetails td: topologies.getTopologies()) {
-      String user = (String)td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER);
-      LOG.debug("Found top {} run by user {}",td.getId(), user);
-      NodePool pool = userPools.get(user);
-      if (pool == null || !pool.canAdd(td)) {
-        pool = defaultPool;
-      }
-      pool.addTopology(td);
+    private Map<String, Number> getUserConf() {
+        Map<String, Number> ret = (Map<String, Number>) _conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
+        if (ret == null) {
+            ret = new HashMap<String, Number>();
+        } else {
+            ret = new HashMap<String, Number>(ret);
+        }
+
+        Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
+        Map<String, Number> tmp = (Map<String, Number>) fromFile.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
+        if (tmp != null) {
+            ret.putAll(tmp);
+        }
+        return ret;
     }
-    
-    //Now schedule all of the topologies that need to be scheduled
-    for (IsolatedPool pool : userPools.values()) {
-      pool.scheduleAsNeeded(freePool, defaultPool);
+
+    @Override
+    public void schedule(Topologies topologies, Cluster cluster) {
+        LOG.debug("Rerunning scheduling...");
+        Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster);
+
+        Map<String, Number> userConf = getUserConf();
+
+        Map<String, IsolatedPool> userPools = new HashMap<String, IsolatedPool>();
+        for (Map.Entry<String, Number> entry : userConf.entrySet()) {
+            userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue()));
+        }
+        DefaultPool defaultPool = new DefaultPool();
+        FreePool freePool = new FreePool();
+
+        freePool.init(cluster, nodeIdToNode);
+        for (IsolatedPool pool : userPools.values()) {
+            pool.init(cluster, nodeIdToNode);
+        }
+        defaultPool.init(cluster, nodeIdToNode);
+
+        for (TopologyDetails td : topologies.getTopologies()) {
+            String user = (String) td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER);
+            LOG.debug("Found top {} run by user {}", td.getId(), user);
+            NodePool pool = userPools.get(user);
+            if (pool == null || !pool.canAdd(td)) {
+                pool = defaultPool;
+            }
+            pool.addTopology(td);
+        }
+
+        // Now schedule all of the topologies that need to be scheduled
+        for (IsolatedPool pool : userPools.values()) {
+            pool.scheduleAsNeeded(freePool, defaultPool);
+        }
+        defaultPool.scheduleAsNeeded(freePool);
+        LOG.debug("Scheduling done...");
     }
-    defaultPool.scheduleAsNeeded(freePool);
-    LOG.debug("Scheduling done...");
-  }
 }

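For context, a minimal, illustrative sketch (not part of this commit) of how a topology would opt into the isolation support that IsolatedPool and MultitenantScheduler implement. The topology name, worker count, machine count, and the storm.yaml values in the comments are assumptions for illustration; only Config.TOPOLOGY_ISOLATED_MACHINES and Config.MULTITENANT_SCHEDULER_USER_POOLS come from the code above.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class IsolatedTopologyExample {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Spouts and bolts would be declared on the builder here.

        Config conf = new Config();
        conf.setNumWorkers(8);
        // Any topology that sets TOPOLOGY_ISOLATED_MACHINES is treated as
        // isolated by IsolatedPool.addTopology(); the value is the number of
        // dedicated machines requested.
        conf.put(Config.TOPOLOGY_ISOLATED_MACHINES, 4);

        // Nimbus side (storm.yaml), so that MultitenantScheduler is used and
        // the submitting user has a node budget, e.g.:
        //   storm.scheduler: "backtype.storm.scheduler.multitenant.MultitenantScheduler"
        //   multitenant.scheduler.user.pools:
        //     "someuser": 10
        StormSubmitter.submitTopology("isolated-example", conf, builder.createTopology());
    }
}
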
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java
index 883c65f..2cc49a8 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java
@@ -39,305 +39,299 @@ import backtype.storm.scheduler.WorkerSlot;
  * Represents a single node in the cluster.
  */
 public class Node {
-  private static final Logger LOG = LoggerFactory.getLogger(Node.class);
-  private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>();
-  private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
-  private final String _nodeId;
-  private boolean _isAlive;
-  
-  public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) {
-    _nodeId = nodeId;
-    _isAlive = isAlive;
-    if (_isAlive && allPorts != null) {
-      for (int port: allPorts) {
-        _freeSlots.add(new WorkerSlot(_nodeId, port));
-      }
-    }
-  }
-
-  public String getId() {
-    return _nodeId;
-  }
-  
-  public boolean isAlive() {
-    return _isAlive;
-  }
-  
-  /**
-   * @return a collection of the topology ids currently running on this node
-   */
-  public Collection<String> getRunningTopologies() {
-    return _topIdToUsedSlots.keySet();
-  }
-  
-  public boolean isTotallyFree() {
-    return _topIdToUsedSlots.isEmpty();
-  }
-  
-  public int totalSlotsFree() {
-    return _freeSlots.size();
-  }
-  
-  public int totalSlotsUsed() {
-    int total = 0;
-    for (Set<WorkerSlot> slots: _topIdToUsedSlots.values()) {
-      total += slots.size();
+    private static final Logger LOG = LoggerFactory.getLogger(Node.class);
+    private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+    private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
+    private final String _nodeId;
+    private boolean _isAlive;
+
+    public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) {
+        _nodeId = nodeId;
+        _isAlive = isAlive;
+        if (_isAlive && allPorts != null) {
+            for (int port : allPorts) {
+                _freeSlots.add(new WorkerSlot(_nodeId, port));
+            }
+        }
     }
-    return total;
-  }
-  
-  public int totalSlots() {
-    return totalSlotsFree() + totalSlotsUsed();
-  }
-  
-  public int totalSlotsUsed(String topId) {
-    int total = 0;
-    Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
-    if (slots != null) {
-      total = slots.size();
+
+    public String getId() {
+        return _nodeId;
     }
-    return total;
-  }
-
-  private void validateSlot(WorkerSlot ws) {
-    if (!_nodeId.equals(ws.getNodeId())) {
-      throw new IllegalArgumentException(
-          "Trying to add a slot to the wrong node " + ws + 
-          " is not a part of " + _nodeId);
+
+    public boolean isAlive() {
+        return _isAlive;
     }
-  }
- 
-  private void addOrphanedSlot(WorkerSlot ws) {
-    if (_isAlive) {
-      throw new IllegalArgumentException("Orphaned Slots " +
-        "only are allowed on dead nodes.");
+
+    /**
+     * @return a collection of the topology ids currently running on this node
+     */
+    public Collection<String> getRunningTopologies() {
+        return _topIdToUsedSlots.keySet();
     }
-    validateSlot(ws);
-    if (_freeSlots.contains(ws)) {
-      return;
+
+    public boolean isTotallyFree() {
+        return _topIdToUsedSlots.isEmpty();
     }
-    for (Set<WorkerSlot> used: _topIdToUsedSlots.values()) {
-      if (used.contains(ws)) {
-        return;
-      }
+
+    public int totalSlotsFree() {
+        return _freeSlots.size();
     }
-    _freeSlots.add(ws);
-  }
- 
-  boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
-    validateSlot(ws);
-    if (!_freeSlots.remove(ws)) {
-      for (Entry<String, Set<WorkerSlot>> topologySetEntry : _topIdToUsedSlots.entrySet()) {
-        if (topologySetEntry.getValue().contains(ws)) {
-          if (dontThrow) {
-            LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId +
-                    ". Its already assigned to " + topologySetEntry.getKey() + ".");
-            return true;
-          }
-          throw new IllegalStateException("Worker slot [" + ws + "] can't be assigned to "
-                  + topId + ". Its already assigned to " + topologySetEntry.getKey() + ".");
+
+    public int totalSlotsUsed() {
+        int total = 0;
+        for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) {
+            total += slots.size();
         }
-      }
-      LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the supervisor heartbeats," +
-              " but the worker is already running for topology " + topId + ".");
-    }
-    Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
-    if (usedSlots == null) {
-      usedSlots = new HashSet<WorkerSlot>();
-      _topIdToUsedSlots.put(topId, usedSlots);
+        return total;
     }
-    usedSlots.add(ws);
-    return false;
-  }
-  
-  /**
-   * Free all slots on this node.  This will update the Cluster too.
-   * @param cluster the cluster to be updated
-   */
-  public void freeAllSlots(Cluster cluster) {
-    if (!_isAlive) {
-      LOG.warn("Freeing all slots on a dead node {} ",_nodeId);
-    } 
-    for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-      cluster.freeSlots(entry.getValue());
-      if (_isAlive) {
-        _freeSlots.addAll(entry.getValue());
-      }
+
+    public int totalSlots() {
+        return totalSlotsFree() + totalSlotsUsed();
     }
-    _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>();
-  }
-  
-  /**
-   * Frees a single slot in this node
-   * @param ws the slot to free
-   * @param cluster the cluster to update
-   */
-  public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
-    if (_freeSlots.contains(ws)) return;
-    boolean wasFound = false;
-    for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-      Set<WorkerSlot> slots = entry.getValue();
-      if (slots.remove(ws)) {
-        cluster.freeSlot(ws);
-        if (_isAlive) {
-          _freeSlots.add(ws);
+
+    public int totalSlotsUsed(String topId) {
+        int total = 0;
+        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
+        if (slots != null) {
+            total = slots.size();
         }
-        wasFound = true;
-      }
+        return total;
     }
-    if(!wasFound)
-    {
-      if(forceFree)
-      {
-        LOG.info("Forcefully freeing the " + ws);
-        cluster.freeSlot(ws);
-        _freeSlots.add(ws);
-      } else {
-        throw new IllegalArgumentException("Tried to free a slot that was not" +
-              " part of this node " + _nodeId);
-      }
+
+    private void validateSlot(WorkerSlot ws) {
+        if (!_nodeId.equals(ws.getNodeId())) {
+            throw new IllegalArgumentException("Trying to add a slot to the wrong node " + ws + " is not a part of " + _nodeId);
+        }
     }
-  }
-   
-  /**
-   * Frees all the slots for a topology.
-   * @param topId the topology to free slots for
-   * @param cluster the cluster to update
-   */
-  public void freeTopology(String topId, Cluster cluster) {
-    Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
-    if (slots == null || slots.isEmpty()) return;
-    for (WorkerSlot ws : slots) {
-      cluster.freeSlot(ws);
-      if (_isAlive) {
+
+    private void addOrphanedSlot(WorkerSlot ws) {
+        if (_isAlive) {
+            throw new IllegalArgumentException("Orphaned Slots " + "only are allowed on dead nodes.");
+        }
+        validateSlot(ws);
+        if (_freeSlots.contains(ws)) {
+            return;
+        }
+        for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) {
+            if (used.contains(ws)) {
+                return;
+            }
+        }
         _freeSlots.add(ws);
-      }
     }
-    _topIdToUsedSlots.remove(topId);
-  }
- 
-  /**
-   * Assign a free slot on the node to the following topology and executors.
-   * This will update the cluster too.
-   * @param topId the topology to assign a free slot to.
-   * @param executors the executors to run in that slot.
-   * @param cluster the cluster to be updated
-   */
-  public void assign(String topId, Collection<ExecutorDetails> executors, 
-      Cluster cluster) {
-    if (!_isAlive) {
-      throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
+
+    boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
+        validateSlot(ws);
+        if (!_freeSlots.remove(ws)) {
+            for (Entry<String, Set<WorkerSlot>> topologySetEntry : _topIdToUsedSlots.entrySet()) {
+                if (topologySetEntry.getValue().contains(ws)) {
+                    if (dontThrow) {
+                        LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId + ". Its already assigned to " + topologySetEntry.getKey() + ".");
+                        return true;
+                    }
+                    throw new IllegalStateException("Worker slot [" + ws + "] can't be assigned to " + topId + ". Its already assigned to "
+                            + topologySetEntry.getKey() + ".");
+                }
+            }
+            LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the supervisor heartbeats," + " but the worker is already running for topology "
+                    + topId + ".");
+        }
+        Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
+        if (usedSlots == null) {
+            usedSlots = new HashSet<WorkerSlot>();
+            _topIdToUsedSlots.put(topId, usedSlots);
+        }
+        usedSlots.add(ws);
+        return false;
     }
-    if (_freeSlots.isEmpty()) {
-      throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
+
+    /**
+     * Free all slots on this node. This will update the Cluster too.
+     * 
+     * @param cluster the cluster to be updated
+     */
+    public void freeAllSlots(Cluster cluster) {
+        if (!_isAlive) {
+            LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
+        }
+        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+            cluster.freeSlots(entry.getValue());
+            if (_isAlive) {
+                _freeSlots.addAll(entry.getValue());
+            }
+        }
+        _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
     }
-    if (executors.size() == 0) {
-      LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " (Ignored)");
-    } else {
-      WorkerSlot slot = _freeSlots.iterator().next();
-      cluster.assign(slot, topId, executors);
-      assignInternal(slot, topId, false);
+
+    /**
+     * Frees a single slot in this node
+     * 
+     * @param ws the slot to free
+     * @param cluster the cluster to update
+     */
+    public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
+        if (_freeSlots.contains(ws))
+            return;
+        boolean wasFound = false;
+        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+            Set<WorkerSlot> slots = entry.getValue();
+            if (slots.remove(ws)) {
+                cluster.freeSlot(ws);
+                if (_isAlive) {
+                    _freeSlots.add(ws);
+                }
+                wasFound = true;
+            }
+        }
+        if (!wasFound) {
+            if (forceFree) {
+                LOG.info("Forcefully freeing the " + ws);
+                cluster.freeSlot(ws);
+                _freeSlots.add(ws);
+            } else {
+                throw new IllegalArgumentException("Tried to free a slot that was not" + " part of this node " + _nodeId);
+            }
+        }
     }
-  }
-  
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof Node) {
-      return _nodeId.equals(((Node)other)._nodeId);
+
+    /**
+     * Frees all the slots for a topology.
+     * 
+     * @param topId the topology to free slots for
+     * @param cluster the cluster to update
+     */
+    public void freeTopology(String topId, Cluster cluster) {
+        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
+        if (slots == null || slots.isEmpty())
+            return;
+        for (WorkerSlot ws : slots) {
+            cluster.freeSlot(ws);
+            if (_isAlive) {
+                _freeSlots.add(ws);
+            }
+        }
+        _topIdToUsedSlots.remove(topId);
     }
-    return false;
-  }
-  
-  @Override
-  public int hashCode() {
-    return _nodeId.hashCode();
-  }
-  
-  @Override
-  public String toString() {
-    return "Node: " + _nodeId;
-  }
-
-  public static int countSlotsUsed(String topId, Collection<Node> nodes) {
-    int total = 0;
-    for (Node n: nodes) {
-      total += n.totalSlotsUsed(topId);
+
+    /**
+     * Assign a free slot on the node to the following topology and executors. This will update the cluster too.
+     * 
+     * @param topId the topology to assign a free slot to.
+     * @param executors the executors to run in that slot.
+     * @param cluster the cluster to be updated
+     */
+    public void assign(String topId, Collection<ExecutorDetails> executors, Cluster cluster) {
+        if (!_isAlive) {
+            throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
+        }
+        if (_freeSlots.isEmpty()) {
+            throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
+        }
+        if (executors.size() == 0) {
+            LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " (Ignored)");
+        } else {
+            WorkerSlot slot = _freeSlots.iterator().next();
+            cluster.assign(slot, topId, executors);
+            assignInternal(slot, topId, false);
+        }
     }
-    return total;
-  }
-  
-  public static int countSlotsUsed(Collection<Node> nodes) {
-    int total = 0;
-    for (Node n: nodes) {
-      total += n.totalSlotsUsed();
+
+    @Override
+    public boolean equals(Object other) {
+        if (other instanceof Node) {
+            return _nodeId.equals(((Node) other)._nodeId);
+        }
+        return false;
     }
-    return total;
-  }
-  
-  public static int countFreeSlotsAlive(Collection<Node> nodes) {
-    int total = 0;
-    for (Node n: nodes) {
-      if (n.isAlive()) {
-        total += n.totalSlotsFree();
-      }
+
+    @Override
+    public int hashCode() {
+        return _nodeId.hashCode();
     }
-    return total;
-  }
-  
-  public static int countTotalSlotsAlive(Collection<Node> nodes) {
-    int total = 0;
-    for (Node n: nodes) {
-      if (n.isAlive()) {
-        total += n.totalSlots();
-      }
+
+    @Override
+    public String toString() {
+        return "Node: " + _nodeId;
     }
-    return total;
-  }
-  
-  public static Map<String, Node> getAllNodesFrom(Cluster cluster) {
-    Map<String, Node> nodeIdToNode = new HashMap<String, Node>();
-    for (SupervisorDetails sup : cluster.getSupervisors().values()) {
-      //Node ID and supervisor ID are the same.
-      String id = sup.getId();
-      boolean isAlive = !cluster.isBlackListed(id);
-      LOG.debug("Found a {} Node {} {}",
-          new Object[] {isAlive? "living":"dead", id, sup.getAllPorts()});
-      nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive));
+
+    public static int countSlotsUsed(String topId, Collection<Node> nodes) {
+        int total = 0;
+        for (Node n : nodes) {
+            total += n.totalSlotsUsed(topId);
+        }
+        return total;
     }
-    
-    for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
-      String topId = entry.getValue().getTopologyId();
-      for (WorkerSlot ws: entry.getValue().getSlots()) {
-        String id = ws.getNodeId();
-        Node node = nodeIdToNode.get(id);
-        if (node == null) {
-          LOG.debug("Found an assigned slot on a dead supervisor {}", ws);
-          node = new Node(id, null, false);
-          nodeIdToNode.put(id, node);
+
+    public static int countSlotsUsed(Collection<Node> nodes) {
+        int total = 0;
+        for (Node n : nodes) {
+            total += n.totalSlotsUsed();
         }
-        if (!node.isAlive()) {
-          //The supervisor on the node down so add an orphaned slot to hold the unsupervised worker 
-          node.addOrphanedSlot(ws);
+        return total;
+    }
+
+    public static int countFreeSlotsAlive(Collection<Node> nodes) {
+        int total = 0;
+        for (Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlotsFree();
+            }
         }
-        if (node.assignInternal(ws, topId, true)) {
-          LOG.warn("Bad scheduling state for topology [" + topId+ "], the slot " +
-                  ws + " assigned to multiple workers, un-assigning everything...");
-          node.free(ws, cluster, true);
+        return total;
+    }
+
+    public static int countTotalSlotsAlive(Collection<Node> nodes) {
+        int total = 0;
+        for (Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlots();
+            }
         }
-      }
+        return total;
     }
-    
-    return nodeIdToNode;
-  }
-  
-  /**
-   * Used to sort a list of nodes so the node with the most free slots comes
-   * first.
-   */
-  public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new Comparator<Node>() {
-    @Override
-    public int compare(Node o1, Node o2) {
-      return o2.totalSlotsFree() - o1.totalSlotsFree();
+
+    public static Map<String, Node> getAllNodesFrom(Cluster cluster) {
+        Map<String, Node> nodeIdToNode = new HashMap<String, Node>();
+        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
+            // Node ID and supervisor ID are the same.
+            String id = sup.getId();
+            boolean isAlive = !cluster.isBlackListed(id);
+            LOG.debug("Found a {} Node {} {}", new Object[] { isAlive ? "living" : "dead", id, sup.getAllPorts() });
+            nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive));
+        }
+
+        for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
+            String topId = entry.getValue().getTopologyId();
+            for (WorkerSlot ws : entry.getValue().getSlots()) {
+                String id = ws.getNodeId();
+                Node node = nodeIdToNode.get(id);
+                if (node == null) {
+                    LOG.debug("Found an assigned slot on a dead supervisor {}", ws);
+                    node = new Node(id, null, false);
+                    nodeIdToNode.put(id, node);
+                }
+                if (!node.isAlive()) {
+                    // The supervisor on the node down so add an orphaned slot to hold the unsupervised worker
+                    node.addOrphanedSlot(ws);
+                }
+                if (node.assignInternal(ws, topId, true)) {
+                    LOG.warn("Bad scheduling state for topology [" + topId + "], the slot " + ws + " assigned to multiple workers, un-assigning everything...");
+                    node.free(ws, cluster, true);
+                }
+            }
+        }
+
+        return nodeIdToNode;
     }
-  };
+
+    /**
+     * Used to sort a list of nodes so the node with the most free slots comes first.
+     */
+    public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new Comparator<Node>() {
+        @Override
+        public int compare(Node o1, Node o2) {
+            return o2.totalSlotsFree() - o1.totalSlotsFree();
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java
index 21d1577..9537fa8 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java
@@ -42,255 +42,259 @@ import backtype.storm.scheduler.WorkerSlot;
  * A pool of nodes that can be used to run topologies.
  */
 public abstract class NodePool {
-  protected Cluster _cluster;
-  protected Map<String, Node> _nodeIdToNode;
-  
-  public static class NodeAndSlotCounts {
-    public final int _nodes;
-    public final int _slots;
-    
-    public NodeAndSlotCounts(int nodes, int slots) {
-      _nodes = nodes;
-      _slots = slots;
+    protected Cluster _cluster;
+    protected Map<String, Node> _nodeIdToNode;
+
+    public static class NodeAndSlotCounts {
+        public final int _nodes;
+        public final int _slots;
+
+        public NodeAndSlotCounts(int nodes, int slots) {
+            _nodes = nodes;
+            _slots = slots;
+        }
     }
-  }
 
-  /**
-   * Place executors into slots in a round robin way, taking into account
-   * component spreading among different hosts.
-   */
-  public static class RoundRobinSlotScheduler {
-    private Map<String,Set<String>> _nodeToComps;
-    private HashMap<String, List<ExecutorDetails>> _spreadToSchedule;
-    private LinkedList<Set<ExecutorDetails>> _slots;
-    private Set<ExecutorDetails> _lastSlot;
-    private Cluster _cluster;
-    private String _topId;
-    
     /**
-     * Create a new scheduler for a given topology
-     * @param td the topology to schedule
-     * @param slotsToUse the number of slots to use for the executors left to 
-     * schedule.
-     * @param cluster the cluster to schedule this on. 
+     * Place executors into slots in a round robin way, taking into account component spreading among different hosts.
      */
-    public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, 
-        Cluster cluster) {
-      _topId = td.getId();
-      _cluster = cluster;
-      
-      Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent();
-      SchedulerAssignment assignment = _cluster.getAssignmentById(_topId);
-      _nodeToComps = new HashMap<String, Set<String>>();
+    public static class RoundRobinSlotScheduler {
+        private Map<String, Set<String>> _nodeToComps;
+        private HashMap<String, List<ExecutorDetails>> _spreadToSchedule;
+        private LinkedList<Set<ExecutorDetails>> _slots;
+        private Set<ExecutorDetails> _lastSlot;
+        private Cluster _cluster;
+        private String _topId;
 
-      if (assignment != null) {
-        Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot();
-        
-        for (Entry<ExecutorDetails, WorkerSlot> entry: execToSlot.entrySet()) {
-          String nodeId = entry.getValue().getNodeId();
-          Set<String> comps = _nodeToComps.get(nodeId);
-          if (comps == null) {
-            comps = new HashSet<String>();
-            _nodeToComps.put(nodeId, comps);
-          }
-          comps.add(execToComp.get(entry.getKey()));
-        }
-      }
-      
-      _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>();
-      List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
-      if (spreadComps != null) {
-        for (String comp: spreadComps) {
-          _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>());
+        /**
+         * Create a new scheduler for a given topology
+         * 
+         * @param td the topology to schedule
+         * @param slotsToUse the number of slots to use for the executors left to schedule.
+         * @param cluster the cluster to schedule this on.
+         */
+        public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, Cluster cluster) {
+            _topId = td.getId();
+            _cluster = cluster;
+
+            Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent();
+            SchedulerAssignment assignment = _cluster.getAssignmentById(_topId);
+            _nodeToComps = new HashMap<String, Set<String>>();
+
+            if (assignment != null) {
+                Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot();
+
+                for (Entry<ExecutorDetails, WorkerSlot> entry : execToSlot.entrySet()) {
+                    String nodeId = entry.getValue().getNodeId();
+                    Set<String> comps = _nodeToComps.get(nodeId);
+                    if (comps == null) {
+                        comps = new HashSet<String>();
+                        _nodeToComps.put(nodeId, comps);
+                    }
+                    comps.add(execToComp.get(entry.getKey()));
+                }
+            }
+
+            _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>();
+            List<String> spreadComps = (List<String>) td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
+            if (spreadComps != null) {
+                for (String comp : spreadComps) {
+                    _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>());
+                }
+            }
+
+            _slots = new LinkedList<Set<ExecutorDetails>>();
+            for (int i = 0; i < slotsToUse; i++) {
+                _slots.add(new HashSet<ExecutorDetails>());
+            }
+
+            int at = 0;
+            for (Entry<String, List<ExecutorDetails>> entry : _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) {
+                LOG.debug("Scheduling for {}", entry.getKey());
+                if (_spreadToSchedule.containsKey(entry.getKey())) {
+                    LOG.debug("Saving {} for spread...", entry.getKey());
+                    _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue());
+                } else {
+                    for (ExecutorDetails ed : entry.getValue()) {
+                        LOG.debug("Assigning {} {} to slot {}", new Object[] { entry.getKey(), ed, at });
+                        _slots.get(at).add(ed);
+                        at++;
+                        if (at >= _slots.size()) {
+                            at = 0;
+                        }
+                    }
+                }
+            }
+            _lastSlot = _slots.get(_slots.size() - 1);
         }
-      }
-      
-      _slots = new LinkedList<Set<ExecutorDetails>>();
-      for (int i = 0; i < slotsToUse; i++) {
-        _slots.add(new HashSet<ExecutorDetails>());
-      }
 
-      int at = 0;
-      for (Entry<String, List<ExecutorDetails>> entry: _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) {
-        LOG.debug("Scheduling for {}", entry.getKey());
-        if (_spreadToSchedule.containsKey(entry.getKey())) {
-          LOG.debug("Saving {} for spread...",entry.getKey());
-          _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue());
-        } else {
-          for (ExecutorDetails ed: entry.getValue()) {
-            LOG.debug("Assigning {} {} to slot {}", new Object[]{entry.getKey(), ed, at});
-            _slots.get(at).add(ed);
-            at++;
-            if (at >= _slots.size()) {
-              at = 0;
+        /**
+         * Assign a slot to the given node.
+         * 
+         * @param n the node to assign a slot to.
+         * @return true if there are more slots to assign, else false.
+         */
+        public boolean assignSlotTo(Node n) {
+            if (_slots.isEmpty()) {
+                return false;
             }
-          }
+            Set<ExecutorDetails> slot = _slots.pop();
+            if (slot == _lastSlot) {
+                // This is the last slot, so fill it with all of the remaining spread executors
+                for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) {
+                    if (entry.getValue().size() > 0) {
+                        slot.addAll(entry.getValue());
+                    }
+                }
+            } else {
+                String nodeId = n.getId();
+                Set<String> nodeComps = _nodeToComps.get(nodeId);
+                if (nodeComps == null) {
+                    nodeComps = new HashSet<String>();
+                    _nodeToComps.put(nodeId, nodeComps);
+                }
+                for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) {
+                    if (entry.getValue().size() > 0) {
+                        String comp = entry.getKey();
+                        if (!nodeComps.contains(comp)) {
+                            nodeComps.add(comp);
+                            slot.add(entry.getValue().remove(0));
+                        }
+                    }
+                }
+            }
+            n.assign(_topId, slot, _cluster);
+            return !_slots.isEmpty();
         }
-      }
-      _lastSlot = _slots.get(_slots.size() - 1);
     }
-    
+
+    private static final Logger LOG = LoggerFactory.getLogger(NodePool.class);
+
+    /**
+     * Initialize the pool.
+     * 
+     * @param cluster the cluster
+     * @param nodeIdToNode the mapping of node id to nodes
+     */
+    public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
+        _cluster = cluster;
+        _nodeIdToNode = nodeIdToNode;
+    }
+
     /**
-     * Assign a slot to the given node.
-     * @param n the node to assign a slot to.
-     * @return true if there are more slots to assign else false.
+     * Add a topology to the pool
+     * 
+     * @param td the topology to add.
      */
-    public boolean assignSlotTo(Node n) {
-      if (_slots.isEmpty()) {
-        return false;
-      }
-      Set<ExecutorDetails> slot = _slots.pop();
-      if (slot == _lastSlot) {
-        //The last slot fill it up
-        for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) {
-          if (entry.getValue().size() > 0) {
-            slot.addAll(entry.getValue());
-          }
+    public abstract void addTopology(TopologyDetails td);
+
+    /**
+     * Check if this topology can be added to this pool
+     * 
+     * @param td the topology
+     * @return true if it can be added, else false
+     */
+    public abstract boolean canAdd(TopologyDetails td);
+
+    /**
+     * @return the number of slots that are available to be taken
+     */
+    public abstract int slotsAvailable();
+
+    /**
+     * Take nodes from this pool that can fulfill up to the slotsNeeded slots
+     * 
+     * @param slotsNeeded the number of slots that are needed.
+     * @return a Collection of nodes with the removed nodes in it. This may be empty, but should not be null.
+     */
+    public abstract Collection<Node> takeNodesBySlots(int slotsNeeded);
+
+    /**
+     * Get the number of nodes and slots this would provide to get the slots needed
+     * 
+     * @param slots the number of slots needed
+     * @return the number of nodes and slots that would be returned.
+     */
+    public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots);
+
+    /**
+     * @return the number of nodes that are available to be taken
+     */
+    public abstract int nodesAvailable();
+
+    /**
+     * Take up to nodesNeeded from this pool
+     * 
+     * @param nodesNeeded the number of nodes that are needed.
+     * @return a Collection of nodes with the removed nodes in it. This may be empty, but should not be null.
+     */
+    public abstract Collection<Node> takeNodes(int nodesNeeded);
+
+    /**
+     * Reschedule any topologies as needed.
+     * 
+     * @param lesserPools pools that may be used to steal nodes from.
+     */
+    public abstract void scheduleAsNeeded(NodePool... lesserPools);
+
+    public static int slotsAvailable(NodePool[] pools) {
+        int slotsAvailable = 0;
+        for (NodePool pool : pools) {
+            slotsAvailable += pool.slotsAvailable();
         }
-      } else {
-        String nodeId = n.getId();
-        Set<String> nodeComps = _nodeToComps.get(nodeId);
-        if (nodeComps == null) {
-          nodeComps = new HashSet<String>();
-          _nodeToComps.put(nodeId, nodeComps);
+        return slotsAvailable;
+    }
+
+    public static int nodesAvailable(NodePool[] pools) {
+        int nodesAvailable = 0;
+        for (NodePool pool : pools) {
+            nodesAvailable += pool.nodesAvailable();
         }
-        for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) {
-          if (entry.getValue().size() > 0) {
-            String comp = entry.getKey();
-            if (!nodeComps.contains(comp)) {
-              nodeComps.add(comp);
-              slot.add(entry.getValue().remove(0));
+        return nodesAvailable;
+    }
+
+    public static Collection<Node> takeNodesBySlot(int slotsNeeded, NodePool[] pools) {
+        LOG.debug("Trying to grab {} free slots from {}", slotsNeeded, pools);
+        HashSet<Node> ret = new HashSet<Node>();
+        for (NodePool pool : pools) {
+            Collection<Node> got = pool.takeNodesBySlots(slotsNeeded);
+            ret.addAll(got);
+            slotsNeeded -= Node.countFreeSlotsAlive(got);
+            LOG.debug("Got {} nodes so far need {} more slots", ret.size(), slotsNeeded);
+            if (slotsNeeded <= 0) {
+                break;
             }
-          }
         }
-      }
-      n.assign(_topId, slot, _cluster);
-      return !_slots.isEmpty();
+        return ret;
     }
-  }
-  
-  private static final Logger LOG = LoggerFactory.getLogger(NodePool.class);
-  /**
-   * Initialize the pool.
-   * @param cluster the cluster
-   * @param nodeIdToNode the mapping of node id to nodes
-   */
-  public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
-    _cluster = cluster;
-    _nodeIdToNode = nodeIdToNode;
-  }
-  
-  /**
-   * Add a topology to the pool
-   * @param td the topology to add.
-   */
-  public abstract void addTopology(TopologyDetails td);
-  
-  /**
-   * Check if this topology can be added to this pool
-   * @param td the topology
-   * @return true if it can else false
-   */
-  public abstract boolean canAdd(TopologyDetails td);
-  
-  /**
-   * @return the number of nodes that are available to be taken
-   */
-  public abstract int slotsAvailable();
-  
-  /**
-   * Take nodes from this pool that can fulfill possibly up to the
-   * slotsNeeded
-   * @param slotsNeeded the number of slots that are needed.
-   * @return a Collection of nodes with the removed nodes in it.  
-   * This may be empty, but should not be null.
-   */
-  public abstract Collection<Node> takeNodesBySlots(int slotsNeeded);
 
-  /**
-   * Get the number of nodes and slots this would provide to get the slots needed
-   * @param slots the number of slots needed
-   * @return the number of nodes and slots that would be returned.
-   */
-  public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots);
-  
-  /**
-   * @return the number of nodes that are available to be taken
-   */
-  public abstract int nodesAvailable();
-  
-  /**
-   * Take up to nodesNeeded from this pool
-   * @param nodesNeeded the number of nodes that are needed.
-   * @return a Collection of nodes with the removed nodes in it.  
-   * This may be empty, but should not be null.
-   */
-  public abstract Collection<Node> takeNodes(int nodesNeeded);
-  
-  /**
-   * Reschedule any topologies as needed.
-   * @param lesserPools pools that may be used to steal nodes from.
-   */
-  public abstract void scheduleAsNeeded(NodePool ... lesserPools);
-  
-  public static int slotsAvailable(NodePool[] pools) {
-    int slotsAvailable = 0;
-    for (NodePool pool: pools) {
-      slotsAvailable += pool.slotsAvailable();
-    }
-    return slotsAvailable;
-  }
-  
-  public static int nodesAvailable(NodePool[] pools) {
-    int nodesAvailable = 0;
-    for (NodePool pool: pools) {
-      nodesAvailable += pool.nodesAvailable();
-    }
-    return nodesAvailable;
-  }
-  
-  public static Collection<Node> takeNodesBySlot(int slotsNeeded,NodePool[] pools) {
-    LOG.debug("Trying to grab {} free slots from {}",slotsNeeded, pools);
-    HashSet<Node> ret = new HashSet<Node>();
-    for (NodePool pool: pools) {
-      Collection<Node> got = pool.takeNodesBySlots(slotsNeeded);
-      ret.addAll(got);
-      slotsNeeded -= Node.countFreeSlotsAlive(got);
-      LOG.debug("Got {} nodes so far need {} more slots",ret.size(),slotsNeeded);
-      if (slotsNeeded <= 0) {
-        break;
-      }
-    }
-    return ret;
-  }
-  
-  public static Collection<Node> takeNodes(int nodesNeeded,NodePool[] pools) {
-    LOG.debug("Trying to grab {} free nodes from {}",nodesNeeded, pools);
-    HashSet<Node> ret = new HashSet<Node>();
-    for (NodePool pool: pools) {
-      Collection<Node> got = pool.takeNodes(nodesNeeded);
-      ret.addAll(got);
-      nodesNeeded -= got.size();
-      LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), nodesNeeded);
-      if (nodesNeeded <= 0) {
-        break;
-      }
+    public static Collection<Node> takeNodes(int nodesNeeded, NodePool[] pools) {
+        LOG.debug("Trying to grab {} free nodes from {}", nodesNeeded, pools);
+        HashSet<Node> ret = new HashSet<Node>();
+        for (NodePool pool : pools) {
+            Collection<Node> got = pool.takeNodes(nodesNeeded);
+            ret.addAll(got);
+            nodesNeeded -= got.size();
+            LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), nodesNeeded);
+            if (nodesNeeded <= 0) {
+                break;
+            }
+        }
+        return ret;
     }
-    return ret;
-  }
 
-  public static int getNodeCountIfSlotsWereTaken(int slots,NodePool[] pools) {
-    LOG.debug("How many nodes to get {} slots from {}",slots, pools);
-    int total = 0;
-    for (NodePool pool: pools) {
-      NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots);
-      total += ns._nodes;
-      slots -= ns._slots;
-      LOG.debug("Found {} nodes so far {} more slots needed", total, slots);
-      if (slots <= 0) {
-        break;
-      }
-    }    
-    return total;
-  }
+    public static int getNodeCountIfSlotsWereTaken(int slots, NodePool[] pools) {
+        LOG.debug("How many nodes to get {} slots from {}", slots, pools);
+        int total = 0;
+        for (NodePool pool : pools) {
+            NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots);
+            total += ns._nodes;
+            slots -= ns._slots;
+            LOG.debug("Found {} nodes so far {} more slots needed", total, slots);
+            if (slots <= 0) {
+                break;
+            }
+        }
+        return total;
+    }
 }

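To make the round-robin flow above concrete, the following is a minimal usage sketch, not code from this commit: it assumes a caller (such as a concrete NodePool implementation) has already chosen the nodes and the slot count, and the helper class, its name, and its one-slot-per-node loop are illustrative, while the RoundRobinSlotScheduler constructor and assignSlotTo signatures are the ones shown in the diff above.

import java.util.Collection;

import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.multitenant.Node;
import backtype.storm.scheduler.multitenant.NodePool.RoundRobinSlotScheduler;

public class RoundRobinUsageSketch {
    /**
     * Hand one slot's worth of executors to each node in turn until the
     * scheduler reports that every slot, including the spread-heavy last
     * slot, has been assigned.
     */
    public static void schedule(TopologyDetails td, Collection<Node> nodesToUse,
                                int slotsToUse, Cluster cluster) {
        RoundRobinSlotScheduler scheduler = new RoundRobinSlotScheduler(td, slotsToUse, cluster);
        for (Node n : nodesToUse) {
            // assignSlotTo pops one slot, tops it up with any pending "spread"
            // components not yet present on this node, assigns it to n, and
            // returns false once no slots remain.
            if (!scheduler.assignSlotTo(n)) {
                break;
            }
        }
    }
}
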
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java
index 9670045..761eac0 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java
@@ -22,23 +22,23 @@ import backtype.storm.daemon.Shutdownable;
 import java.util.Map;
 
 /**
- * Nimbus auto credential plugin that will be called on nimbus host
- * during submit topology option. User can specify a list of implementation using config key
+ * Nimbus auto credential plugin that will be called on the nimbus host during topology submission. Users can specify a list of implementations using the config key
  * nimbus.autocredential.plugins.classes.
  */
 public interface INimbusCredentialPlugin extends Shutdownable {
 
     /**
      * this method will be called when nimbus initializes.
+     * 
      * @param conf
      */
     void prepare(Map conf);
 
     /**
-     * Method that will be called on nimbus as part of submit topology. This plugin will be called
-     * at least once during the submit Topology action. It will be not be called during activate instead
-     * the credentials return by this method will be merged with the other credentials in the topology
-     * and stored in zookeeper.
+     * Method that will be called on nimbus as part of submit topology. This plugin will be called at least once during the submit topology action. It will
+     * not be called during activate; instead, the credentials returned by this method will be merged with the other credentials in the topology and stored in
+     * zookeeper.
+     * 
      * @param credentials credentials map where more credentials will be added.
      * @param conf topology configuration
      * @return

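The interface above only states the contract in javadoc, so here is a hedged sketch of an implementation that could be registered under nimbus.autocredential.plugins.classes. The populateCredentials method name follows the upstream interface but does not appear in this hunk, so treat the exact signature as an assumption; the "example.token" key is purely illustrative.

import java.util.Map;

import backtype.storm.security.INimbusCredentialPlugin;

public class ExampleNimbusCredentialPlugin implements INimbusCredentialPlugin {
    @Override
    public void prepare(Map conf) {
        // read whatever nimbus-side configuration is needed to mint credentials
    }

    @Override
    public void populateCredentials(Map<String, String> credentials, Map topoConf) {
        // called at least once per submitTopology; anything added here is merged
        // with the topology's other credentials and stored in zookeeper
        credentials.put("example.token", "opaque-token-for-this-topology");
    }

    @Override
    public void shutdown() {
        // release any resources held by the plugin
    }
}
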
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java b/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java
index ac3fb53..60653a1 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java
@@ -45,19 +45,19 @@ public class AuthUtils {
 
     /**
      * Construct a JAAS configuration object per storm configuration file
+     * 
      * @param storm_conf Storm configuration
      * @return JAAS configuration object
      */
     public static Configuration GetConfiguration(Map storm_conf) {
         Configuration login_conf = null;
 
-        //find login file configuration from Storm configuration
-        String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
-        if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
+        // find login file configuration from Storm configuration
+        String loginConfigurationFile = (String) storm_conf.get("java.security.auth.login.config");
+        if ((loginConfigurationFile != null) && (loginConfigurationFile.length() > 0)) {
             File config_file = new File(loginConfigurationFile);
-            if (! config_file.canRead()) {
-                throw new RuntimeException("File " + loginConfigurationFile +
-                        " cannot be read.");
+            if (!config_file.canRead()) {
+                throw new RuntimeException("File " + loginConfigurationFile + " cannot be read.");
             }
             try {
                 URI config_uri = config_file.toURI();
@@ -72,24 +72,26 @@ public class AuthUtils {
 
     /**
      * Construct a principal to local plugin
+     * 
      * @param conf storm configuration
      * @return the plugin
      */
     public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) {
         IPrincipalToLocal ptol = null;
         try {
-          String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
-          Class klass = Class.forName(ptol_klassName);
-          ptol = (IPrincipalToLocal)klass.newInstance();
-          ptol.prepare(storm_conf);
+            String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
+            Class klass = Class.forName(ptol_klassName);
+            ptol = (IPrincipalToLocal) klass.newInstance();
+            ptol.prepare(storm_conf);
         } catch (Exception e) {
-          throw new RuntimeException(e);
+            throw new RuntimeException(e);
         }
         return ptol;
     }
 
     /**
      * Construct a group mapping service provider plugin
+     * 
      * @param conf storm configuration
      * @return the plugin
      */
@@ -98,26 +100,27 @@ public class AuthUtils {
         try {
             String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
             Class klass = Class.forName(gmsp_klassName);
-            gmsp = (IGroupMappingServiceProvider)klass.newInstance();
+            gmsp = (IGroupMappingServiceProvider) klass.newInstance();
             gmsp.prepare(storm_conf);
         } catch (Exception e) {
-          throw new RuntimeException(e);
+            throw new RuntimeException(e);
         }
         return gmsp;
     }
 
     /**
      * Get all of the configured Credential Renewer Plugins.
+     * 
      * @param storm_conf the storm configuration to use.
      * @return the configured credential renewers.
      */
     public static Collection<ICredentialsRenewer> GetCredentialRenewers(Map conf) {
         try {
             Set<ICredentialsRenewer> ret = new HashSet<ICredentialsRenewer>();
-            Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS);
+            Collection<String> clazzes = (Collection<String>) conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS);
             if (clazzes != null) {
                 for (String clazz : clazzes) {
-                    ICredentialsRenewer inst = (ICredentialsRenewer)Class.forName(clazz).newInstance();
+                    ICredentialsRenewer inst = (ICredentialsRenewer) Class.forName(clazz).newInstance();
                     inst.prepare(conf);
                     ret.add(inst);
                 }
@@ -130,16 +133,17 @@ public class AuthUtils {
 
     /**
      * Get all the Nimbus Auto cred plugins.
+     * 
      * @param conf nimbus configuration to use.
      * @return nimbus auto credential plugins.
      */
     public static Collection<INimbusCredentialPlugin> getNimbusAutoCredPlugins(Map conf) {
         try {
             Set<INimbusCredentialPlugin> ret = new HashSet<INimbusCredentialPlugin>();
-            Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS);
+            Collection<String> clazzes = (Collection<String>) conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS);
             if (clazzes != null) {
                 for (String clazz : clazzes) {
-                    INimbusCredentialPlugin inst = (INimbusCredentialPlugin)Class.forName(clazz).newInstance();
+                    INimbusCredentialPlugin inst = (INimbusCredentialPlugin) Class.forName(clazz).newInstance();
                     inst.prepare(conf);
                     ret.add(inst);
                 }
@@ -152,21 +156,22 @@ public class AuthUtils {
 
     /**
      * Get all of the configured AutoCredential Plugins.
+     * 
      * @param storm_conf the storm configuration to use.
      * @return the configured auto credentials.
      */
     public static Collection<IAutoCredentials> GetAutoCredentials(Map storm_conf) {
         try {
             Set<IAutoCredentials> autos = new HashSet<IAutoCredentials>();
-            Collection<String> clazzes = (Collection<String>)storm_conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS);
+            Collection<String> clazzes = (Collection<String>) storm_conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS);
             if (clazzes != null) {
                 for (String clazz : clazzes) {
-                    IAutoCredentials a = (IAutoCredentials)Class.forName(clazz).newInstance();
+                    IAutoCredentials a = (IAutoCredentials) Class.forName(clazz).newInstance();
                     a.prepare(storm_conf);
                     autos.add(a);
                 }
             }
-            LOG.info("Got AutoCreds "+autos);
+            LOG.info("Got AutoCreds " + autos);
             return autos;
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -175,12 +180,13 @@ public class AuthUtils {
 
     /**
      * Populate a subject from credentials using the IAutoCredentials.
+     * 
      * @param subject the subject to populate or null if a new Subject should be created.
      * @param autos the IAutoCredentials to call to populate the subject.
      * @param credentials the credentials to pull from
      * @return the populated subject.
      */
-    public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) {
+    public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String, String> credentials) {
         try {
             if (subject == null) {
                 subject = new Subject();
@@ -196,11 +202,12 @@ public class AuthUtils {
 
     /**
      * Update a subject from credentials using the IAutoCredentials.
+     * 
      * @param subject the subject to update
      * @param autos the IAutoCredentials to call to update the subject.
      * @param credentials the credentials to pull from
      */
-    public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) {
+    public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String, String> credentials) {
         if (subject == null) {
             throw new RuntimeException("The subject cannot be null when updating a subject with credentials");
         }
@@ -216,68 +223,68 @@ public class AuthUtils {
 
     /**
      * Construct a transport plugin per storm configuration
+     * 
      * @param conf storm configuration
      * @return
      */
     public static ITransportPlugin GetTransportPlugin(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
-        ITransportPlugin  transportPlugin = null;
+        ITransportPlugin transportPlugin = null;
         try {
             String transport_plugin_klassName = type.getTransportPlugin(storm_conf);
             Class klass = Class.forName(transport_plugin_klassName);
-            transportPlugin = (ITransportPlugin)klass.newInstance();
+            transportPlugin = (ITransportPlugin) klass.newInstance();
             transportPlugin.prepare(type, storm_conf, login_conf);
-        } catch(Exception e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
         return transportPlugin;
     }
 
-    private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf,
-            String klassName) {
+    private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf, String klassName) {
         IHttpCredentialsPlugin plugin = null;
         try {
             Class klass = Class.forName(klassName);
-            plugin = (IHttpCredentialsPlugin)klass.newInstance();
+            plugin = (IHttpCredentialsPlugin) klass.newInstance();
             plugin.prepare(conf);
-        } catch(Exception e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
         return plugin;
     }
 
     /**
-     * Construct an HttpServletRequest credential plugin specified by the UI
-     * storm configuration
+     * Construct an HttpServletRequest credential plugin specified by the UI storm configuration
+     * 
      * @param conf storm configuration
      * @return the plugin
      */
     public static IHttpCredentialsPlugin GetUiHttpCredentialsPlugin(Map conf) {
-        String klassName = (String)conf.get(Config.UI_HTTP_CREDS_PLUGIN);
+        String klassName = (String) conf.get(Config.UI_HTTP_CREDS_PLUGIN);
         return AuthUtils.GetHttpCredentialsPlugin(conf, klassName);
     }
 
     /**
-     * Construct an HttpServletRequest credential plugin specified by the DRPC
-     * storm configuration
+     * Construct an HttpServletRequest credential plugin specified by the DRPC storm configuration
+     * 
      * @param conf storm configuration
      * @return the plugin
      */
     public static IHttpCredentialsPlugin GetDrpcHttpCredentialsPlugin(Map conf) {
-        String klassName = (String)conf.get(Config.DRPC_HTTP_CREDS_PLUGIN);
+        String klassName = (String) conf.get(Config.DRPC_HTTP_CREDS_PLUGIN);
         return AuthUtils.GetHttpCredentialsPlugin(conf, klassName);
     }
 
     public static String get(Configuration configuration, String section, String key) throws IOException {
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
+            String errorMessage = "Could not find a '" + section + "' entry in this configuration.";
             throw new IOException(errorMessage);
         }
 
-        for(AppConfigurationEntry entry: configurationEntries) {
+        for (AppConfigurationEntry entry : configurationEntries) {
             Object val = entry.getOptions().get(key);
             if (val != null)
-                return (String)val;
+                return (String) val;
         }
         return null;
     }

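The static helpers above are typically chained together. A minimal sketch, assuming the storm configuration map and the credentials map are supplied by the caller, and using only signatures visible in this diff:

import java.util.Collection;
import java.util.Map;

import javax.security.auth.Subject;

import backtype.storm.security.auth.AuthUtils;
import backtype.storm.security.auth.IAutoCredentials;

public class AuthUtilsUsageSketch {
    public static Subject subjectFor(Map stormConf, Map<String, String> credentials) {
        // instantiate and prepare every class listed under topology.auto-credentials
        Collection<IAutoCredentials> autos = AuthUtils.GetAutoCredentials(stormConf);
        // passing null asks populateSubject to create a fresh Subject and fill it
        // from the credentials map via each IAutoCredentials plugin
        return AuthUtils.populateSubject(null, autos, credentials);
    }
}
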
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
index e2469e5..6386992 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
@@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
 import backtype.storm.security.auth.ReqContext;
 
 public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
-    private static final Logger LOG =
-            LoggerFactory.getLogger(DefaultHttpCredentialsPlugin.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultHttpCredentialsPlugin.class);
 
     /**
      * No-op
+     * 
      * @param storm_conf Storm configuration
      */
     @Override
@@ -45,6 +45,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
 
     /**
      * Gets the user name from the request principal.
+     * 
      * @param req the servlet request
      * @return the authenticated user, or null if none is authenticated
      */
@@ -54,7 +55,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
         if (req != null && (princ = req.getUserPrincipal()) != null) {
             String userName = princ.getName();
             if (userName != null && !userName.isEmpty()) {
-                LOG.debug("HTTP request had user ("+userName+")");
+                LOG.debug("HTTP request had user (" + userName + ")");
                 return userName;
             }
         }
@@ -62,29 +63,28 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
     }
 
     /**
-     * Populates a given context with a new Subject derived from the
-     * credentials in a servlet request.
+     * Populates a given context with a new Subject derived from the credentials in a servlet request.
+     * 
      * @param context the context to be populated
      * @param req the servlet request
      * @return the context
      */
     @Override
-    public ReqContext populateContext(ReqContext context,
-            HttpServletRequest req) {
+    public ReqContext populateContext(ReqContext context, HttpServletRequest req) {
         String userName = getUserName(req);
 
         String doAsUser = req.getHeader("doAsUser");
-        if(doAsUser == null) {
+        if (doAsUser == null) {
             doAsUser = req.getParameter("doAsUser");
         }
 
-        if(doAsUser != null) {
+        if (doAsUser != null) {
             context.setRealPrincipal(new SingleUserPrincipal(userName));
             userName = doAsUser;
         }
 
         Set<Principal> principals = new HashSet<Principal>();
-        if(userName != null) {
+        if (userName != null) {
             Principal p = new SingleUserPrincipal(userName);
             principals.add(p);
         }

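As a rough illustration of how a UI or DRPC filter might use this plugin, the sketch below prepares it and lets populateContext copy the request principal (or a doAsUser override) into the request context. ReqContext.context() is assumed to be the usual thread-local accessor; the wrapper class and method names are invented for the example.

import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import backtype.storm.security.auth.DefaultHttpCredentialsPlugin;
import backtype.storm.security.auth.IHttpCredentialsPlugin;
import backtype.storm.security.auth.ReqContext;

public class HttpCredsUsageSketch {
    public static ReqContext contextFor(Map conf, HttpServletRequest request) {
        IHttpCredentialsPlugin plugin = new DefaultHttpCredentialsPlugin();
        plugin.prepare(conf); // a no-op for the default plugin
        // populateContext reads the request principal, honours a doAsUser
        // header or parameter if present, and attaches the resulting Subject
        return plugin.populateContext(ReqContext.context(), request);
    }
}
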
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java
index 729d744..47e23b0 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java
@@ -22,22 +22,24 @@ import java.util.Map;
 import java.security.Principal;
 
 /**
- * Storm can be configured to launch worker processed as a given user.
- * Some transports need to map the Principal to a local user name.
+ * Storm can be configured to launch worker processes as a given user. Some transports need to map the Principal to a local user name.
  */
 public class DefaultPrincipalToLocal implements IPrincipalToLocal {
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * 
+     * @param conf Storm configuration
      */
-    public void prepare(Map storm_conf) {}
-    
+    public void prepare(Map storm_conf) {
+    }
+
     /**
      * Convert a Principal to a local user name.
+     * 
      * @param principal the principal to convert
      * @return The local user name.
      */
     public String toLocal(Principal principal) {
-      return principal == null ? null : principal.getName();
+        return principal == null ? null : principal.getName();
     }
 }
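
A tiny sketch of the mapping above: the default plugin simply echoes the principal's name, and SingleUserPrincipal (used by the HTTP credentials plugin earlier in this commit) serves as a convenient Principal. The user name is made up.

import java.security.Principal;
import java.util.HashMap;

import backtype.storm.security.auth.DefaultPrincipalToLocal;
import backtype.storm.security.auth.SingleUserPrincipal;

public class PrincipalToLocalSketch {
    public static void main(String[] args) {
        DefaultPrincipalToLocal ptol = new DefaultPrincipalToLocal();
        ptol.prepare(new HashMap());                 // prepare is a no-op here
        Principal alice = new SingleUserPrincipal("alice");
        System.out.println(ptol.toLocal(alice));     // prints "alice"
        System.out.println(ptol.toLocal(null));      // a null principal maps to null
    }
}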