You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:15 UTC
[17/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming
package policy
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalancingStrategy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalancingStrategy.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalancingStrategy.java
deleted file mode 100644
index 714b70c..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalancingStrategy.java
+++ /dev/null
@@ -1,622 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.location.Location;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Represents an abstract algorithm for optimally balancing worker "items" among several "containers" based on the workloads
- * of the items, and corresponding high- and low-thresholds on the containers.
- *
- * TODO: extract interface, provide default implementation
- * TODO: remove legacy code comments
- */
-public class BalancingStrategy<NodeType extends Entity, ItemType extends Movable> {
-
- // FIXME Bad idea to use MessageFormat.format in this way; if toString of entity contains
- // special characters interpreted by MessageFormat, then it will all break!
-
- // This is a modified version of the watermark elasticity policy from Monterey v3.
-
- private static final Logger LOG = LoggerFactory.getLogger(BalancingStrategy.class);
-
- private static final int MAX_MIGRATIONS_PER_BALANCING_NODE = 20; // arbitrary (Splodge)
- private static final boolean BALANCE_COLD_PULLS_IN_SAME_RUN_AS_HOT_PUSHES = false;
-
- private final String name;
- private final BalanceablePoolModel<NodeType, ItemType> model;
- private final PolicyUtilForPool<NodeType, ItemType> helper;
-// private boolean loggedColdestTooHigh = false;
-// private boolean loggedHottestTooLow = false;
-
-
- public BalancingStrategy(String name, BalanceablePoolModel<NodeType, ItemType> model) {
- this.name = name;
- this.model = model;
- this.helper = new PolicyUtilForPool<NodeType, ItemType>(model);
- }
-
- public String getName() {
- return name;
- }
-
- public void rebalance() {
- checkAndApplyOn(model.getPoolContents());
- }
-
- public int getMaxMigrationsPerBalancingNode() {
- return MAX_MIGRATIONS_PER_BALANCING_NODE;
- }
-
- public BalanceablePoolModel<NodeType, ItemType> getDataProvider() {
- return model;
- }
-
- // This was the entry point for the legacy policy.
- private void checkAndApplyOn(final Collection<NodeType> dirtyNodesSupplied) {
- Collection<NodeType> dirtyNodes = dirtyNodesSupplied;
-
-// if (startTime + FORCE_ALL_NODES_IF_DELAYED_FOR_MILLIS < System.currentTimeMillis()) {
-// Set<NodeType> allNodes = new LinkedHashSet<NodeType>();
-// allNodes.addAll(dirtyNodes);
-// dirtyNodes = allNodes;
-// allNodes.addAll(getDataProvider().getPoolContents());
-// if (LOG.isDebugEnabled())
-// LOG.debug("policy "+getDataProvider().getAbbr()+" running after delay ("+
-// TimeUtils.makeTimeString(System.currentTimeMillis() - startTime)+", analysing all nodes: "+
-// dirtyNodes);
-// }
-
-// nodeFinder.optionalCachedNodesWithBacklogDetected.clear();
-// boolean gonnaGrow = growPool(dirtyNodes);
-// getDataProvider().waitForAllTransitionsComplete();
- boolean gonnaGrow = false;
-
- Set<NodeType> nonFrozenDirtyNodes = new LinkedHashSet<NodeType>(dirtyNodes);
-// boolean gonnaShrink = false;
-// if (!gonnaGrow && !DONT_SHRINK_UNLESS_BALANCED) {
-// gonnaShrink = shrinkPool(nonFrozenDirtyNodes);
-// getDataProvider().waitForAllTransitionsComplete();
-// }
-
- if (getDataProvider().getPoolSize() >= 2) {
- boolean didBalancing = false;
- for (NodeType a : nonFrozenDirtyNodes) {
- didBalancing |= balanceItemsOnNodesInQuestion(a, gonnaGrow);
-// getMutator().waitForAllTransitionsComplete();
- }
- if (didBalancing) {
- return;
- }
- }
-
-// if (!gonnaGrow && DONT_SHRINK_UNLESS_BALANCED) {
-// gonnaShrink = shrinkPool(nonFrozenDirtyNodes);
-// getDataProvider().waitForAllTransitionsComplete();
-// }
-
-// if (gonnaGrow || gonnaShrink)
-// //don't log 'doing nothing' message
-// return;
-
-// if (LOG.isDebugEnabled()) {
-// double poolTotal = getDataProvider().getPoolPredictedWorkrateTotal();
-// int poolSize = getDataProvider().getPoolPredictedSize();
-// LOG.debug(MessageFormat.format("policy "+getDataProvider().getAbbr()+" did nothing; pool workrate is {0,number,#.##} x {1} nodes",
-// 1.0*poolTotal/poolSize, poolSize));
-// }
- }
-
- protected boolean balanceItemsOnNodesInQuestion(NodeType questionedNode, boolean gonnaGrow) {
- double questionedNodeTotalWorkrate = getDataProvider().getTotalWorkrate(questionedNode);
-
- boolean balanced = balanceItemsOnHotNode(questionedNode, questionedNodeTotalWorkrate, gonnaGrow);
-// getMutator().waitForAllTransitionsComplete();
-
- if (!balanced || BALANCE_COLD_PULLS_IN_SAME_RUN_AS_HOT_PUSHES) {
- balanced |= balanceItemsOnColdNode(questionedNode, questionedNodeTotalWorkrate, gonnaGrow);
-// getMutator().waitForAllTransitionsComplete();
- }
- if (balanced)
- return true;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" not balancing "+questionedNode+"; " +
- "its workrate {0,number,#.##} is acceptable (or cannot be balanced)", questionedNodeTotalWorkrate) );
- }
- return false;
- }
-
- protected boolean balanceItemsOnHotNode(NodeType node, double nodeWorkrate, boolean gonnaGrow) {
- double originalNodeWorkrate = nodeWorkrate;
- int migrationCount = 0;
- int iterationCount = 0;
-
- Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>();
- Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>();
-
-// if (nodeFinder.config.COUNT_BACKLOG_AS_EXTRA_WORKRATE) {
-// int backlog = nodeFinder.getBacklogQueueLength(questionedNode);
-// if (backlog>0) {
-// Level l = backlog>1000 ? Level.INFO : backlog>10 ? Level.FINE : Level.FINER;
-// if (LOG.isLoggable(l)) {
-// LOG.log( l, MessageFormat.format(
-// "policy "+getDataProvider().getAbbr()+" detected queue at node "+questionedNode+", " +
-// "inflating workrate {0,number,#.##} + "+backlog, questionedNodeTotalWorkrate) );
-// }
-// questionedNodeTotalWorkrate += backlog;
-// }
-// }
-
- Double highThreshold = model.getHighThreshold(node);
- if (highThreshold == -1) {
- // node presumably has been removed; TODO log
- return false;
- }
-
- while (nodeWorkrate > highThreshold && migrationCount < getMaxMigrationsPerBalancingNode()) {
- iterationCount++;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(MessageFormat.format(
- "policy "+getDataProvider().getName()+" considering balancing hot node "+node+" " +
- "(workrate {0,number,#.##}); iteration "+iterationCount, nodeWorkrate) );
- }
-
- // move from hot node, to coldest
-
- NodeType coldNode = helper.findColdestContainer(nodesChecked);
-
- if (coldNode == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" not balancing hot node "+node+" " +
- "(workrate {0,number,#.##}); no coldest node available", nodeWorkrate) );
- }
- break;
- }
-
- if (coldNode.equals(node)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" not balancing hot node "+node+" " +
- "(workrate {0,number,#.##}); it is also the coldest modifiable node", nodeWorkrate) );
- }
- break;
- }
-
- double coldNodeWorkrate = getDataProvider().getTotalWorkrate(coldNode);
- boolean emergencyLoadBalancing = coldNodeWorkrate < nodeWorkrate*2/3;
- double coldNodeHighThreshold = model.getHighThreshold(coldNode);
- if (coldNodeWorkrate >= coldNodeHighThreshold && !emergencyLoadBalancing) {
- //don't balance if all nodes are approx equally hot (and very hot)
-
- //for now, stop warning if it is a recurring theme!
-// Level level = loggedColdestTooHigh ? Level.FINER : Level.INFO;
-// LOG.log(level, MessageFormat.format(
-// "policy "+getDataProvider().getAbbr()+" not balancing hot node "+questionedNode+" " +
-// "(workrate {0,number,#.##}); coldest node "+coldNode+" has workrate {1,number,#.##} also too high"+
-// (loggedColdestTooHigh ? "" : " (future cases will be logged at finer)"),
-// questionedNodeTotalWorkrate, coldNodeWorkrate) );
-// loggedColdestTooHigh = true;
- break;
- }
- double poolLowWatermark = Double.MAX_VALUE; // TODO
- if (gonnaGrow && (coldNodeWorkrate >= poolLowWatermark && !emergencyLoadBalancing)) {
- //if we're growing the pool, refuse to balance unless the cold node is indeed very cold, or hot node very hot
-
- //for now, stop warning if it is a recurring theme!
-// Level level = loggedColdestTooHigh ? Level.FINER : Level.INFO;
-// LOG.log(level, MessageFormat.format(
-// "policy "+getDataProvider().getAbbr()+" not balancing hot node "+questionedNode+" " +
-// "(workrate {0,number,#.##}); coldest node "+coldNode+" has workrate {1,number,#.##} also too high to accept while pool is growing" +
-// (loggedColdestTooHigh ? "" : " (future cases will be logged at finer)"),
-// questionedNodeTotalWorkrate, coldNodeWorkrate) );
-// loggedColdestTooHigh = true;
- break;
- }
-
- String questionedNodeName = getDataProvider().getName(node);
- String coldNodeName = getDataProvider().getName(coldNode);
- Location coldNodeLocation = getDataProvider().getLocation(coldNode);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
- "("+node+", workrate {0,number,#.##}), " +
- "considering target "+coldNodeName+" ("+coldNode+", workrate {1,number,#.##})",
- nodeWorkrate, coldNodeWorkrate) );
- }
-
- double idealSizeToMove = (nodeWorkrate - coldNodeWorkrate) / 2;
- //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount
-
- if (idealSizeToMove + coldNodeWorkrate > coldNodeHighThreshold)
- idealSizeToMove = coldNodeHighThreshold - coldNodeWorkrate;
-
-
- double maxSizeToMoveIdeally = Math.min(
- nodeWorkrate/2 + 0.00001,
- //permit it to exceed node high if there is no alternative (this is 'max' not 'ideal'),
- //so long as it still gives a significant benefit
- // getConfiguration().nodeHighWaterMark - coldNodeWorkrate,
- (nodeWorkrate - coldNodeWorkrate)*0.9);
- double maxSizeToMoveIfNoSmallButLarger = nodeWorkrate*3/4;
-
- Map<ItemType, Double> questionedNodeItems = getDataProvider().getItemWorkrates(node);
- if (questionedNodeItems == null) {
- if (LOG.isDebugEnabled())
- LOG.debug(MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
- "("+node+", workrate {0,number,#.##}), abandoned; " +
- "item report for " + questionedNodeName + " unavailable",
- nodeWorkrate));
- break;
- }
- ItemType itemToMove = findBestItemToMove(questionedNodeItems, idealSizeToMove, maxSizeToMoveIdeally,
- maxSizeToMoveIfNoSmallButLarger, itemsMoved, coldNodeLocation);
-
- if (itemToMove == null) {
- if (LOG.isDebugEnabled())
- LOG.debug(MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
- "("+node+", workrate {0,number,#.##}), ending; " +
- "no suitable segment found " +
- "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " +
- "moving to coldest node "+coldNodeName+" ("+coldNode+", workrate {3,number,#.##}); available items: {4}",
- nodeWorkrate, idealSizeToMove, maxSizeToMoveIdeally, coldNodeWorkrate, questionedNodeItems) );
- break;
- }
-
- itemsMoved.add(itemToMove);
- double itemWorkrate = questionedNodeItems.get(itemToMove);
-
-// if (LOG.isLoggable(Level.FINE))
-// LOG.fine( MessageFormat.format(
-// "policy "+getDataProvider().getAbbr()+" balancing hot node "+questionedNodeName+" " +
-// "(workrate {0,number,#.##}, too high), transitioning " + itemToMove +
-// " to "+coldNodeName+" (workrate {1,number,#.##}, now += {2,number,#.##})",
-// questionedNodeTotalWorkrate, coldNodeWorkrate, segmentRate) );
-
- nodeWorkrate -= itemWorkrate;
- coldNodeWorkrate += itemWorkrate;
-
- moveItem(itemToMove, node, coldNode);
- ++migrationCount;
- }
-
- if (LOG.isDebugEnabled()) {
- if (iterationCount == 0) {
- if (LOG.isTraceEnabled())
- LOG.trace( MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing if hot finished at node "+node+"; " +
- "workrate {0,number,#.##} not hot",
- originalNodeWorkrate) );
- }
- else if (itemsMoved.isEmpty()) {
- if (LOG.isTraceEnabled())
- LOG.trace( MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing finished at hot node "+node+" " +
- "(workrate {0,number,#.##}); no way to improve it",
- originalNodeWorkrate) );
- } else {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing finished at hot node "+node+"; " +
- "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " +
- "by moving off {3}",
- originalNodeWorkrate,
- nodeWorkrate,
- getDataProvider().getTotalWorkrate(node),
- itemsMoved
- ) );
- }
- }
- return !itemsMoved.isEmpty();
- }
-
- protected boolean balanceItemsOnColdNode(NodeType questionedNode, double questionedNodeTotalWorkrate, boolean gonnaGrow) {
- // Abort if the node has pending adjustments.
- Map<ItemType, Double> items = getDataProvider().getItemWorkrates(questionedNode);
- if (items == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
- "(workrate {0,number,#.##}); workrate breakdown unavailable (probably reverting)",
- questionedNodeTotalWorkrate) );
- }
- return false;
- }
- for (ItemType item : items.keySet()) {
- if (!model.isItemMoveable(item)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
- "(workrate {0,number,#.##}); at least one item ("+item+") is in flux",
- questionedNodeTotalWorkrate) );
- }
- return false;
- }
- }
-
- double originalQuestionedNodeTotalWorkrate = questionedNodeTotalWorkrate;
- int numMigrations = 0;
-
- Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>();
- Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>();
-
- int iters = 0;
- Location questionedLocation = getDataProvider().getLocation(questionedNode);
-
- double lowThreshold = model.getLowThreshold(questionedNode);
- while (questionedNodeTotalWorkrate < lowThreshold) {
- iters++;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" considering balancing cold node "+questionedNode+" " +
- "(workrate {0,number,#.##}); iteration "+iters, questionedNodeTotalWorkrate));
- }
-
- // move from cold node, to hottest
-
- NodeType hotNode = helper.findHottestContainer(nodesChecked);
-
- if (hotNode == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
- "(workrate {0,number,#.##}); no hottest node available", questionedNodeTotalWorkrate) );
- }
-
- break;
- }
- if (hotNode.equals(questionedNode)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
- "(workrate {0,number,#.##}); it is also the hottest modfiable node", questionedNodeTotalWorkrate) );
- }
- break;
- }
-
-
- double hotNodeWorkrate = getDataProvider().getTotalWorkrate(hotNode);
- double hotNodeLowThreshold = model.getLowThreshold(hotNode);
- double hotNodeHighThreshold = model.getHighThreshold(hotNode);
- boolean emergencyLoadBalancing = false; //doesn't apply to cold
- if (hotNodeWorkrate == -1 || hotNodeLowThreshold == -1 || hotNodeHighThreshold == -1) {
- // hotNode presumably has been removed; TODO log
- break;
- }
- if (hotNodeWorkrate <= hotNodeLowThreshold && !emergencyLoadBalancing) {
- //don't balance if all nodes are too low
-
- //for now, stop warning if it is a recurring theme!
-// Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
-// LOG.log(level, MessageFormat.format(
-// "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
-// "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low" +
-// (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
-// questionedNodeTotalWorkrate, hotNodeWorkrate) );
-// loggedHottestTooLow = true;
- break;
- }
- if (gonnaGrow && (hotNodeWorkrate <= hotNodeHighThreshold && !emergencyLoadBalancing)) {
- //if we're growing the pool, refuse to balance unless the hot node is quite hot
-
- //again, stop warning if it is a recurring theme!
-// Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
-// LOG.log(level, MessageFormat.format(
-// "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
-// "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low to accept while pool is growing"+
-// (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
-// questionedNodeTotalWorkrate, hotNodeWorkrate) );
-// loggedHottestTooLow = true;
- break;
- }
-
- String questionedNodeName = getDataProvider().getName(questionedNode);
- String hotNodeName = getDataProvider().getName(hotNode);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
- "("+questionedNode+", workrate {0,number,#.##}), " +
- "considering source "+hotNodeName+" ("+hotNode+", workrate {1,number,#.##})",
- questionedNodeTotalWorkrate, hotNodeWorkrate) );
- }
-
- double idealSizeToMove = (hotNodeWorkrate - questionedNodeTotalWorkrate) / 2;
- //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount
- double targetNodeHighThreshold = model.getHighThreshold(questionedNode);
- if (idealSizeToMove + questionedNodeTotalWorkrate > targetNodeHighThreshold)
- idealSizeToMove = targetNodeHighThreshold - questionedNodeTotalWorkrate;
- double maxSizeToMoveIdeally = Math.min(
- hotNodeWorkrate/2,
- //allow to swap order, but not very much (0.9 was allowed when balancing high)
- (hotNodeWorkrate - questionedNodeTotalWorkrate)*0.6);
- double maxSizeToMoveIfNoSmallButLarger = questionedNodeTotalWorkrate*3/4;
-
- Map<ItemType, Double> hotNodeItems = getDataProvider().getItemWorkrates(hotNode);
- if (hotNodeItems == null) {
- if (LOG.isDebugEnabled())
- LOG.debug(MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
- "("+questionedNode+", workrate {0,number,#.##}), " +
- "excluding hot node "+hotNodeName+" because its item report unavailable",
- questionedNodeTotalWorkrate));
- nodesChecked.add(hotNode);
- continue;
- }
-
- ItemType itemToMove = findBestItemToMove(hotNodeItems, idealSizeToMove, maxSizeToMoveIdeally,
- maxSizeToMoveIfNoSmallButLarger, itemsMoved, questionedLocation);
- if (itemToMove == null) {
- if (LOG.isDebugEnabled())
- LOG.debug(MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
- "("+questionedNode+", workrate {0,number,#.##}), " +
- "excluding hot node "+hotNodeName+" because it has no appilcable items " +
- "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " +
- "moving from hot node "+hotNodeName+" ("+hotNode+", workrate {3,number,#.##}); available items: {4}",
- questionedNodeTotalWorkrate, idealSizeToMove, maxSizeToMoveIdeally, hotNodeWorkrate, hotNodeItems) );
-
- nodesChecked.add(hotNode);
- continue;
- }
-
- itemsMoved.add(itemToMove);
- double segmentRate = hotNodeItems.get(itemToMove);
-
-// if (LOG.isLoggable(Level.FINE))
-// LOG.fine( MessageFormat.format(
-// "policy "+getDataProvider().getAbbr()+" balancing cold node "+questionedNodeName+" " +
-// "(workrate {0,number,#.##}, too low), transitioning " + itemToMove +
-// " from "+hotNodeName+" (workrate {1,number,#.##}, now -= {2,number,#.##})",
-// questionedNodeTotalWorkrate, hotNodeWorkrate, segmentRate) );
-
- questionedNodeTotalWorkrate += segmentRate;
- hotNodeWorkrate -= segmentRate;
-
- moveItem(itemToMove, hotNode, questionedNode);
-
- if (++numMigrations >= getMaxMigrationsPerBalancingNode()) {
- break;
- }
- }
-
- if (LOG.isDebugEnabled()) {
- if (iters == 0) {
- if (LOG.isTraceEnabled())
- LOG.trace( MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing if cold finished at node "+questionedNode+"; " +
- "workrate {0,number,#.##} not cold",
- originalQuestionedNodeTotalWorkrate) );
- }
- else if (itemsMoved.isEmpty()) {
- if (LOG.isTraceEnabled())
- LOG.trace( MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+" " +
- "(workrate {0,number,#.##}); no way to improve it",
- originalQuestionedNodeTotalWorkrate) );
- } else {
- LOG.debug( MessageFormat.format(
- "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+"; " +
- "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " +
- "by moving in {3}",
- originalQuestionedNodeTotalWorkrate,
- questionedNodeTotalWorkrate,
- getDataProvider().getTotalWorkrate(questionedNode),
- itemsMoved) );
- }
- }
- return !itemsMoved.isEmpty();
- }
-
- protected void moveItem(ItemType item, NodeType oldNode, NodeType newNode) {
- item.move(newNode);
- model.onItemMoved(item, newNode);
- }
-
- /**
- * "Best" is defined as nearest to the targetCost, without exceeding maxCost, unless maxCostIfNothingSmallerButLarger > 0
- * which does just that (useful if the ideal and target are estimates and aren't quite right, typically it will take
- * something larger than maxRate but less than half the total rate, which is only possible when the estimates don't agree)
- */
- protected ItemType findBestItemToMove(Map<ItemType, Double> costsPerItem, double targetCost, double maxCost,
- double maxCostIfNothingSmallerButLarger, Set<ItemType> excludedItems, Location locationIfKnown) {
-
- ItemType closestMatch = null;
- ItemType smallestMoveable = null, largest = null;
- double minDiff = Double.MAX_VALUE, smallestC = Double.MAX_VALUE, largestC = Double.MIN_VALUE;
- boolean exclusions = false;
-
- for (Entry<ItemType, Double> entry : costsPerItem.entrySet()) {
- ItemType item = entry.getKey();
- Double cost = entry.getValue();
-
- if (cost == null) {
- if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' has null workrate: skipping", item));
- continue;
- }
-
- if (!model.isItemMoveable(item)) {
- if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' cannot be moved: skipping", item));
- continue;
- }
- if (cost < 0) {
- if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' subject to recent adjustment: skipping", item));
- continue;
- }
- if (excludedItems.contains(item)) {
- exclusions = true;
- continue;
- }
- if (cost < 0) { // FIXME: already tested above
- exclusions = true;
- continue;
- }
- if (cost <= 0) { // FIXME: overlaps previous condition
- continue;
- }
- if (largest == null || cost > largestC) {
- largest = item;
- largestC = cost;
- }
- if (!model.isItemMoveable(item)) { // FIXME: already tested above
- continue;
- }
- if (locationIfKnown != null && !model.isItemAllowedIn(item, locationIfKnown)) {
- continue;
- }
- if (smallestMoveable == null || cost < smallestC) {
- smallestMoveable = item;
- smallestC = cost;
- }
- if (cost > maxCost) {
- continue;
- }
- double diff = Math.abs(targetCost - cost);
- if (closestMatch == null || diff < minDiff) {
- closestMatch = item;
- minDiff = diff;
- }
- }
-
- if (closestMatch != null)
- return closestMatch;
-
- if (smallestC < maxCostIfNothingSmallerButLarger && smallestC < largestC && !exclusions)
- return smallestMoveable;
-
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java b/policy/src/main/java/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
deleted file mode 100644
index 7e273ba..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.location.Location;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.SetMultimap;
-
-/**
- * Standard implementation of {@link BalanceablePoolModel}, providing essential arithmetic for item and container
- * workrates and thresholds. See subclasses for specific requirements for migrating items.
- */
-public class DefaultBalanceablePoolModel<ContainerType, ItemType> implements BalanceablePoolModel<ContainerType, ItemType> {
-
- private static final Logger LOG = LoggerFactory.getLogger(DefaultBalanceablePoolModel.class);
-
- /*
- * Performance comments.
- * - Used hprof with LoadBalancingPolicySoakTest.testLoadBalancingManyManyItemsTest (1000 items)
- * - Prior to adding containerToItems, it created a new set by iterating over all items.
- * This was the biggest percentage of any brooklyn code.
- * Hence it's worth duplicating the values, keyed by item and keyed by container.
- * - Unfortunately changing threading model (so have a "rebalancer" thread, and a thread that
- * processes events to update the model), get ConcurrentModificationException if don't take
- * copy of containerToItems.get(node)...
- */
-
- // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item
- private static final String NULL_CONTAINER = "null-container";
-
- private final String name;
- private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>());
- private final Map<ContainerType, Double> containerToLowThreshold = new ConcurrentHashMap<ContainerType, Double>();
- private final Map<ContainerType, Double> containerToHighThreshold = new ConcurrentHashMap<ContainerType, Double>();
- private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>();
- private final SetMultimap<ContainerType, ItemType> containerToItems = Multimaps.synchronizedSetMultimap(HashMultimap.<ContainerType, ItemType>create());
- private final Map<ItemType, Double> itemToWorkrate = new ConcurrentHashMap<ItemType, Double>();
- private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>());
-
- private volatile double poolLowThreshold = 0;
- private volatile double poolHighThreshold = 0;
- private volatile double currentPoolWorkrate = 0;
-
- public DefaultBalanceablePoolModel(String name) {
- this.name = name;
- }
-
- public ContainerType getParentContainer(ItemType item) {
- ContainerType result = itemToContainer.get(item);
- return (result != NULL_CONTAINER) ? result : null;
- }
-
- public Set<ItemType> getItemsForContainer(ContainerType node) {
- Set<ItemType> result = containerToItems.get(node);
- synchronized (containerToItems) {
- return (result != null) ? ImmutableSet.copyOf(result) : Collections.<ItemType>emptySet();
- }
- }
-
- public Double getItemWorkrate(ItemType item) {
- return itemToWorkrate.get(item);
- }
-
- @Override public double getPoolLowThreshold() { return poolLowThreshold; }
- @Override public double getPoolHighThreshold() { return poolHighThreshold; }
- @Override public double getCurrentPoolWorkrate() { return currentPoolWorkrate; }
- @Override public boolean isHot() { return !containers.isEmpty() && currentPoolWorkrate > poolHighThreshold; }
- @Override public boolean isCold() { return !containers.isEmpty() && currentPoolWorkrate < poolLowThreshold; }
-
-
- // Provider methods.
-
- @Override public String getName() { return name; }
- @Override public int getPoolSize() { return containers.size(); }
- @Override public Set<ContainerType> getPoolContents() { return containers; }
- @Override public String getName(ContainerType container) { return container.toString(); } // TODO: delete?
- @Override public Location getLocation(ContainerType container) { return null; } // TODO?
-
- @Override public double getLowThreshold(ContainerType container) {
- Double result = containerToLowThreshold.get(container);
- return (result != null) ? result : -1;
- }
-
- @Override public double getHighThreshold(ContainerType container) {
- Double result = containerToHighThreshold.get(container);
- return (result != null) ? result : -1;
- }
-
- @Override public double getTotalWorkrate(ContainerType container) {
- double totalWorkrate = 0;
- for (ItemType item : getItemsForContainer(container)) {
- Double workrate = itemToWorkrate.get(item);
- if (workrate != null)
- totalWorkrate += Math.abs(workrate);
- }
- return totalWorkrate;
- }
-
- @Override public Map<ContainerType, Double> getContainerWorkrates() {
- Map<ContainerType, Double> result = new LinkedHashMap<ContainerType, Double>();
- for (ContainerType node : containers)
- result.put(node, getTotalWorkrate(node));
- return result;
- }
-
- @Override public Map<ItemType, Double> getItemWorkrates(ContainerType node) {
- Map<ItemType, Double> result = new LinkedHashMap<ItemType, Double>();
- for (ItemType item : getItemsForContainer(node))
- result.put(item, itemToWorkrate.get(item));
- return result;
- }
-
- @Override public boolean isItemMoveable(ItemType item) {
- // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable?
- return itemToContainer.containsKey(item) && !immovableItems.contains(item);
- }
-
- @Override public boolean isItemAllowedIn(ItemType item, Location location) {
- return true; // TODO?
- }
-
-
- // Mutators.
-
- @Override
- public void onItemMoved(ItemType item, ContainerType newNode) {
- if (!itemToContainer.containsKey(item)) {
- // Item may have been deleted; order of events received from different sources
- // (i.e. item itself and for itemGroup membership) is non-deterministic.
- LOG.info("Balanceable pool model ignoring onItemMoved for unknown item {} to container {}; " +
- "if onItemAdded subsequently received will get new container then", item, newNode);
- return;
- }
- ContainerType newNodeNonNull = toNonNullContainer(newNode);
- ContainerType oldNode = itemToContainer.put(item, newNodeNonNull);
- if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
- if (newNode != null) containerToItems.put(newNode, item);
- }
-
- @Override
- public void onContainerAdded(ContainerType newContainer, double lowThreshold, double highThreshold) {
- boolean added = containers.add(newContainer);
- if (!added) {
- // See LoadBalancingPolicy.onContainerAdded for possible explanation of why can get duplicate calls
- LOG.debug("Duplicate container-added event for {}; ignoring", newContainer);
- return;
- }
- containerToLowThreshold.put(newContainer, lowThreshold);
- containerToHighThreshold.put(newContainer, highThreshold);
- poolLowThreshold += lowThreshold;
- poolHighThreshold += highThreshold;
- }
-
- @Override
- public void onContainerRemoved(ContainerType oldContainer) {
- containers.remove(oldContainer);
- Double containerLowThreshold = containerToLowThreshold.remove(oldContainer);
- Double containerHighThresold = containerToHighThreshold.remove(oldContainer);
- poolLowThreshold -= (containerLowThreshold != null ? containerLowThreshold : 0);
- poolHighThreshold -= (containerHighThresold != null ? containerHighThresold : 0);
-
- // TODO: assert no orphaned items
- }
-
- @Override
- public void onItemAdded(ItemType item, ContainerType parentContainer) {
- onItemAdded(item, parentContainer, false);
- }
-
- @Override
- public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable) {
- // Duplicate calls to onItemAdded do no harm, as long as most recent is most accurate!
- // Important that it stays that way for now - See LoadBalancingPolicy.onContainerAdded for explanation.
-
- if (immovable)
- immovableItems.add(item);
-
- ContainerType parentContainerNonNull = toNonNullContainer(parentContainer);
- ContainerType oldNode = itemToContainer.put(item, parentContainerNonNull);
- if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
- if (parentContainer != null) containerToItems.put(parentContainer, item);
- }
-
- @Override
- public void onItemRemoved(ItemType item) {
- ContainerType oldNode = itemToContainer.remove(item);
- if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
- Double workrate = itemToWorkrate.remove(item);
- if (workrate != null)
- currentPoolWorkrate -= workrate;
- immovableItems.remove(item);
- }
-
- @Override
- public void onItemWorkrateUpdated(ItemType item, double newValue) {
- if (hasItem(item)) {
- Double oldValue = itemToWorkrate.put(item, newValue);
- double delta = ( newValue - (oldValue != null ? oldValue : 0) );
- currentPoolWorkrate += delta;
- } else {
- // Can happen when item removed - get notification of removal and workrate from group and item
- // respectively, so can overtake each other
- if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of workrate for unknown item {}, to {}", item, newValue);
- }
- }
-
- private boolean hasItem(ItemType item) {
- return itemToContainer.containsKey(item);
- }
-
-
- // Additional methods for tests.
-
- /**
- * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers.
- */
- @VisibleForTesting
- public String itemDistributionToString() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- dumpItemDistribution(new PrintStream(baos));
- return new String(baos.toByteArray());
- }
-
- @VisibleForTesting
- public void dumpItemDistribution() {
- dumpItemDistribution(System.out);
- }
-
- @VisibleForTesting
- public void dumpItemDistribution(PrintStream out) {
- for (ContainerType container : getPoolContents()) {
- out.println("Container '"+container+"': ");
- for (ItemType item : getItemsForContainer(container)) {
- Double workrate = getItemWorkrate(item);
- out.println("\t"+"Item '"+item+"' ("+workrate+")");
- }
- }
- out.flush();
- }
-
- @SuppressWarnings("unchecked")
- private ContainerType nullContainer() {
- return (ContainerType) NULL_CONTAINER; // relies on erasure
- }
-
- private ContainerType toNonNullContainer(ContainerType container) {
- return (container != null) ? container : nullContainer();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java b/policy/src/main/java/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
deleted file mode 100644
index efd41d8..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.DynamicGroup;
-import brooklyn.event.basic.BasicConfigKey;
-
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-
-/**
- * A group of items that are contained within a given (dynamically changing) set of containers.
- *
- * The {@link setContainers(Group)} sets the group of containers. The membership of that group
- * is dynamically tracked.
- *
- * When containers are added/removed, or when an items is added/removed, or when an {@link Moveable} item
- * is moved then the membership of this group of items is automatically updated accordingly.
- *
- * For example: in Monterey, this could be used to track the actors that are within a given cluster of venues.
- */
-@ImplementedBy(ItemsInContainersGroupImpl.class)
-public interface ItemsInContainersGroup extends DynamicGroup {
-
- @SetFromFlag("itemFilter")
- public static final ConfigKey<Predicate<? super Entity>> ITEM_FILTER = new BasicConfigKey(
- Predicate.class, "itemsInContainerGroup.itemFilter", "Filter for which items within the containers will automatically be in group", Predicates.alwaysTrue());
-
- public void setContainers(Group containerGroup);
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java b/policy/src/main/java/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
deleted file mode 100644
index daf5dfe..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.AbstractGroup;
-import brooklyn.entity.basic.DynamicGroupImpl;
-
-import com.google.common.base.Predicate;
-
-/**
- * A group of items that are contained within a given (dynamically changing) set of containers.
- *
- * The {@link setContainers(Group)} sets the group of containers. The membership of that group
- * is dynamically tracked.
- *
- * When containers are added/removed, or when an items is added/removed, or when an {@link Moveable} item
- * is moved then the membership of this group of items is automatically updated accordingly.
- *
- * For example: in Monterey, this could be used to track the actors that are within a given cluster of venues.
- */
-public class ItemsInContainersGroupImpl extends DynamicGroupImpl implements ItemsInContainersGroup {
-
- // TODO Inefficient: will not scale to many 1000s of items
-
- private static final Logger LOG = LoggerFactory.getLogger(ItemsInContainersGroup.class);
-
- private Group containerGroup;
-
- private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
- @Override
- public void onEvent(SensorEvent<Object> event) {
- Entity source = event.getSource();
- Object value = event.getValue();
- Sensor sensor = event.getSensor();
-
- if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
- onContainerAdded((Entity) value);
- } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
- onContainerRemoved((Entity) value);
- } else if (sensor.equals(Movable.CONTAINER)) {
- onItemMoved((Movable)source, (BalanceableContainer<?>) value);
- } else {
- throw new IllegalStateException("Unhandled event type "+sensor+": "+event);
- }
- }
- };
-
- public ItemsInContainersGroupImpl() {
- }
-
- @Override
- public void init() {
- super.init();
- setEntityFilter(new Predicate<Entity>() {
- @Override public boolean apply(Entity e) {
- return acceptsEntity(e);
- }});
- }
-
- protected Predicate<? super Entity> getItemFilter() {
- return getConfig(ITEM_FILTER);
- }
-
- @Override
- protected boolean acceptsEntity(Entity e) {
- if (e instanceof Movable) {
- return acceptsItem((Movable)e, ((Movable)e).getAttribute(Movable.CONTAINER));
- }
- return false;
- }
-
- boolean acceptsItem(Movable e, BalanceableContainer c) {
- return (containerGroup != null && c != null) ? getItemFilter().apply(e) && containerGroup.hasMember(c) : false;
- }
-
- @Override
- public void setContainers(Group containerGroup) {
- this.containerGroup = containerGroup;
- subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
- subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
- subscribe(null, Movable.CONTAINER, eventHandler);
-
- if (LOG.isTraceEnabled()) LOG.trace("{} scanning entities on container group set", this);
- rescanEntities();
- }
-
- private void onContainerAdded(Entity newContainer) {
- if (LOG.isTraceEnabled()) LOG.trace("{} rescanning entities on container {} added", this, newContainer);
- rescanEntities();
- }
-
- private void onContainerRemoved(Entity oldContainer) {
- if (LOG.isTraceEnabled()) LOG.trace("{} rescanning entities on container {} removed", this, oldContainer);
- rescanEntities();
- }
-
- protected void onEntityAdded(Entity item) {
- if (acceptsEntity(item)) {
- if (LOG.isDebugEnabled()) LOG.debug("{} adding new item {}", this, item);
- addMember(item);
- }
- }
-
- protected void onEntityRemoved(Entity item) {
- if (removeMember(item)) {
- if (LOG.isDebugEnabled()) LOG.debug("{} removing deleted item {}", this, item);
- }
- }
-
- private void onItemMoved(Movable item, BalanceableContainer container) {
- if (LOG.isTraceEnabled()) LOG.trace("{} processing moved item {}, to container {}", new Object[] {this, item, container});
- if (hasMember(item)) {
- if (!acceptsItem(item, container)) {
- if (LOG.isDebugEnabled()) LOG.debug("{} removing moved item {} from group, as new container {} is not a member", new Object[] {this, item, container});
- removeMember(item);
- }
- } else {
- if (acceptsItem(item, container)) {
- if (LOG.isDebugEnabled()) LOG.debug("{} adding moved item {} to group, as new container {} is a member", new Object[] {this, item, container});
- addMember(item);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java b/policy/src/main/java/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
deleted file mode 100644
index 57b736b..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import static brooklyn.util.JavaGroovyEquivalents.elvis;
-import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.api.event.AttributeSensor;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.policy.autoscaling.AutoScalerPolicy;
-import brooklyn.policy.loadbalancing.BalanceableWorkerPool.ContainerItemPair;
-import brooklyn.util.collections.MutableMap;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-
-/**
- * <p>Policy that is attached to a pool of "containers", each of which can host one or more migratable "items".
- * The policy monitors the workrates of the items and effects migrations in an attempt to ensure that the containers
- * are all sufficiently utilized without any of them being overloaded.
- *
- * <p>The particular sensor that defines the items' workrates is specified when the policy is constructed. High- and
- * low-thresholds are defined as <strong>configuration keys</strong> on each of the container entities in the pool:
- * for an item sensor named {@code foo.bar.sensorName}, the corresponding container config keys would be named
- * {@code foo.bar.sensorName.threshold.low} and {@code foo.bar.sensorName.threshold.high}.
- *
- * <p>In addition to balancing items among the available containers, this policy causes the pool Entity to emit
- * {@code POOL_COLD} and {@code POOL_HOT} events when it is determined that there is a surplus or shortfall
- * of container resource in the pool respectively. These events may be consumed by a separate policy that is capable
- * of resizing the container pool.
- */
- // removed from catalog because it cannot currently be configured via catalog mechanisms
- // PolicySpec.create fails due to no no-arg constructor
- // TODO make metric and model things which can be initialized from config then reinstate in catalog
-//@Catalog(name="Load Balancer", description="Policy that is attached to a pool of \"containers\", each of which "
-// + "can host one or more migratable \"items\". The policy monitors the workrates of the items and effects "
-// + "migrations in an attempt to ensure that the containers are all sufficiently utilized without any of "
-// + "them being overloaded.")
-public class LoadBalancingPolicy<NodeType extends Entity, ItemType extends Movable> extends AbstractPolicy {
-
- private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicy.class);
-
- @SetFromFlag(defaultVal="100")
- private long minPeriodBetweenExecs;
-
- private final AttributeSensor<? extends Number> metric;
- private final String lowThresholdConfigKeyName;
- private final String highThresholdConfigKeyName;
- private final BalanceablePoolModel<NodeType, ItemType> model;
- private final BalancingStrategy<NodeType, ItemType> strategy;
- private BalanceableWorkerPool poolEntity;
-
- private volatile ScheduledExecutorService executor;
- private final AtomicBoolean executorQueued = new AtomicBoolean(false);
- private volatile long executorTime = 0;
-
- private int lastEmittedDesiredPoolSize = 0;
- private static enum TemperatureStates { COLD, HOT }
- private TemperatureStates lastEmittedPoolTemperature = null; // "cold" or "hot"
-
- private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void onEvent(SensorEvent<Object> event) {
- if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", LoadBalancingPolicy.this, event);
- Entity source = event.getSource();
- Object value = event.getValue();
- Sensor sensor = event.getSensor();
-
- if (sensor.equals(metric)) {
- onItemMetricUpdate((ItemType)source, ((Number) value).doubleValue(), true);
- } else if (sensor.equals(BalanceableWorkerPool.CONTAINER_ADDED)) {
- onContainerAdded((NodeType) value, true);
- } else if (sensor.equals(BalanceableWorkerPool.CONTAINER_REMOVED)) {
- onContainerRemoved((NodeType) value, true);
- } else if (sensor.equals(BalanceableWorkerPool.ITEM_ADDED)) {
- ContainerItemPair pair = (ContainerItemPair) value;
- onItemAdded((ItemType)pair.item, (NodeType)pair.container, true);
- } else if (sensor.equals(BalanceableWorkerPool.ITEM_REMOVED)) {
- ContainerItemPair pair = (ContainerItemPair) value;
- onItemRemoved((ItemType)pair.item, (NodeType)pair.container, true);
- } else if (sensor.equals(BalanceableWorkerPool.ITEM_MOVED)) {
- ContainerItemPair pair = (ContainerItemPair) value;
- onItemMoved((ItemType)pair.item, (NodeType)pair.container, true);
- }
- }
- };
-
- public LoadBalancingPolicy() {
- this(null, null);
- }
-
- public LoadBalancingPolicy(AttributeSensor<? extends Number> metric,
- BalanceablePoolModel<NodeType, ItemType> model) {
- this(MutableMap.of(), metric, model);
- }
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public LoadBalancingPolicy(Map props, AttributeSensor<? extends Number> metric,
- BalanceablePoolModel<NodeType, ItemType> model) {
-
- super(props);
- this.metric = metric;
- this.lowThresholdConfigKeyName = metric.getName()+".threshold.low";
- this.highThresholdConfigKeyName = metric.getName()+".threshold.high";
- this.model = model;
- this.strategy = new BalancingStrategy(getDisplayName(), model); // TODO: extract interface, inject impl
-
- // TODO Should re-use the execution manager's thread pool, somehow
- executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void setEntity(EntityLocal entity) {
- Preconditions.checkArgument(entity instanceof BalanceableWorkerPool, "Provided entity must be a BalanceableWorkerPool");
- super.setEntity(entity);
- this.poolEntity = (BalanceableWorkerPool) entity;
-
- // Detect when containers are added to or removed from the pool.
- subscribe(poolEntity, BalanceableWorkerPool.CONTAINER_ADDED, eventHandler);
- subscribe(poolEntity, BalanceableWorkerPool.CONTAINER_REMOVED, eventHandler);
- subscribe(poolEntity, BalanceableWorkerPool.ITEM_ADDED, eventHandler);
- subscribe(poolEntity, BalanceableWorkerPool.ITEM_REMOVED, eventHandler);
- subscribe(poolEntity, BalanceableWorkerPool.ITEM_MOVED, eventHandler);
-
- // Take heed of any extant containers.
- for (Entity container : poolEntity.getContainerGroup().getMembers()) {
- onContainerAdded((NodeType)container, false);
- }
- for (Entity item : poolEntity.getItemGroup().getMembers()) {
- onItemAdded((ItemType)item, (NodeType)item.getAttribute(Movable.CONTAINER), false);
- }
-
- scheduleRebalance();
- }
-
- @Override
- public void suspend() {
- // TODO unsubscribe from everything? And resubscribe on resume?
- super.suspend();
- if (executor != null) executor.shutdownNow();;
- executorQueued.set(false);
- }
-
- @Override
- public void resume() {
- super.resume();
- executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
- executorTime = 0;
- executorQueued.set(false);
- }
-
- private ThreadFactory newThreadFactory() {
- return new ThreadFactoryBuilder()
- .setNameFormat("brooklyn-followthesunpolicy-%d")
- .build();
- }
-
- private void scheduleRebalance() {
- if (isRunning() && executorQueued.compareAndSet(false, true)) {
- long now = System.currentTimeMillis();
- long delay = Math.max(0, (executorTime + minPeriodBetweenExecs) - now);
-
- executor.schedule(new Runnable() {
- @SuppressWarnings("rawtypes")
- public void run() {
- try {
- executorTime = System.currentTimeMillis();
- executorQueued.set(false);
- strategy.rebalance();
-
- if (LOG.isDebugEnabled()) LOG.debug("{} post-rebalance: poolSize={}; workrate={}; lowThreshold={}; " +
- "highThreshold={}", new Object[] {this, model.getPoolSize(), model.getCurrentPoolWorkrate(),
- model.getPoolLowThreshold(), model.getPoolHighThreshold()});
-
- if (model.isCold()) {
- Map eventVal = ImmutableMap.of(
- AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, model.getPoolSize(),
- AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, model.getCurrentPoolWorkrate(),
- AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, model.getPoolLowThreshold(),
- AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, model.getPoolHighThreshold());
-
- ((EntityLocal)poolEntity).emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, eventVal);
-
- if (LOG.isInfoEnabled()) {
- int desiredPoolSize = (int) Math.ceil(model.getCurrentPoolWorkrate() / (model.getPoolLowThreshold()/model.getPoolSize()));
- if (desiredPoolSize != lastEmittedDesiredPoolSize || lastEmittedPoolTemperature != TemperatureStates.COLD) {
- LOG.info("{} emitted COLD (suggesting {}): {}", new Object[] {this, desiredPoolSize, eventVal});
- lastEmittedDesiredPoolSize = desiredPoolSize;
- lastEmittedPoolTemperature = TemperatureStates.COLD;
- }
- }
-
- } else if (model.isHot()) {
- Map eventVal = ImmutableMap.of(
- AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, model.getPoolSize(),
- AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, model.getCurrentPoolWorkrate(),
- AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, model.getPoolLowThreshold(),
- AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, model.getPoolHighThreshold());
-
- ((EntityLocal)poolEntity).emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, eventVal);
-
- if (LOG.isInfoEnabled()) {
- int desiredPoolSize = (int) Math.ceil(model.getCurrentPoolWorkrate() / (model.getPoolHighThreshold()/model.getPoolSize()));
- if (desiredPoolSize != lastEmittedDesiredPoolSize || lastEmittedPoolTemperature != TemperatureStates.HOT) {
- LOG.info("{} emitted HOT (suggesting {}): {}", new Object[] {this, desiredPoolSize, eventVal});
- lastEmittedDesiredPoolSize = desiredPoolSize;
- lastEmittedPoolTemperature = TemperatureStates.HOT;
- }
- }
- }
-
- } catch (Exception e) {
- if (isRunning()) {
- LOG.error("Error rebalancing", e);
- } else {
- LOG.debug("Error rebalancing, but no longer running", e);
- }
- }
- }},
- delay,
- TimeUnit.MILLISECONDS);
- }
- }
-
- // TODO Can get duplicate onContainerAdded events.
- // I presume it's because we subscribe and then iterate over the extant containers.
- // Solution would be for subscription to give you events for existing / current value(s).
- // Also current impl messes up single-threaded updates model: the setEntity is a different thread than for subscription events.
- private void onContainerAdded(NodeType newContainer, boolean rebalanceNow) {
- Preconditions.checkArgument(newContainer instanceof BalanceableContainer, "Added container must be a BalanceableContainer");
- if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of container {}", this, newContainer);
- // Low and high thresholds for the metric we're interested in are assumed to be present
- // in the container's configuration.
- Number lowThreshold = (Number) findConfigValue(newContainer, lowThresholdConfigKeyName);
- Number highThreshold = (Number) findConfigValue(newContainer, highThresholdConfigKeyName);
- if (lowThreshold == null || highThreshold == null) {
- LOG.warn(
- "Balanceable container '"+newContainer+"' does not define low- and high- threshold configuration keys: '"+
- lowThresholdConfigKeyName+"' and '"+highThresholdConfigKeyName+"', skipping");
- return;
- }
-
- model.onContainerAdded(newContainer, lowThreshold.doubleValue(), highThreshold.doubleValue());
-
- // Note: no need to scan the container for items; they will appear via the ITEM_ADDED events.
- // Also, must abide by any item-filters etc defined in the pool.
-
- if (rebalanceNow) scheduleRebalance();
- }
-
- private static Object findConfigValue(Entity entity, String configKeyName) {
- Map<ConfigKey<?>, Object> config = ((EntityInternal)entity).getAllConfig();
- for (Entry<ConfigKey<?>, Object> entry : config.entrySet()) {
- if (configKeyName.equals(entry.getKey().getName()))
- return entry.getValue();
- }
- return null;
- }
-
- // TODO Receiving duplicates of onContainerRemoved (e.g. when running LoadBalancingInmemorySoakTest)
- private void onContainerRemoved(NodeType oldContainer, boolean rebalanceNow) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of container {}", this, oldContainer);
- model.onContainerRemoved(oldContainer);
- if (rebalanceNow) scheduleRebalance();
- }
-
- private void onItemAdded(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
- Preconditions.checkArgument(item instanceof Movable, "Added item "+item+" must implement Movable");
- if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of item {} in container {}", new Object[] {this, item, parentContainer});
-
- subscribe(item, metric, eventHandler);
-
- // Update the model, including the current metric value (if any).
- boolean immovable = (Boolean)elvis(item.getConfig(Movable.IMMOVABLE), false);
- Number currentValue = item.getAttribute(metric);
- model.onItemAdded(item, parentContainer, immovable);
- if (currentValue != null)
- model.onItemWorkrateUpdated(item, currentValue.doubleValue());
-
- if (rebalanceNow) scheduleRebalance();
- }
-
- private void onItemRemoved(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of item {}", this, item);
- unsubscribe(item);
- model.onItemRemoved(item);
- if (rebalanceNow) scheduleRebalance();
- }
-
- private void onItemMoved(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording moving of item {} to {}", new Object[] {this, item, parentContainer});
- model.onItemMoved(item, parentContainer);
- if (rebalanceNow) scheduleRebalance();
- }
-
- private void onItemMetricUpdate(ItemType item, double newValue, boolean rebalanceNow) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording metric update for item {}, new value {}", new Object[] {this, item, newValue});
- model.onItemWorkrateUpdated(item, newValue);
- if (rebalanceNow) scheduleRebalance();
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : "");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/LocationConstraint.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/LocationConstraint.java b/policy/src/main/java/brooklyn/policy/loadbalancing/LocationConstraint.java
deleted file mode 100644
index 5ae5723..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/LocationConstraint.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import org.apache.brooklyn.api.location.Location;
-
-/**
- * Temporary stub to resolve dependencies in ported LoadBalancingPolicy.
- */
-public class LocationConstraint {
- public boolean isPermitted(Location l) { return true; } // TODO
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/Movable.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/Movable.java b/policy/src/main/java/brooklyn/policy/loadbalancing/Movable.java
deleted file mode 100644
index b0f1658..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/Movable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.annotation.Effector;
-import brooklyn.entity.annotation.EffectorParam;
-import brooklyn.entity.basic.MethodEffector;
-import brooklyn.event.basic.BasicAttributeSensor;
-import brooklyn.event.basic.BasicConfigKey;
-
-
-/**
- * Represents an item that can be migrated between balanceable containers.
- */
-public interface Movable extends Entity {
-
- @SetFromFlag("immovable")
- public static ConfigKey<Boolean> IMMOVABLE = new BasicConfigKey<Boolean>(
- Boolean.class, "movable.item.immovable", "Indicates whether this item instance is immovable, so cannot be moved by policies", false);
-
- public static BasicAttributeSensor<BalanceableContainer> CONTAINER = new BasicAttributeSensor<BalanceableContainer>(
- BalanceableContainer.class, "movable.item.container", "The container that this item is on");
-
- public static final MethodEffector<Void> MOVE = new MethodEffector<Void>(Movable.class, "move");
-
- public String getContainerId();
-
- @Effector(description="Moves this entity to the given container")
- public void move(@EffectorParam(name="destination") Entity destination);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/PolicyUtilForPool.java b/policy/src/main/java/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
deleted file mode 100644
index ceead1c..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.util.Set;
-
-/**
- * Provides conveniences for searching for hot/cold containers in a provided pool model.
- * Ported from Monterey v3, with irrelevant bits removed.
- */
-public class PolicyUtilForPool<ContainerType, ItemType> {
-
- private final BalanceablePoolModel<ContainerType, ItemType> model;
-
-
- public PolicyUtilForPool (BalanceablePoolModel<ContainerType, ItemType> model) {
- this.model = model;
- }
-
- public ContainerType findColdestContainer(Set<ContainerType> excludedContainers) {
- return findColdestContainer(excludedContainers, null);
- }
-
- /**
- * Identifies the container with the maximum spare capacity (highThreshold - currentWorkrate),
- * returns null if none of the model's nodes has spare capacity.
- */
- public ContainerType findColdestContainer(Set<ContainerType> excludedContainers, LocationConstraint locationConstraint) {
- double maxSpareCapacity = 0;
- ContainerType coldest = null;
-
- for (ContainerType c : model.getPoolContents()) {
- if (excludedContainers.contains(c))
- continue;
- if (locationConstraint != null && !locationConstraint.isPermitted(model.getLocation(c)))
- continue;
-
- double highThreshold = model.getHighThreshold(c);
- double totalWorkrate = model.getTotalWorkrate(c);
- double spareCapacity = highThreshold - totalWorkrate;
-
- if (highThreshold == -1 || totalWorkrate == -1) {
- continue; // container presumably has been removed
- }
- if (spareCapacity > maxSpareCapacity) {
- maxSpareCapacity = spareCapacity;
- coldest = c;
- }
- }
- return coldest;
- }
-
- /**
- * Identifies the container with the maximum overshoot (currentWorkrate - highThreshold),
- * returns null if none of the model's nodes has an overshoot.
- */
- public ContainerType findHottestContainer(Set<ContainerType> excludedContainers) {
- double maxOvershoot = 0;
- ContainerType hottest = null;
-
- for (ContainerType c : model.getPoolContents()) {
- if (excludedContainers.contains(c))
- continue;
-
- double totalWorkrate = model.getTotalWorkrate(c);
- double highThreshold = model.getHighThreshold(c);
- double overshoot = totalWorkrate - highThreshold;
-
- if (highThreshold == -1 || totalWorkrate == -1) {
- continue; // container presumably has been removed
- }
- if (overshoot > maxOvershoot) {
- maxOvershoot = overshoot;
- hottest = c;
- }
- }
- return hottest;
- }
-
-}
\ No newline at end of file