You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:12 UTC
[34/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java
index dc7eded..3a50a3f 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/IsolatedPool.java
@@ -40,307 +40,297 @@ import backtype.storm.scheduler.WorkerSlot;
* A pool of machines that can be used to run isolated topologies
*/
public class IsolatedPool extends NodePool {
- private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class);
- private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>();
- private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
- private HashSet<String> _isolated = new HashSet<String>();
- private int _maxNodes;
- private int _usedNodes;
+ private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class);
+ private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>();
+ private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
+ private HashSet<String> _isolated = new HashSet<String>();
+ private int _maxNodes;
+ private int _usedNodes;
- public IsolatedPool(int maxNodes) {
- _maxNodes = maxNodes;
- _usedNodes = 0;
- }
-
- @Override
- public void addTopology(TopologyDetails td) {
- String topId = td.getId();
- LOG.debug("Adding in Topology {}", topId);
- SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
- Set<Node> assignedNodes = new HashSet<Node>();
- if (assignment != null) {
- for (WorkerSlot ws: assignment.getSlots()) {
- Node n = _nodeIdToNode.get(ws.getNodeId());
- assignedNodes.add(n);
- }
- }
- _usedNodes += assignedNodes.size();
- _topologyIdToNodes.put(topId, assignedNodes);
- _tds.put(topId, td);
- if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
- _isolated.add(topId);
+ public IsolatedPool(int maxNodes) {
+ _maxNodes = maxNodes;
+ _usedNodes = 0;
}
- }
- @Override
- public boolean canAdd(TopologyDetails td) {
- //Only add topologies that are not sharing nodes with other topologies
- String topId = td.getId();
- SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
- if (assignment != null) {
- for (WorkerSlot ws: assignment.getSlots()) {
- Node n = _nodeIdToNode.get(ws.getNodeId());
- if (n.getRunningTopologies().size() > 1) {
- return false;
- }
- }
- }
- return true;
- }
-
- @Override
- public void scheduleAsNeeded(NodePool ... lesserPools) {
- for (String topId : _topologyIdToNodes.keySet()) {
- TopologyDetails td = _tds.get(topId);
- if (_cluster.needsScheduling(td)) {
- LOG.debug("Scheduling topology {}",topId);
- Set<Node> allNodes = _topologyIdToNodes.get(topId);
- Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
- int slotsToUse = 0;
- if (nodesRequested == null) {
- slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
- } else {
- slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools,
- nodesRequested.intValue());
+ @Override
+ public void addTopology(TopologyDetails td) {
+ String topId = td.getId();
+ LOG.debug("Adding in Topology {}", topId);
+ SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
+ Set<Node> assignedNodes = new HashSet<Node>();
+ if (assignment != null) {
+ for (WorkerSlot ws : assignment.getSlots()) {
+ Node n = _nodeIdToNode.get(ws.getNodeId());
+ assignedNodes.add(n);
+ }
}
- //No slots to schedule for some reason, so skip it.
- if (slotsToUse <= 0) {
- continue;
+ _usedNodes += assignedNodes.size();
+ _topologyIdToNodes.put(topId, assignedNodes);
+ _tds.put(topId, td);
+ if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
+ _isolated.add(topId);
}
-
- RoundRobinSlotScheduler slotSched =
- new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
-
- LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes);
- Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+ }
- LOG.debug("Nodes sorted by free space {}", sortedNodes);
- while (true) {
- Node n = sortedNodes.remove();
- if (!slotSched.assignSlotTo(n)) {
- break;
- }
- int freeSlots = n.totalSlotsFree();
- for (int i = 0; i < sortedNodes.size(); i++) {
- if (freeSlots >= sortedNodes.get(i).totalSlotsFree()) {
- sortedNodes.add(i, n);
- n = null;
- break;
+ @Override
+ public boolean canAdd(TopologyDetails td) {
+ // Only add topologies that are not sharing nodes with other topologies
+ String topId = td.getId();
+ SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
+ if (assignment != null) {
+ for (WorkerSlot ws : assignment.getSlots()) {
+ Node n = _nodeIdToNode.get(ws.getNodeId());
+ if (n.getRunningTopologies().size() > 1) {
+ return false;
+ }
}
- }
- if (n != null) {
- sortedNodes.add(n);
- }
}
- }
- Set<Node> found = _topologyIdToNodes.get(topId);
- int nc = found == null ? 0 : found.size();
- _cluster.setStatus(topId,"Scheduled Isolated on "+nc+" Nodes");
+ return true;
}
- }
-
- /**
- * Get the nodes needed to schedule an isolated topology.
- * @param td the topology to be scheduled
- * @param allNodes the nodes already scheduled for this topology.
- * This will be updated to include new nodes if needed.
- * @param lesserPools node pools we can steal nodes from
- * @return the number of additional slots that should be used for scheduling.
- */
- private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes,
- NodePool[] lesserPools, int nodesRequested) {
- String topId = td.getId();
- LOG.debug("Topology {} is isolated", topId);
- int nodesFromUsAvailable = nodesAvailable();
- int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools);
- int nodesUsed = _topologyIdToNodes.get(topId).size();
- int nodesNeeded = nodesRequested - nodesUsed;
- LOG.debug("Nodes... requested {} used {} available from us {} " +
- "avail from other {} needed {}", new Object[] {nodesRequested,
- nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable,
- nodesNeeded});
- if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
- _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. "
- + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes))
- + " more nodes needed to run topology.");
- return 0;
- }
+ @Override
+ public void scheduleAsNeeded(NodePool... lesserPools) {
+ for (String topId : _topologyIdToNodes.keySet()) {
+ TopologyDetails td = _tds.get(topId);
+ if (_cluster.needsScheduling(td)) {
+ LOG.debug("Scheduling topology {}", topId);
+ Set<Node> allNodes = _topologyIdToNodes.get(topId);
+ Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
+ int slotsToUse = 0;
+ if (nodesRequested == null) {
+ slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
+ } else {
+ slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools, nodesRequested.intValue());
+ }
+ // No slots to schedule for some reason, so skip it.
+ if (slotsToUse <= 0) {
+ continue;
+ }
- //In order to avoid going over _maxNodes I may need to steal from
- // myself even though other pools have free nodes. so figure out how
- // much each group should provide
- int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes,
- nodesFromOthersAvailable), nodesNeeded);
- int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers;
- LOG.debug("Nodes... needed from us {} needed from others {}",
- nodesNeededFromUs, nodesNeededFromOthers);
+ RoundRobinSlotScheduler slotSched = new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
- if (nodesNeededFromUs > nodesFromUsAvailable) {
- _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology");
- return 0;
+ LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes);
+ Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+
+ LOG.debug("Nodes sorted by free space {}", sortedNodes);
+ while (true) {
+ Node n = sortedNodes.remove();
+ if (!slotSched.assignSlotTo(n)) {
+ break;
+ }
+ int freeSlots = n.totalSlotsFree();
+ for (int i = 0; i < sortedNodes.size(); i++) {
+ if (freeSlots >= sortedNodes.get(i).totalSlotsFree()) {
+ sortedNodes.add(i, n);
+ n = null;
+ break;
+ }
+ }
+ if (n != null) {
+ sortedNodes.add(n);
+ }
+ }
+ }
+ Set<Node> found = _topologyIdToNodes.get(topId);
+ int nc = found == null ? 0 : found.size();
+ _cluster.setStatus(topId, "Scheduled Isolated on " + nc + " Nodes");
+ }
}
- //Get the nodes
- Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools);
- _usedNodes += found.size();
- allNodes.addAll(found);
- Collection<Node> foundMore = takeNodes(nodesNeededFromUs);
- _usedNodes += foundMore.size();
- allNodes.addAll(foundMore);
+ /**
+ * Get the nodes needed to schedule an isolated topology.
+ *
+ * @param td the topology to be scheduled
+ * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed.
+ * @param lesserPools node pools we can steal nodes from
+ * @return the number of additional slots that should be used for scheduling.
+ */
+ private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools, int nodesRequested) {
+ String topId = td.getId();
+ LOG.debug("Topology {} is isolated", topId);
+ int nodesFromUsAvailable = nodesAvailable();
+ int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools);
- int totalTasks = td.getExecutors().size();
- int origRequest = td.getNumWorkers();
- int slotsRequested = Math.min(totalTasks, origRequest);
- int slotsUsed = Node.countSlotsUsed(allNodes);
- int slotsFree = Node.countFreeSlotsAlive(allNodes);
- int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree);
- if (slotsToUse <= 0) {
- _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology.");
- }
- return slotsToUse;
- }
-
- /**
- * Get the nodes needed to schedule a non-isolated topology.
- * @param td the topology to be scheduled
- * @param allNodes the nodes already scheduled for this topology.
- * This will be updated to include new nodes if needed.
- * @param lesserPools node pools we can steal nodes from
- * @return the number of additional slots that should be used for scheduling.
- */
- private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes,
- NodePool[] lesserPools) {
- String topId = td.getId();
- LOG.debug("Topology {} is not isolated",topId);
- int totalTasks = td.getExecutors().size();
- int origRequest = td.getNumWorkers();
- int slotsRequested = Math.min(totalTasks, origRequest);
- int slotsUsed = Node.countSlotsUsed(topId, allNodes);
- int slotsFree = Node.countFreeSlotsAlive(allNodes);
- //Check to see if we have enough slots before trying to get them
- int slotsAvailable = 0;
- if (slotsRequested > slotsFree) {
- slotsAvailable = NodePool.slotsAvailable(lesserPools);
- }
- int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
- LOG.debug("Slots... requested {} used {} free {} available {} to be used {}",
- new Object[] {slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse});
- if (slotsToUse <= 0) {
- _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
- return 0;
+ int nodesUsed = _topologyIdToNodes.get(topId).size();
+ int nodesNeeded = nodesRequested - nodesUsed;
+ LOG.debug("Nodes... requested {} used {} available from us {} " + "avail from other {} needed {}", new Object[] { nodesRequested, nodesUsed,
+ nodesFromUsAvailable, nodesFromOthersAvailable, nodesNeeded });
+ if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
+ _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. "
+ + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
+ return 0;
+ }
+
+ // In order to avoid going over _maxNodes I may need to steal from
+ // myself even though other pools have free nodes. so figure out how
+ // much each group should provide
+ int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, nodesFromOthersAvailable), nodesNeeded);
+ int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers;
+ LOG.debug("Nodes... needed from us {} needed from others {}", nodesNeededFromUs, nodesNeededFromOthers);
+
+ if (nodesNeededFromUs > nodesFromUsAvailable) {
+ _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology");
+ return 0;
+ }
+
+ // Get the nodes
+ Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools);
+ _usedNodes += found.size();
+ allNodes.addAll(found);
+ Collection<Node> foundMore = takeNodes(nodesNeededFromUs);
+ _usedNodes += foundMore.size();
+ allNodes.addAll(foundMore);
+
+ int totalTasks = td.getExecutors().size();
+ int origRequest = td.getNumWorkers();
+ int slotsRequested = Math.min(totalTasks, origRequest);
+ int slotsUsed = Node.countSlotsUsed(allNodes);
+ int slotsFree = Node.countFreeSlotsAlive(allNodes);
+ int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree);
+ if (slotsToUse <= 0) {
+ _cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology.");
+ }
+ return slotsToUse;
}
- int slotsNeeded = slotsToUse - slotsFree;
- int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
- LOG.debug("Nodes... new {} used {} max {}",
- new Object[]{numNewNodes, _usedNodes, _maxNodes});
- if ((numNewNodes + _usedNodes) > _maxNodes) {
- _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " +
- (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
- return 0;
+
+ /**
+ * Get the nodes needed to schedule a non-isolated topology.
+ *
+ * @param td the topology to be scheduled
+ * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed.
+ * @param lesserPools node pools we can steal nodes from
+ * @return the number of additional slots that should be used for scheduling.
+ */
+ private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools) {
+ String topId = td.getId();
+ LOG.debug("Topology {} is not isolated", topId);
+ int totalTasks = td.getExecutors().size();
+ int origRequest = td.getNumWorkers();
+ int slotsRequested = Math.min(totalTasks, origRequest);
+ int slotsUsed = Node.countSlotsUsed(topId, allNodes);
+ int slotsFree = Node.countFreeSlotsAlive(allNodes);
+ // Check to see if we have enough slots before trying to get them
+ int slotsAvailable = 0;
+ if (slotsRequested > slotsFree) {
+ slotsAvailable = NodePool.slotsAvailable(lesserPools);
+ }
+ int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
+ LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", new Object[] { slotsRequested, slotsUsed, slotsFree, slotsAvailable,
+ slotsToUse });
+ if (slotsToUse <= 0) {
+ _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
+ return 0;
+ }
+ int slotsNeeded = slotsToUse - slotsFree;
+ int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
+ LOG.debug("Nodes... new {} used {} max {}", new Object[] { numNewNodes, _usedNodes, _maxNodes });
+ if ((numNewNodes + _usedNodes) > _maxNodes) {
+ _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " + (numNewNodes - (_maxNodes - _usedNodes))
+ + " more nodes needed to run topology.");
+ return 0;
+ }
+
+ Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
+ _usedNodes += found.size();
+ allNodes.addAll(found);
+ return slotsToUse;
}
-
- Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
- _usedNodes += found.size();
- allNodes.addAll(found);
- return slotsToUse;
- }
- @Override
- public Collection<Node> takeNodes(int nodesNeeded) {
- LOG.debug("Taking {} from {}", nodesNeeded, this);
- HashSet<Node> ret = new HashSet<Node>();
- for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
- if (!_isolated.contains(entry.getKey())) {
- Iterator<Node> it = entry.getValue().iterator();
- while (it.hasNext()) {
- if (nodesNeeded <= 0) {
- return ret;
- }
- Node n = it.next();
- it.remove();
- n.freeAllSlots(_cluster);
- ret.add(n);
- nodesNeeded--;
- _usedNodes--;
+ @Override
+ public Collection<Node> takeNodes(int nodesNeeded) {
+ LOG.debug("Taking {} from {}", nodesNeeded, this);
+ HashSet<Node> ret = new HashSet<Node>();
+ for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+ if (!_isolated.contains(entry.getKey())) {
+ Iterator<Node> it = entry.getValue().iterator();
+ while (it.hasNext()) {
+ if (nodesNeeded <= 0) {
+ return ret;
+ }
+ Node n = it.next();
+ it.remove();
+ n.freeAllSlots(_cluster);
+ ret.add(n);
+ nodesNeeded--;
+ _usedNodes--;
+ }
+ }
}
- }
+ return ret;
}
- return ret;
- }
-
- @Override
- public int nodesAvailable() {
- int total = 0;
- for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
- if (!_isolated.contains(entry.getKey())) {
- total += entry.getValue().size();
- }
+
+ @Override
+ public int nodesAvailable() {
+ int total = 0;
+ for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+ if (!_isolated.contains(entry.getKey())) {
+ total += entry.getValue().size();
+ }
+ }
+ return total;
}
- return total;
- }
-
- @Override
- public int slotsAvailable() {
- int total = 0;
- for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
- if (!_isolated.contains(entry.getKey())) {
- total += Node.countTotalSlotsAlive(entry.getValue());
- }
+
+ @Override
+ public int slotsAvailable() {
+ int total = 0;
+ for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+ if (!_isolated.contains(entry.getKey())) {
+ total += Node.countTotalSlotsAlive(entry.getValue());
+ }
+ }
+ return total;
}
- return total;
- }
- @Override
- public Collection<Node> takeNodesBySlots(int slotsNeeded) {
- HashSet<Node> ret = new HashSet<Node>();
- for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
- if (!_isolated.contains(entry.getKey())) {
- Iterator<Node> it = entry.getValue().iterator();
- while (it.hasNext()) {
- Node n = it.next();
- if (n.isAlive()) {
- it.remove();
- _usedNodes--;
- n.freeAllSlots(_cluster);
- ret.add(n);
- slotsNeeded -= n.totalSlots();
- if (slotsNeeded <= 0) {
- return ret;
+ @Override
+ public Collection<Node> takeNodesBySlots(int slotsNeeded) {
+ HashSet<Node> ret = new HashSet<Node>();
+ for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+ if (!_isolated.contains(entry.getKey())) {
+ Iterator<Node> it = entry.getValue().iterator();
+ while (it.hasNext()) {
+ Node n = it.next();
+ if (n.isAlive()) {
+ it.remove();
+ _usedNodes--;
+ n.freeAllSlots(_cluster);
+ ret.add(n);
+ slotsNeeded -= n.totalSlots();
+ if (slotsNeeded <= 0) {
+ return ret;
+ }
+ }
+ }
}
- }
}
- }
+ return ret;
}
- return ret;
- }
-
- @Override
- public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
- int nodesFound = 0;
- int slotsFound = 0;
- for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
- if (!_isolated.contains(entry.getKey())) {
- Iterator<Node> it = entry.getValue().iterator();
- while (it.hasNext()) {
- Node n = it.next();
- if (n.isAlive()) {
- nodesFound++;
- int totalSlotsFree = n.totalSlots();
- slotsFound += totalSlotsFree;
- slotsNeeded -= totalSlotsFree;
- if (slotsNeeded <= 0) {
- return new NodeAndSlotCounts(nodesFound, slotsFound);
+
+ @Override
+ public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
+ int nodesFound = 0;
+ int slotsFound = 0;
+ for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+ if (!_isolated.contains(entry.getKey())) {
+ Iterator<Node> it = entry.getValue().iterator();
+ while (it.hasNext()) {
+ Node n = it.next();
+ if (n.isAlive()) {
+ nodesFound++;
+ int totalSlotsFree = n.totalSlots();
+ slotsFound += totalSlotsFree;
+ slotsNeeded -= totalSlotsFree;
+ if (slotsNeeded <= 0) {
+ return new NodeAndSlotCounts(nodesFound, slotsFound);
+ }
+ }
+ }
}
- }
}
- }
+ return new NodeAndSlotCounts(nodesFound, slotsFound);
+ }
+
+ @Override
+ public String toString() {
+ return "IsolatedPool... ";
}
- return new NodeAndSlotCounts(nodesFound, slotsFound);
- }
-
- @Override
- public String toString() {
- return "IsolatedPool... ";
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
index 320b388..27475d9 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -32,67 +32,66 @@ import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.utils.Utils;
public class MultitenantScheduler implements IScheduler {
- private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
- @SuppressWarnings("rawtypes")
- private Map _conf;
-
- @Override
- public void prepare(@SuppressWarnings("rawtypes") Map conf) {
- _conf = conf;
- }
-
- private Map<String, Number> getUserConf() {
- Map<String, Number> ret = (Map<String, Number>)_conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
- if (ret == null) {
- ret = new HashMap<String, Number>();
- } else {
- ret = new HashMap<String, Number>(ret);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
+ @SuppressWarnings("rawtypes")
+ private Map _conf;
- Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
- Map<String, Number> tmp = (Map<String, Number>)fromFile.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
- if (tmp != null) {
- ret.putAll(tmp);
+ @Override
+ public void prepare(@SuppressWarnings("rawtypes") Map conf) {
+ _conf = conf;
}
- return ret;
- }
-
- @Override
- public void schedule(Topologies topologies, Cluster cluster) {
- LOG.debug("Rerunning scheduling...");
- Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster);
-
- Map<String, Number> userConf = getUserConf();
-
- Map<String, IsolatedPool> userPools = new HashMap<String, IsolatedPool>();
- for (Map.Entry<String, Number> entry : userConf.entrySet()) {
- userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue()));
- }
- DefaultPool defaultPool = new DefaultPool();
- FreePool freePool = new FreePool();
-
- freePool.init(cluster, nodeIdToNode);
- for (IsolatedPool pool : userPools.values()) {
- pool.init(cluster, nodeIdToNode);
- }
- defaultPool.init(cluster, nodeIdToNode);
-
- for (TopologyDetails td: topologies.getTopologies()) {
- String user = (String)td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER);
- LOG.debug("Found top {} run by user {}",td.getId(), user);
- NodePool pool = userPools.get(user);
- if (pool == null || !pool.canAdd(td)) {
- pool = defaultPool;
- }
- pool.addTopology(td);
+ private Map<String, Number> getUserConf() {
+ Map<String, Number> ret = (Map<String, Number>) _conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
+ if (ret == null) {
+ ret = new HashMap<String, Number>();
+ } else {
+ ret = new HashMap<String, Number>(ret);
+ }
+
+ Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
+ Map<String, Number> tmp = (Map<String, Number>) fromFile.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
+ if (tmp != null) {
+ ret.putAll(tmp);
+ }
+ return ret;
}
-
- //Now schedule all of the topologies that need to be scheduled
- for (IsolatedPool pool : userPools.values()) {
- pool.scheduleAsNeeded(freePool, defaultPool);
+
+ @Override
+ public void schedule(Topologies topologies, Cluster cluster) {
+ LOG.debug("Rerunning scheduling...");
+ Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster);
+
+ Map<String, Number> userConf = getUserConf();
+
+ Map<String, IsolatedPool> userPools = new HashMap<String, IsolatedPool>();
+ for (Map.Entry<String, Number> entry : userConf.entrySet()) {
+ userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue()));
+ }
+ DefaultPool defaultPool = new DefaultPool();
+ FreePool freePool = new FreePool();
+
+ freePool.init(cluster, nodeIdToNode);
+ for (IsolatedPool pool : userPools.values()) {
+ pool.init(cluster, nodeIdToNode);
+ }
+ defaultPool.init(cluster, nodeIdToNode);
+
+ for (TopologyDetails td : topologies.getTopologies()) {
+ String user = (String) td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER);
+ LOG.debug("Found top {} run by user {}", td.getId(), user);
+ NodePool pool = userPools.get(user);
+ if (pool == null || !pool.canAdd(td)) {
+ pool = defaultPool;
+ }
+ pool.addTopology(td);
+ }
+
+ // Now schedule all of the topologies that need to be scheduled
+ for (IsolatedPool pool : userPools.values()) {
+ pool.scheduleAsNeeded(freePool, defaultPool);
+ }
+ defaultPool.scheduleAsNeeded(freePool);
+ LOG.debug("Scheduling done...");
}
- defaultPool.scheduleAsNeeded(freePool);
- LOG.debug("Scheduling done...");
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java
index 883c65f..2cc49a8 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/Node.java
@@ -39,305 +39,299 @@ import backtype.storm.scheduler.WorkerSlot;
* Represents a single node in the cluster.
*/
public class Node {
- private static final Logger LOG = LoggerFactory.getLogger(Node.class);
- private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>();
- private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
- private final String _nodeId;
- private boolean _isAlive;
-
- public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) {
- _nodeId = nodeId;
- _isAlive = isAlive;
- if (_isAlive && allPorts != null) {
- for (int port: allPorts) {
- _freeSlots.add(new WorkerSlot(_nodeId, port));
- }
- }
- }
-
- public String getId() {
- return _nodeId;
- }
-
- public boolean isAlive() {
- return _isAlive;
- }
-
- /**
- * @return a collection of the topology ids currently running on this node
- */
- public Collection<String> getRunningTopologies() {
- return _topIdToUsedSlots.keySet();
- }
-
- public boolean isTotallyFree() {
- return _topIdToUsedSlots.isEmpty();
- }
-
- public int totalSlotsFree() {
- return _freeSlots.size();
- }
-
- public int totalSlotsUsed() {
- int total = 0;
- for (Set<WorkerSlot> slots: _topIdToUsedSlots.values()) {
- total += slots.size();
+ private static final Logger LOG = LoggerFactory.getLogger(Node.class);
+ private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+ private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
+ private final String _nodeId;
+ private boolean _isAlive;
+
+ public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) {
+ _nodeId = nodeId;
+ _isAlive = isAlive;
+ if (_isAlive && allPorts != null) {
+ for (int port : allPorts) {
+ _freeSlots.add(new WorkerSlot(_nodeId, port));
+ }
+ }
}
- return total;
- }
-
- public int totalSlots() {
- return totalSlotsFree() + totalSlotsUsed();
- }
-
- public int totalSlotsUsed(String topId) {
- int total = 0;
- Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
- if (slots != null) {
- total = slots.size();
+
+ public String getId() {
+ return _nodeId;
}
- return total;
- }
-
- private void validateSlot(WorkerSlot ws) {
- if (!_nodeId.equals(ws.getNodeId())) {
- throw new IllegalArgumentException(
- "Trying to add a slot to the wrong node " + ws +
- " is not a part of " + _nodeId);
+
+ public boolean isAlive() {
+ return _isAlive;
}
- }
-
- private void addOrphanedSlot(WorkerSlot ws) {
- if (_isAlive) {
- throw new IllegalArgumentException("Orphaned Slots " +
- "only are allowed on dead nodes.");
+
+ /**
+ * @return a collection of the topology ids currently running on this node
+ */
+ public Collection<String> getRunningTopologies() {
+ return _topIdToUsedSlots.keySet();
}
- validateSlot(ws);
- if (_freeSlots.contains(ws)) {
- return;
+
+ public boolean isTotallyFree() {
+ return _topIdToUsedSlots.isEmpty();
}
- for (Set<WorkerSlot> used: _topIdToUsedSlots.values()) {
- if (used.contains(ws)) {
- return;
- }
+
+ public int totalSlotsFree() {
+ return _freeSlots.size();
}
- _freeSlots.add(ws);
- }
-
- boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
- validateSlot(ws);
- if (!_freeSlots.remove(ws)) {
- for (Entry<String, Set<WorkerSlot>> topologySetEntry : _topIdToUsedSlots.entrySet()) {
- if (topologySetEntry.getValue().contains(ws)) {
- if (dontThrow) {
- LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId +
- ". Its already assigned to " + topologySetEntry.getKey() + ".");
- return true;
- }
- throw new IllegalStateException("Worker slot [" + ws + "] can't be assigned to "
- + topId + ". Its already assigned to " + topologySetEntry.getKey() + ".");
+
+ public int totalSlotsUsed() {
+ int total = 0;
+ for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) {
+ total += slots.size();
}
- }
- LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the supervisor heartbeats," +
- " but the worker is already running for topology " + topId + ".");
- }
- Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
- if (usedSlots == null) {
- usedSlots = new HashSet<WorkerSlot>();
- _topIdToUsedSlots.put(topId, usedSlots);
+ return total;
}
- usedSlots.add(ws);
- return false;
- }
-
- /**
- * Free all slots on this node. This will update the Cluster too.
- * @param cluster the cluster to be updated
- */
- public void freeAllSlots(Cluster cluster) {
- if (!_isAlive) {
- LOG.warn("Freeing all slots on a dead node {} ",_nodeId);
- }
- for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
- cluster.freeSlots(entry.getValue());
- if (_isAlive) {
- _freeSlots.addAll(entry.getValue());
- }
+
+ public int totalSlots() {
+ return totalSlotsFree() + totalSlotsUsed();
}
- _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>();
- }
-
- /**
- * Frees a single slot in this node
- * @param ws the slot to free
- * @param cluster the cluster to update
- */
- public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
- if (_freeSlots.contains(ws)) return;
- boolean wasFound = false;
- for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
- Set<WorkerSlot> slots = entry.getValue();
- if (slots.remove(ws)) {
- cluster.freeSlot(ws);
- if (_isAlive) {
- _freeSlots.add(ws);
+
+ public int totalSlotsUsed(String topId) {
+ int total = 0;
+ Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
+ if (slots != null) {
+ total = slots.size();
}
- wasFound = true;
- }
+ return total;
}
- if(!wasFound)
- {
- if(forceFree)
- {
- LOG.info("Forcefully freeing the " + ws);
- cluster.freeSlot(ws);
- _freeSlots.add(ws);
- } else {
- throw new IllegalArgumentException("Tried to free a slot that was not" +
- " part of this node " + _nodeId);
- }
+
+ private void validateSlot(WorkerSlot ws) {
+ if (!_nodeId.equals(ws.getNodeId())) {
+ throw new IllegalArgumentException("Trying to add a slot to the wrong node " + ws + " is not a part of " + _nodeId);
+ }
}
- }
-
- /**
- * Frees all the slots for a topology.
- * @param topId the topology to free slots for
- * @param cluster the cluster to update
- */
- public void freeTopology(String topId, Cluster cluster) {
- Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
- if (slots == null || slots.isEmpty()) return;
- for (WorkerSlot ws : slots) {
- cluster.freeSlot(ws);
- if (_isAlive) {
+
+ private void addOrphanedSlot(WorkerSlot ws) {
+ if (_isAlive) {
+ throw new IllegalArgumentException("Orphaned Slots " + "only are allowed on dead nodes.");
+ }
+ validateSlot(ws);
+ if (_freeSlots.contains(ws)) {
+ return;
+ }
+ for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) {
+ if (used.contains(ws)) {
+ return;
+ }
+ }
_freeSlots.add(ws);
- }
}
- _topIdToUsedSlots.remove(topId);
- }
-
- /**
- * Assign a free slot on the node to the following topology and executors.
- * This will update the cluster too.
- * @param topId the topology to assign a free slot to.
- * @param executors the executors to run in that slot.
- * @param cluster the cluster to be updated
- */
- public void assign(String topId, Collection<ExecutorDetails> executors,
- Cluster cluster) {
- if (!_isAlive) {
- throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
+
+ boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
+ validateSlot(ws);
+ if (!_freeSlots.remove(ws)) {
+ for (Entry<String, Set<WorkerSlot>> topologySetEntry : _topIdToUsedSlots.entrySet()) {
+ if (topologySetEntry.getValue().contains(ws)) {
+ if (dontThrow) {
+ LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId + ". Its already assigned to " + topologySetEntry.getKey() + ".");
+ return true;
+ }
+ throw new IllegalStateException("Worker slot [" + ws + "] can't be assigned to " + topId + ". Its already assigned to "
+ + topologySetEntry.getKey() + ".");
+ }
+ }
+ LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the supervisor heartbeats," + " but the worker is already running for topology "
+ + topId + ".");
+ }
+ Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
+ if (usedSlots == null) {
+ usedSlots = new HashSet<WorkerSlot>();
+ _topIdToUsedSlots.put(topId, usedSlots);
+ }
+ usedSlots.add(ws);
+ return false;
}
- if (_freeSlots.isEmpty()) {
- throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
+
+ /**
+ * Free all slots on this node. This will update the Cluster too.
+ *
+ * @param cluster the cluster to be updated
+ */
+ public void freeAllSlots(Cluster cluster) {
+ if (!_isAlive) {
+ LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
+ }
+ for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+ cluster.freeSlots(entry.getValue());
+ if (_isAlive) {
+ _freeSlots.addAll(entry.getValue());
+ }
+ }
+ _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
}
- if (executors.size() == 0) {
- LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " (Ignored)");
- } else {
- WorkerSlot slot = _freeSlots.iterator().next();
- cluster.assign(slot, topId, executors);
- assignInternal(slot, topId, false);
+
+ /**
+ * Frees a single slot in this node
+ *
+ * @param ws the slot to free
+ * @param cluster the cluster to update
+ */
+ public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
+ if (_freeSlots.contains(ws))
+ return;
+ boolean wasFound = false;
+ for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+ Set<WorkerSlot> slots = entry.getValue();
+ if (slots.remove(ws)) {
+ cluster.freeSlot(ws);
+ if (_isAlive) {
+ _freeSlots.add(ws);
+ }
+ wasFound = true;
+ }
+ }
+ if (!wasFound) {
+ if (forceFree) {
+ LOG.info("Forcefully freeing the " + ws);
+ cluster.freeSlot(ws);
+ _freeSlots.add(ws);
+ } else {
+ throw new IllegalArgumentException("Tried to free a slot that was not" + " part of this node " + _nodeId);
+ }
+ }
}
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof Node) {
- return _nodeId.equals(((Node)other)._nodeId);
+
+ /**
+ * Frees all the slots for a topology.
+ *
+ * @param topId the topology to free slots for
+ * @param cluster the cluster to update
+ */
+ public void freeTopology(String topId, Cluster cluster) {
+ Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
+ if (slots == null || slots.isEmpty())
+ return;
+ for (WorkerSlot ws : slots) {
+ cluster.freeSlot(ws);
+ if (_isAlive) {
+ _freeSlots.add(ws);
+ }
+ }
+ _topIdToUsedSlots.remove(topId);
}
- return false;
- }
-
- @Override
- public int hashCode() {
- return _nodeId.hashCode();
- }
-
- @Override
- public String toString() {
- return "Node: " + _nodeId;
- }
-
- public static int countSlotsUsed(String topId, Collection<Node> nodes) {
- int total = 0;
- for (Node n: nodes) {
- total += n.totalSlotsUsed(topId);
+
+ /**
+ * Assign a free slot on the node to the following topology and executors. This will update the cluster too.
+ *
+ * @param topId the topology to assign a free slot to.
+ * @param executors the executors to run in that slot.
+ * @param cluster the cluster to be updated
+ */
+ public void assign(String topId, Collection<ExecutorDetails> executors, Cluster cluster) {
+ if (!_isAlive) {
+ throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
+ }
+ if (_freeSlots.isEmpty()) {
+ throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
+ }
+ if (executors.size() == 0) {
+ LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " (Ignored)");
+ } else {
+ WorkerSlot slot = _freeSlots.iterator().next();
+ cluster.assign(slot, topId, executors);
+ assignInternal(slot, topId, false);
+ }
}
- return total;
- }
-
- public static int countSlotsUsed(Collection<Node> nodes) {
- int total = 0;
- for (Node n: nodes) {
- total += n.totalSlotsUsed();
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof Node) {
+ return _nodeId.equals(((Node) other)._nodeId);
+ }
+ return false;
}
- return total;
- }
-
- public static int countFreeSlotsAlive(Collection<Node> nodes) {
- int total = 0;
- for (Node n: nodes) {
- if (n.isAlive()) {
- total += n.totalSlotsFree();
- }
+
+ @Override
+ public int hashCode() {
+ return _nodeId.hashCode();
}
- return total;
- }
-
- public static int countTotalSlotsAlive(Collection<Node> nodes) {
- int total = 0;
- for (Node n: nodes) {
- if (n.isAlive()) {
- total += n.totalSlots();
- }
+
+ @Override
+ public String toString() {
+ return "Node: " + _nodeId;
}
- return total;
- }
-
- public static Map<String, Node> getAllNodesFrom(Cluster cluster) {
- Map<String, Node> nodeIdToNode = new HashMap<String, Node>();
- for (SupervisorDetails sup : cluster.getSupervisors().values()) {
- //Node ID and supervisor ID are the same.
- String id = sup.getId();
- boolean isAlive = !cluster.isBlackListed(id);
- LOG.debug("Found a {} Node {} {}",
- new Object[] {isAlive? "living":"dead", id, sup.getAllPorts()});
- nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive));
+
+ public static int countSlotsUsed(String topId, Collection<Node> nodes) {
+ int total = 0;
+ for (Node n : nodes) {
+ total += n.totalSlotsUsed(topId);
+ }
+ return total;
}
-
- for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
- String topId = entry.getValue().getTopologyId();
- for (WorkerSlot ws: entry.getValue().getSlots()) {
- String id = ws.getNodeId();
- Node node = nodeIdToNode.get(id);
- if (node == null) {
- LOG.debug("Found an assigned slot on a dead supervisor {}", ws);
- node = new Node(id, null, false);
- nodeIdToNode.put(id, node);
+
+ public static int countSlotsUsed(Collection<Node> nodes) {
+ int total = 0;
+ for (Node n : nodes) {
+ total += n.totalSlotsUsed();
}
- if (!node.isAlive()) {
- //The supervisor on the node down so add an orphaned slot to hold the unsupervised worker
- node.addOrphanedSlot(ws);
+ return total;
+ }
+
+ public static int countFreeSlotsAlive(Collection<Node> nodes) {
+ int total = 0;
+ for (Node n : nodes) {
+ if (n.isAlive()) {
+ total += n.totalSlotsFree();
+ }
}
- if (node.assignInternal(ws, topId, true)) {
- LOG.warn("Bad scheduling state for topology [" + topId+ "], the slot " +
- ws + " assigned to multiple workers, un-assigning everything...");
- node.free(ws, cluster, true);
+ return total;
+ }
+
+ public static int countTotalSlotsAlive(Collection<Node> nodes) {
+ int total = 0;
+ for (Node n : nodes) {
+ if (n.isAlive()) {
+ total += n.totalSlots();
+ }
}
- }
+ return total;
}
-
- return nodeIdToNode;
- }
-
- /**
- * Used to sort a list of nodes so the node with the most free slots comes
- * first.
- */
- public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new Comparator<Node>() {
- @Override
- public int compare(Node o1, Node o2) {
- return o2.totalSlotsFree() - o1.totalSlotsFree();
+
+ public static Map<String, Node> getAllNodesFrom(Cluster cluster) {
+ Map<String, Node> nodeIdToNode = new HashMap<String, Node>();
+ for (SupervisorDetails sup : cluster.getSupervisors().values()) {
+ // Node ID and supervisor ID are the same.
+ String id = sup.getId();
+ boolean isAlive = !cluster.isBlackListed(id);
+ LOG.debug("Found a {} Node {} {}", new Object[] { isAlive ? "living" : "dead", id, sup.getAllPorts() });
+ nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive));
+ }
+
+ for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
+ String topId = entry.getValue().getTopologyId();
+ for (WorkerSlot ws : entry.getValue().getSlots()) {
+ String id = ws.getNodeId();
+ Node node = nodeIdToNode.get(id);
+ if (node == null) {
+ LOG.debug("Found an assigned slot on a dead supervisor {}", ws);
+ node = new Node(id, null, false);
+ nodeIdToNode.put(id, node);
+ }
+ if (!node.isAlive()) {
+ // The supervisor on the node down so add an orphaned slot to hold the unsupervised worker
+ node.addOrphanedSlot(ws);
+ }
+ if (node.assignInternal(ws, topId, true)) {
+ LOG.warn("Bad scheduling state for topology [" + topId + "], the slot " + ws + " assigned to multiple workers, un-assigning everything...");
+ node.free(ws, cluster, true);
+ }
+ }
+ }
+
+ return nodeIdToNode;
}
- };
+
+ /**
+ * Used to sort a list of nodes so the node with the most free slots comes first.
+ */
+ public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new Comparator<Node>() {
+ @Override
+ public int compare(Node o1, Node o2) {
+ return o2.totalSlotsFree() - o1.totalSlotsFree();
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java
index 21d1577..9537fa8 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/NodePool.java
@@ -42,255 +42,259 @@ import backtype.storm.scheduler.WorkerSlot;
* A pool of nodes that can be used to run topologies.
*/
public abstract class NodePool {
- protected Cluster _cluster;
- protected Map<String, Node> _nodeIdToNode;
-
- public static class NodeAndSlotCounts {
- public final int _nodes;
- public final int _slots;
-
- public NodeAndSlotCounts(int nodes, int slots) {
- _nodes = nodes;
- _slots = slots;
+ protected Cluster _cluster;
+ protected Map<String, Node> _nodeIdToNode;
+
+ public static class NodeAndSlotCounts {
+ public final int _nodes;
+ public final int _slots;
+
+ public NodeAndSlotCounts(int nodes, int slots) {
+ _nodes = nodes;
+ _slots = slots;
+ }
}
- }
- /**
- * Place executors into slots in a round robin way, taking into account
- * component spreading among different hosts.
- */
- public static class RoundRobinSlotScheduler {
- private Map<String,Set<String>> _nodeToComps;
- private HashMap<String, List<ExecutorDetails>> _spreadToSchedule;
- private LinkedList<Set<ExecutorDetails>> _slots;
- private Set<ExecutorDetails> _lastSlot;
- private Cluster _cluster;
- private String _topId;
-
/**
- * Create a new scheduler for a given topology
- * @param td the topology to schedule
- * @param slotsToUse the number of slots to use for the executors left to
- * schedule.
- * @param cluster the cluster to schedule this on.
+ * Place executors into slots in a round robin way, taking into account component spreading among different hosts.
*/
- public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse,
- Cluster cluster) {
- _topId = td.getId();
- _cluster = cluster;
-
- Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent();
- SchedulerAssignment assignment = _cluster.getAssignmentById(_topId);
- _nodeToComps = new HashMap<String, Set<String>>();
+ public static class RoundRobinSlotScheduler {
+ private Map<String, Set<String>> _nodeToComps;
+ private HashMap<String, List<ExecutorDetails>> _spreadToSchedule;
+ private LinkedList<Set<ExecutorDetails>> _slots;
+ private Set<ExecutorDetails> _lastSlot;
+ private Cluster _cluster;
+ private String _topId;
- if (assignment != null) {
- Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot();
-
- for (Entry<ExecutorDetails, WorkerSlot> entry: execToSlot.entrySet()) {
- String nodeId = entry.getValue().getNodeId();
- Set<String> comps = _nodeToComps.get(nodeId);
- if (comps == null) {
- comps = new HashSet<String>();
- _nodeToComps.put(nodeId, comps);
- }
- comps.add(execToComp.get(entry.getKey()));
- }
- }
-
- _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>();
- List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
- if (spreadComps != null) {
- for (String comp: spreadComps) {
- _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>());
+ /**
+ * Create a new scheduler for a given topology
+ *
+ * @param td the topology to schedule
+ * @param slotsToUse the number of slots to use for the executors left to schedule.
+ * @param cluster the cluster to schedule this on.
+ */
+ public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, Cluster cluster) {
+ _topId = td.getId();
+ _cluster = cluster;
+
+ Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent();
+ SchedulerAssignment assignment = _cluster.getAssignmentById(_topId);
+ _nodeToComps = new HashMap<String, Set<String>>();
+
+ if (assignment != null) {
+ Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot();
+
+ for (Entry<ExecutorDetails, WorkerSlot> entry : execToSlot.entrySet()) {
+ String nodeId = entry.getValue().getNodeId();
+ Set<String> comps = _nodeToComps.get(nodeId);
+ if (comps == null) {
+ comps = new HashSet<String>();
+ _nodeToComps.put(nodeId, comps);
+ }
+ comps.add(execToComp.get(entry.getKey()));
+ }
+ }
+
+ _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>();
+ List<String> spreadComps = (List<String>) td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
+ if (spreadComps != null) {
+ for (String comp : spreadComps) {
+ _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>());
+ }
+ }
+
+ _slots = new LinkedList<Set<ExecutorDetails>>();
+ for (int i = 0; i < slotsToUse; i++) {
+ _slots.add(new HashSet<ExecutorDetails>());
+ }
+
+ int at = 0;
+ for (Entry<String, List<ExecutorDetails>> entry : _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) {
+ LOG.debug("Scheduling for {}", entry.getKey());
+ if (_spreadToSchedule.containsKey(entry.getKey())) {
+ LOG.debug("Saving {} for spread...", entry.getKey());
+ _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue());
+ } else {
+ for (ExecutorDetails ed : entry.getValue()) {
+ LOG.debug("Assigning {} {} to slot {}", new Object[] { entry.getKey(), ed, at });
+ _slots.get(at).add(ed);
+ at++;
+ if (at >= _slots.size()) {
+ at = 0;
+ }
+ }
+ }
+ }
+ _lastSlot = _slots.get(_slots.size() - 1);
}
- }
-
- _slots = new LinkedList<Set<ExecutorDetails>>();
- for (int i = 0; i < slotsToUse; i++) {
- _slots.add(new HashSet<ExecutorDetails>());
- }
- int at = 0;
- for (Entry<String, List<ExecutorDetails>> entry: _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) {
- LOG.debug("Scheduling for {}", entry.getKey());
- if (_spreadToSchedule.containsKey(entry.getKey())) {
- LOG.debug("Saving {} for spread...",entry.getKey());
- _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue());
- } else {
- for (ExecutorDetails ed: entry.getValue()) {
- LOG.debug("Assigning {} {} to slot {}", new Object[]{entry.getKey(), ed, at});
- _slots.get(at).add(ed);
- at++;
- if (at >= _slots.size()) {
- at = 0;
+ /**
+ * Assign a slot to the given node.
+ *
+ * @param n the node to assign a slot to.
+ * @return true if there are more slots to assign else false.
+ */
+ public boolean assignSlotTo(Node n) {
+ if (_slots.isEmpty()) {
+ return false;
}
- }
+ Set<ExecutorDetails> slot = _slots.pop();
+ if (slot == _lastSlot) {
+ // The last slot fill it up
+ for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) {
+ if (entry.getValue().size() > 0) {
+ slot.addAll(entry.getValue());
+ }
+ }
+ } else {
+ String nodeId = n.getId();
+ Set<String> nodeComps = _nodeToComps.get(nodeId);
+ if (nodeComps == null) {
+ nodeComps = new HashSet<String>();
+ _nodeToComps.put(nodeId, nodeComps);
+ }
+ for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) {
+ if (entry.getValue().size() > 0) {
+ String comp = entry.getKey();
+ if (!nodeComps.contains(comp)) {
+ nodeComps.add(comp);
+ slot.add(entry.getValue().remove(0));
+ }
+ }
+ }
+ }
+ n.assign(_topId, slot, _cluster);
+ return !_slots.isEmpty();
}
- }
- _lastSlot = _slots.get(_slots.size() - 1);
}
-
+
+ private static final Logger LOG = LoggerFactory.getLogger(NodePool.class);
+
+ /**
+ * Initialize the pool.
+ *
+ * @param cluster the cluster
+ * @param nodeIdToNode the mapping of node id to nodes
+ */
+ public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
+ _cluster = cluster;
+ _nodeIdToNode = nodeIdToNode;
+ }
+
/**
- * Assign a slot to the given node.
- * @param n the node to assign a slot to.
- * @return true if there are more slots to assign else false.
+ * Add a topology to the pool
+ *
+ * @param td the topology to add.
*/
- public boolean assignSlotTo(Node n) {
- if (_slots.isEmpty()) {
- return false;
- }
- Set<ExecutorDetails> slot = _slots.pop();
- if (slot == _lastSlot) {
- //The last slot fill it up
- for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) {
- if (entry.getValue().size() > 0) {
- slot.addAll(entry.getValue());
- }
+ public abstract void addTopology(TopologyDetails td);
+
+ /**
+ * Check if this topology can be added to this pool
+ *
+ * @param td the topology
+ * @return true if it can else false
+ */
+ public abstract boolean canAdd(TopologyDetails td);
+
+ /**
+ * @return the number of nodes that are available to be taken
+ */
+ public abstract int slotsAvailable();
+
+ /**
+ * Take nodes from this pool that can fulfill possibly up to the slotsNeeded
+ *
+ * @param slotsNeeded the number of slots that are needed.
+ * @return a Collection of nodes with the removed nodes in it. This may be empty, but should not be null.
+ */
+ public abstract Collection<Node> takeNodesBySlots(int slotsNeeded);
+
+ /**
+ * Get the number of nodes and slots this would provide to get the slots needed
+ *
+ * @param slots the number of slots needed
+ * @return the number of nodes and slots that would be returned.
+ */
+ public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots);
+
+ /**
+ * @return the number of nodes that are available to be taken
+ */
+ public abstract int nodesAvailable();
+
+ /**
+ * Take up to nodesNeeded from this pool
+ *
+ * @param nodesNeeded the number of nodes that are needed.
+ * @return a Collection of nodes with the removed nodes in it. This may be empty, but should not be null.
+ */
+ public abstract Collection<Node> takeNodes(int nodesNeeded);
+
+ /**
+ * Reschedule any topologies as needed.
+ *
+ * @param lesserPools pools that may be used to steal nodes from.
+ */
+ public abstract void scheduleAsNeeded(NodePool... lesserPools);
+
+ public static int slotsAvailable(NodePool[] pools) {
+ int slotsAvailable = 0;
+ for (NodePool pool : pools) {
+ slotsAvailable += pool.slotsAvailable();
}
- } else {
- String nodeId = n.getId();
- Set<String> nodeComps = _nodeToComps.get(nodeId);
- if (nodeComps == null) {
- nodeComps = new HashSet<String>();
- _nodeToComps.put(nodeId, nodeComps);
+ return slotsAvailable;
+ }
+
+ public static int nodesAvailable(NodePool[] pools) {
+ int nodesAvailable = 0;
+ for (NodePool pool : pools) {
+ nodesAvailable += pool.nodesAvailable();
}
- for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) {
- if (entry.getValue().size() > 0) {
- String comp = entry.getKey();
- if (!nodeComps.contains(comp)) {
- nodeComps.add(comp);
- slot.add(entry.getValue().remove(0));
+ return nodesAvailable;
+ }
+
+ public static Collection<Node> takeNodesBySlot(int slotsNeeded, NodePool[] pools) {
+ LOG.debug("Trying to grab {} free slots from {}", slotsNeeded, pools);
+ HashSet<Node> ret = new HashSet<Node>();
+ for (NodePool pool : pools) {
+ Collection<Node> got = pool.takeNodesBySlots(slotsNeeded);
+ ret.addAll(got);
+ slotsNeeded -= Node.countFreeSlotsAlive(got);
+ LOG.debug("Got {} nodes so far need {} more slots", ret.size(), slotsNeeded);
+ if (slotsNeeded <= 0) {
+ break;
}
- }
}
- }
- n.assign(_topId, slot, _cluster);
- return !_slots.isEmpty();
+ return ret;
}
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(NodePool.class);
- /**
- * Initialize the pool.
- * @param cluster the cluster
- * @param nodeIdToNode the mapping of node id to nodes
- */
- public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
- _cluster = cluster;
- _nodeIdToNode = nodeIdToNode;
- }
-
- /**
- * Add a topology to the pool
- * @param td the topology to add.
- */
- public abstract void addTopology(TopologyDetails td);
-
- /**
- * Check if this topology can be added to this pool
- * @param td the topology
- * @return true if it can else false
- */
- public abstract boolean canAdd(TopologyDetails td);
-
- /**
- * @return the number of nodes that are available to be taken
- */
- public abstract int slotsAvailable();
-
- /**
- * Take nodes from this pool that can fulfill possibly up to the
- * slotsNeeded
- * @param slotsNeeded the number of slots that are needed.
- * @return a Collection of nodes with the removed nodes in it.
- * This may be empty, but should not be null.
- */
- public abstract Collection<Node> takeNodesBySlots(int slotsNeeded);
- /**
- * Get the number of nodes and slots this would provide to get the slots needed
- * @param slots the number of slots needed
- * @return the number of nodes and slots that would be returned.
- */
- public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots);
-
- /**
- * @return the number of nodes that are available to be taken
- */
- public abstract int nodesAvailable();
-
- /**
- * Take up to nodesNeeded from this pool
- * @param nodesNeeded the number of nodes that are needed.
- * @return a Collection of nodes with the removed nodes in it.
- * This may be empty, but should not be null.
- */
- public abstract Collection<Node> takeNodes(int nodesNeeded);
-
- /**
- * Reschedule any topologies as needed.
- * @param lesserPools pools that may be used to steal nodes from.
- */
- public abstract void scheduleAsNeeded(NodePool ... lesserPools);
-
- public static int slotsAvailable(NodePool[] pools) {
- int slotsAvailable = 0;
- for (NodePool pool: pools) {
- slotsAvailable += pool.slotsAvailable();
- }
- return slotsAvailable;
- }
-
- public static int nodesAvailable(NodePool[] pools) {
- int nodesAvailable = 0;
- for (NodePool pool: pools) {
- nodesAvailable += pool.nodesAvailable();
- }
- return nodesAvailable;
- }
-
- public static Collection<Node> takeNodesBySlot(int slotsNeeded,NodePool[] pools) {
- LOG.debug("Trying to grab {} free slots from {}",slotsNeeded, pools);
- HashSet<Node> ret = new HashSet<Node>();
- for (NodePool pool: pools) {
- Collection<Node> got = pool.takeNodesBySlots(slotsNeeded);
- ret.addAll(got);
- slotsNeeded -= Node.countFreeSlotsAlive(got);
- LOG.debug("Got {} nodes so far need {} more slots",ret.size(),slotsNeeded);
- if (slotsNeeded <= 0) {
- break;
- }
- }
- return ret;
- }
-
- public static Collection<Node> takeNodes(int nodesNeeded,NodePool[] pools) {
- LOG.debug("Trying to grab {} free nodes from {}",nodesNeeded, pools);
- HashSet<Node> ret = new HashSet<Node>();
- for (NodePool pool: pools) {
- Collection<Node> got = pool.takeNodes(nodesNeeded);
- ret.addAll(got);
- nodesNeeded -= got.size();
- LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), nodesNeeded);
- if (nodesNeeded <= 0) {
- break;
- }
+ public static Collection<Node> takeNodes(int nodesNeeded, NodePool[] pools) {
+ LOG.debug("Trying to grab {} free nodes from {}", nodesNeeded, pools);
+ HashSet<Node> ret = new HashSet<Node>();
+ for (NodePool pool : pools) {
+ Collection<Node> got = pool.takeNodes(nodesNeeded);
+ ret.addAll(got);
+ nodesNeeded -= got.size();
+ LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), nodesNeeded);
+ if (nodesNeeded <= 0) {
+ break;
+ }
+ }
+ return ret;
}
- return ret;
- }
- public static int getNodeCountIfSlotsWereTaken(int slots,NodePool[] pools) {
- LOG.debug("How many nodes to get {} slots from {}",slots, pools);
- int total = 0;
- for (NodePool pool: pools) {
- NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots);
- total += ns._nodes;
- slots -= ns._slots;
- LOG.debug("Found {} nodes so far {} more slots needed", total, slots);
- if (slots <= 0) {
- break;
- }
- }
- return total;
- }
+ public static int getNodeCountIfSlotsWereTaken(int slots, NodePool[] pools) {
+ LOG.debug("How many nodes to get {} slots from {}", slots, pools);
+ int total = 0;
+ for (NodePool pool : pools) {
+ NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots);
+ total += ns._nodes;
+ slots -= ns._slots;
+ LOG.debug("Found {} nodes so far {} more slots needed", total, slots);
+ if (slots <= 0) {
+ break;
+ }
+ }
+ return total;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java
index 9670045..761eac0 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/INimbusCredentialPlugin.java
@@ -22,23 +22,23 @@ import backtype.storm.daemon.Shutdownable;
import java.util.Map;
/**
- * Nimbus auto credential plugin that will be called on nimbus host
- * during submit topology option. User can specify a list of implementation using config key
+ * Nimbus auto credential plugin that will be called on nimbus host during submit topology option. User can specify a list of implementation using config key
* nimbus.autocredential.plugins.classes.
*/
public interface INimbusCredentialPlugin extends Shutdownable {
/**
* this method will be called when nimbus initializes.
+ *
* @param conf
*/
void prepare(Map conf);
/**
- * Method that will be called on nimbus as part of submit topology. This plugin will be called
- * at least once during the submit Topology action. It will be not be called during activate instead
- * the credentials return by this method will be merged with the other credentials in the topology
- * and stored in zookeeper.
+ * Method that will be called on nimbus as part of submit topology. This plugin will be called at least once during the submit Topology action. It will be
+ * not be called during activate instead the credentials return by this method will be merged with the other credentials in the topology and stored in
+ * zookeeper.
+ *
* @param credentials credentials map where more credentials will be added.
* @param conf topology configuration
* @return
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java b/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java
index ac3fb53..60653a1 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/AuthUtils.java
@@ -45,19 +45,19 @@ public class AuthUtils {
/**
* Construct a JAAS configuration object per storm configuration file
+ *
* @param storm_conf Storm configuration
* @return JAAS configuration object
*/
public static Configuration GetConfiguration(Map storm_conf) {
Configuration login_conf = null;
- //find login file configuration from Storm configuration
- String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
- if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
+ // find login file configuration from Storm configuration
+ String loginConfigurationFile = (String) storm_conf.get("java.security.auth.login.config");
+ if ((loginConfigurationFile != null) && (loginConfigurationFile.length() > 0)) {
File config_file = new File(loginConfigurationFile);
- if (! config_file.canRead()) {
- throw new RuntimeException("File " + loginConfigurationFile +
- " cannot be read.");
+ if (!config_file.canRead()) {
+ throw new RuntimeException("File " + loginConfigurationFile + " cannot be read.");
}
try {
URI config_uri = config_file.toURI();
@@ -72,24 +72,26 @@ public class AuthUtils {
/**
* Construct a principal to local plugin
+ *
* @param conf storm configuration
* @return the plugin
*/
public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) {
IPrincipalToLocal ptol = null;
try {
- String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
- Class klass = Class.forName(ptol_klassName);
- ptol = (IPrincipalToLocal)klass.newInstance();
- ptol.prepare(storm_conf);
+ String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
+ Class klass = Class.forName(ptol_klassName);
+ ptol = (IPrincipalToLocal) klass.newInstance();
+ ptol.prepare(storm_conf);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
}
return ptol;
}
/**
* Construct a group mapping service provider plugin
+ *
* @param conf storm configuration
* @return the plugin
*/
@@ -98,26 +100,27 @@ public class AuthUtils {
try {
String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
Class klass = Class.forName(gmsp_klassName);
- gmsp = (IGroupMappingServiceProvider)klass.newInstance();
+ gmsp = (IGroupMappingServiceProvider) klass.newInstance();
gmsp.prepare(storm_conf);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
}
return gmsp;
}
/**
* Get all of the configured Credential Renwer Plugins.
+ *
* @param storm_conf the storm configuration to use.
* @return the configured credential renewers.
*/
public static Collection<ICredentialsRenewer> GetCredentialRenewers(Map conf) {
try {
Set<ICredentialsRenewer> ret = new HashSet<ICredentialsRenewer>();
- Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS);
+ Collection<String> clazzes = (Collection<String>) conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS);
if (clazzes != null) {
for (String clazz : clazzes) {
- ICredentialsRenewer inst = (ICredentialsRenewer)Class.forName(clazz).newInstance();
+ ICredentialsRenewer inst = (ICredentialsRenewer) Class.forName(clazz).newInstance();
inst.prepare(conf);
ret.add(inst);
}
@@ -130,16 +133,17 @@ public class AuthUtils {
/**
* Get all the Nimbus Auto cred plugins.
+ *
* @param conf nimbus configuration to use.
* @return nimbus auto credential plugins.
*/
public static Collection<INimbusCredentialPlugin> getNimbusAutoCredPlugins(Map conf) {
try {
Set<INimbusCredentialPlugin> ret = new HashSet<INimbusCredentialPlugin>();
- Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS);
+ Collection<String> clazzes = (Collection<String>) conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS);
if (clazzes != null) {
for (String clazz : clazzes) {
- INimbusCredentialPlugin inst = (INimbusCredentialPlugin)Class.forName(clazz).newInstance();
+ INimbusCredentialPlugin inst = (INimbusCredentialPlugin) Class.forName(clazz).newInstance();
inst.prepare(conf);
ret.add(inst);
}
@@ -152,21 +156,22 @@ public class AuthUtils {
/**
* Get all of the configured AutoCredential Plugins.
+ *
* @param storm_conf the storm configuration to use.
* @return the configured auto credentials.
*/
public static Collection<IAutoCredentials> GetAutoCredentials(Map storm_conf) {
try {
Set<IAutoCredentials> autos = new HashSet<IAutoCredentials>();
- Collection<String> clazzes = (Collection<String>)storm_conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS);
+ Collection<String> clazzes = (Collection<String>) storm_conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS);
if (clazzes != null) {
for (String clazz : clazzes) {
- IAutoCredentials a = (IAutoCredentials)Class.forName(clazz).newInstance();
+ IAutoCredentials a = (IAutoCredentials) Class.forName(clazz).newInstance();
a.prepare(storm_conf);
autos.add(a);
}
}
- LOG.info("Got AutoCreds "+autos);
+ LOG.info("Got AutoCreds " + autos);
return autos;
} catch (Exception e) {
throw new RuntimeException(e);
@@ -175,12 +180,13 @@ public class AuthUtils {
/**
* Populate a subject from credentials using the IAutoCredentials.
+ *
* @param subject the subject to populate or null if a new Subject should be created.
* @param autos the IAutoCredentials to call to populate the subject.
* @param credentials the credentials to pull from
* @return the populated subject.
*/
- public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) {
+ public static Subject populateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String, String> credentials) {
try {
if (subject == null) {
subject = new Subject();
@@ -196,11 +202,12 @@ public class AuthUtils {
/**
* Update a subject from credentials using the IAutoCredentials.
+ *
* @param subject the subject to update
* @param autos the IAutoCredentials to call to update the subject.
* @param credentials the credentials to pull from
*/
- public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String,String> credentials) {
+ public static void updateSubject(Subject subject, Collection<IAutoCredentials> autos, Map<String, String> credentials) {
if (subject == null) {
throw new RuntimeException("The subject cannot be null when updating a subject with credentials");
}
@@ -216,68 +223,68 @@ public class AuthUtils {
/**
* Construct a transport plugin per storm configuration
+ *
* @param conf storm configuration
* @return
*/
public static ITransportPlugin GetTransportPlugin(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
- ITransportPlugin transportPlugin = null;
+ ITransportPlugin transportPlugin = null;
try {
String transport_plugin_klassName = type.getTransportPlugin(storm_conf);
Class klass = Class.forName(transport_plugin_klassName);
- transportPlugin = (ITransportPlugin)klass.newInstance();
+ transportPlugin = (ITransportPlugin) klass.newInstance();
transportPlugin.prepare(type, storm_conf, login_conf);
- } catch(Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
return transportPlugin;
}
- private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf,
- String klassName) {
+ private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf, String klassName) {
IHttpCredentialsPlugin plugin = null;
try {
Class klass = Class.forName(klassName);
- plugin = (IHttpCredentialsPlugin)klass.newInstance();
+ plugin = (IHttpCredentialsPlugin) klass.newInstance();
plugin.prepare(conf);
- } catch(Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
return plugin;
}
/**
- * Construct an HttpServletRequest credential plugin specified by the UI
- * storm configuration
+ * Construct an HttpServletRequest credential plugin specified by the UI storm configuration
+ *
* @param conf storm configuration
* @return the plugin
*/
public static IHttpCredentialsPlugin GetUiHttpCredentialsPlugin(Map conf) {
- String klassName = (String)conf.get(Config.UI_HTTP_CREDS_PLUGIN);
+ String klassName = (String) conf.get(Config.UI_HTTP_CREDS_PLUGIN);
return AuthUtils.GetHttpCredentialsPlugin(conf, klassName);
}
/**
- * Construct an HttpServletRequest credential plugin specified by the DRPC
- * storm configuration
+ * Construct an HttpServletRequest credential plugin specified by the DRPC storm configuration
+ *
* @param conf storm configuration
* @return the plugin
*/
public static IHttpCredentialsPlugin GetDrpcHttpCredentialsPlugin(Map conf) {
- String klassName = (String)conf.get(Config.DRPC_HTTP_CREDS_PLUGIN);
+ String klassName = (String) conf.get(Config.DRPC_HTTP_CREDS_PLUGIN);
return AuthUtils.GetHttpCredentialsPlugin(conf, klassName);
}
public static String get(Configuration configuration, String section, String key) throws IOException {
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
+ String errorMessage = "Could not find a '" + section + "' entry in this configuration.";
throw new IOException(errorMessage);
}
- for(AppConfigurationEntry entry: configurationEntries) {
+ for (AppConfigurationEntry entry : configurationEntries) {
Object val = entry.getOptions().get(key);
if (val != null)
- return (String)val;
+ return (String) val;
}
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
index e2469e5..6386992 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
@@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
import backtype.storm.security.auth.ReqContext;
public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
- private static final Logger LOG =
- LoggerFactory.getLogger(DefaultHttpCredentialsPlugin.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultHttpCredentialsPlugin.class);
/**
* No-op
+ *
* @param storm_conf Storm configuration
*/
@Override
@@ -45,6 +45,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
/**
* Gets the user name from the request principal.
+ *
* @param req the servlet request
* @return the authenticated user, or null if none is authenticated
*/
@@ -54,7 +55,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
if (req != null && (princ = req.getUserPrincipal()) != null) {
String userName = princ.getName();
if (userName != null && !userName.isEmpty()) {
- LOG.debug("HTTP request had user ("+userName+")");
+ LOG.debug("HTTP request had user (" + userName + ")");
return userName;
}
}
@@ -62,29 +63,28 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
}
/**
- * Populates a given context with a new Subject derived from the
- * credentials in a servlet request.
+ * Populates a given context with a new Subject derived from the credentials in a servlet request.
+ *
* @param context the context to be populated
* @param req the servlet request
* @return the context
*/
@Override
- public ReqContext populateContext(ReqContext context,
- HttpServletRequest req) {
+ public ReqContext populateContext(ReqContext context, HttpServletRequest req) {
String userName = getUserName(req);
String doAsUser = req.getHeader("doAsUser");
- if(doAsUser == null) {
+ if (doAsUser == null) {
doAsUser = req.getParameter("doAsUser");
}
- if(doAsUser != null) {
+ if (doAsUser != null) {
context.setRealPrincipal(new SingleUserPrincipal(userName));
userName = doAsUser;
}
Set<Principal> principals = new HashSet<Principal>();
- if(userName != null) {
+ if (userName != null) {
Principal p = new SingleUserPrincipal(userName);
principals.add(p);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java
index 729d744..47e23b0 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/DefaultPrincipalToLocal.java
@@ -22,22 +22,24 @@ import java.util.Map;
import java.security.Principal;
/**
- * Storm can be configured to launch worker processed as a given user.
- * Some transports need to map the Principal to a local user name.
+ * Storm can be configured to launch worker processed as a given user. Some transports need to map the Principal to a local user name.
*/
public class DefaultPrincipalToLocal implements IPrincipalToLocal {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ *
+ * @param conf Storm configuration
*/
- public void prepare(Map storm_conf) {}
-
+ public void prepare(Map storm_conf) {
+ }
+
/**
* Convert a Principal to a local user name.
+ *
* @param principal the principal to convert
* @return The local user name.
*/
public String toLocal(Principal principal) {
- return principal == null ? null : principal.getName();
+ return principal == null ? null : principal.getName();
}
}