You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:11 UTC
[13/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming
package policy
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java
new file mode 100644
index 0000000..9d3968b
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an abstract algorithm for optimally balancing worker "items" among several "containers" based on the workloads
+ * of the items, and corresponding high- and low-thresholds on the containers.
+ *
+ * TODO: extract interface, provide default implementation
+ * TODO: remove legacy code comments
+ */
+public class BalancingStrategy<NodeType extends Entity, ItemType extends Movable> {
+
+ // FIXME Bad idea to use MessageFormat.format in this way; if toString of entity contains
+ // special characters interpreted by MessageFormat, then it will all break!
+
+ // This is a modified version of the watermark elasticity policy from Monterey v3.
+
+ private static final Logger LOG = LoggerFactory.getLogger(BalancingStrategy.class);
+
+ private static final int MAX_MIGRATIONS_PER_BALANCING_NODE = 20; // arbitrary (Splodge)
+ private static final boolean BALANCE_COLD_PULLS_IN_SAME_RUN_AS_HOT_PUSHES = false;
+
+ private final String name;
+ private final BalanceablePoolModel<NodeType, ItemType> model;
+ private final PolicyUtilForPool<NodeType, ItemType> helper;
+// private boolean loggedColdestTooHigh = false;
+// private boolean loggedHottestTooLow = false;
+
+
+ public BalancingStrategy(String name, BalanceablePoolModel<NodeType, ItemType> model) {
+ this.name = name;
+ this.model = model;
+ this.helper = new PolicyUtilForPool<NodeType, ItemType>(model);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void rebalance() {
+ checkAndApplyOn(model.getPoolContents());
+ }
+
+ public int getMaxMigrationsPerBalancingNode() {
+ return MAX_MIGRATIONS_PER_BALANCING_NODE;
+ }
+
+ public BalanceablePoolModel<NodeType, ItemType> getDataProvider() {
+ return model;
+ }
+
+ // This was the entry point for the legacy policy.
+ private void checkAndApplyOn(final Collection<NodeType> dirtyNodesSupplied) {
+ Collection<NodeType> dirtyNodes = dirtyNodesSupplied;
+
+// if (startTime + FORCE_ALL_NODES_IF_DELAYED_FOR_MILLIS < System.currentTimeMillis()) {
+// Set<NodeType> allNodes = new LinkedHashSet<NodeType>();
+// allNodes.addAll(dirtyNodes);
+// dirtyNodes = allNodes;
+// allNodes.addAll(getDataProvider().getPoolContents());
+// if (LOG.isDebugEnabled())
+// LOG.debug("policy "+getDataProvider().getAbbr()+" running after delay ("+
+// TimeUtils.makeTimeString(System.currentTimeMillis() - startTime)+", analysing all nodes: "+
+// dirtyNodes);
+// }
+
+// nodeFinder.optionalCachedNodesWithBacklogDetected.clear();
+// boolean gonnaGrow = growPool(dirtyNodes);
+// getDataProvider().waitForAllTransitionsComplete();
+ boolean gonnaGrow = false;
+
+ Set<NodeType> nonFrozenDirtyNodes = new LinkedHashSet<NodeType>(dirtyNodes);
+// boolean gonnaShrink = false;
+// if (!gonnaGrow && !DONT_SHRINK_UNLESS_BALANCED) {
+// gonnaShrink = shrinkPool(nonFrozenDirtyNodes);
+// getDataProvider().waitForAllTransitionsComplete();
+// }
+
+ if (getDataProvider().getPoolSize() >= 2) {
+ boolean didBalancing = false;
+ for (NodeType a : nonFrozenDirtyNodes) {
+ didBalancing |= balanceItemsOnNodesInQuestion(a, gonnaGrow);
+// getMutator().waitForAllTransitionsComplete();
+ }
+ if (didBalancing) {
+ return;
+ }
+ }
+
+// if (!gonnaGrow && DONT_SHRINK_UNLESS_BALANCED) {
+// gonnaShrink = shrinkPool(nonFrozenDirtyNodes);
+// getDataProvider().waitForAllTransitionsComplete();
+// }
+
+// if (gonnaGrow || gonnaShrink)
+// //don't log 'doing nothing' message
+// return;
+
+// if (LOG.isDebugEnabled()) {
+// double poolTotal = getDataProvider().getPoolPredictedWorkrateTotal();
+// int poolSize = getDataProvider().getPoolPredictedSize();
+// LOG.debug(MessageFormat.format("policy "+getDataProvider().getAbbr()+" did nothing; pool workrate is {0,number,#.##} x {1} nodes",
+// 1.0*poolTotal/poolSize, poolSize));
+// }
+ }
+
+ protected boolean balanceItemsOnNodesInQuestion(NodeType questionedNode, boolean gonnaGrow) {
+ double questionedNodeTotalWorkrate = getDataProvider().getTotalWorkrate(questionedNode);
+
+ boolean balanced = balanceItemsOnHotNode(questionedNode, questionedNodeTotalWorkrate, gonnaGrow);
+// getMutator().waitForAllTransitionsComplete();
+
+ if (!balanced || BALANCE_COLD_PULLS_IN_SAME_RUN_AS_HOT_PUSHES) {
+ balanced |= balanceItemsOnColdNode(questionedNode, questionedNodeTotalWorkrate, gonnaGrow);
+// getMutator().waitForAllTransitionsComplete();
+ }
+ if (balanced)
+ return true;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" not balancing "+questionedNode+"; " +
+ "its workrate {0,number,#.##} is acceptable (or cannot be balanced)", questionedNodeTotalWorkrate) );
+ }
+ return false;
+ }
+
+ protected boolean balanceItemsOnHotNode(NodeType node, double nodeWorkrate, boolean gonnaGrow) {
+ double originalNodeWorkrate = nodeWorkrate;
+ int migrationCount = 0;
+ int iterationCount = 0;
+
+ Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>();
+ Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>();
+
+// if (nodeFinder.config.COUNT_BACKLOG_AS_EXTRA_WORKRATE) {
+// int backlog = nodeFinder.getBacklogQueueLength(questionedNode);
+// if (backlog>0) {
+// Level l = backlog>1000 ? Level.INFO : backlog>10 ? Level.FINE : Level.FINER;
+// if (LOG.isLoggable(l)) {
+// LOG.log( l, MessageFormat.format(
+// "policy "+getDataProvider().getAbbr()+" detected queue at node "+questionedNode+", " +
+// "inflating workrate {0,number,#.##} + "+backlog, questionedNodeTotalWorkrate) );
+// }
+// questionedNodeTotalWorkrate += backlog;
+// }
+// }
+
+ Double highThreshold = model.getHighThreshold(node);
+ if (highThreshold == -1) {
+ // node presumably has been removed; TODO log
+ return false;
+ }
+
+ while (nodeWorkrate > highThreshold && migrationCount < getMaxMigrationsPerBalancingNode()) {
+ iterationCount++;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(MessageFormat.format(
+ "policy "+getDataProvider().getName()+" considering balancing hot node "+node+" " +
+ "(workrate {0,number,#.##}); iteration "+iterationCount, nodeWorkrate) );
+ }
+
+ // move from hot node, to coldest
+
+ NodeType coldNode = helper.findColdestContainer(nodesChecked);
+
+ if (coldNode == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" not balancing hot node "+node+" " +
+ "(workrate {0,number,#.##}); no coldest node available", nodeWorkrate) );
+ }
+ break;
+ }
+
+ if (coldNode.equals(node)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" not balancing hot node "+node+" " +
+ "(workrate {0,number,#.##}); it is also the coldest modifiable node", nodeWorkrate) );
+ }
+ break;
+ }
+
+ double coldNodeWorkrate = getDataProvider().getTotalWorkrate(coldNode);
+ boolean emergencyLoadBalancing = coldNodeWorkrate < nodeWorkrate*2/3;
+ double coldNodeHighThreshold = model.getHighThreshold(coldNode);
+ if (coldNodeWorkrate >= coldNodeHighThreshold && !emergencyLoadBalancing) {
+ //don't balance if all nodes are approx equally hot (and very hot)
+
+ //for now, stop warning if it is a recurring theme!
+// Level level = loggedColdestTooHigh ? Level.FINER : Level.INFO;
+// LOG.log(level, MessageFormat.format(
+// "policy "+getDataProvider().getAbbr()+" not balancing hot node "+questionedNode+" " +
+// "(workrate {0,number,#.##}); coldest node "+coldNode+" has workrate {1,number,#.##} also too high"+
+// (loggedColdestTooHigh ? "" : " (future cases will be logged at finer)"),
+// questionedNodeTotalWorkrate, coldNodeWorkrate) );
+// loggedColdestTooHigh = true;
+ break;
+ }
+ double poolLowWatermark = Double.MAX_VALUE; // TODO
+ if (gonnaGrow && (coldNodeWorkrate >= poolLowWatermark && !emergencyLoadBalancing)) {
+ //if we're growing the pool, refuse to balance unless the cold node is indeed very cold, or hot node very hot
+
+ //for now, stop warning if it is a recurring theme!
+// Level level = loggedColdestTooHigh ? Level.FINER : Level.INFO;
+// LOG.log(level, MessageFormat.format(
+// "policy "+getDataProvider().getAbbr()+" not balancing hot node "+questionedNode+" " +
+// "(workrate {0,number,#.##}); coldest node "+coldNode+" has workrate {1,number,#.##} also too high to accept while pool is growing" +
+// (loggedColdestTooHigh ? "" : " (future cases will be logged at finer)"),
+// questionedNodeTotalWorkrate, coldNodeWorkrate) );
+// loggedColdestTooHigh = true;
+ break;
+ }
+
+ String questionedNodeName = getDataProvider().getName(node);
+ String coldNodeName = getDataProvider().getName(coldNode);
+ Location coldNodeLocation = getDataProvider().getLocation(coldNode);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
+ "("+node+", workrate {0,number,#.##}), " +
+ "considering target "+coldNodeName+" ("+coldNode+", workrate {1,number,#.##})",
+ nodeWorkrate, coldNodeWorkrate) );
+ }
+
+ double idealSizeToMove = (nodeWorkrate - coldNodeWorkrate) / 2;
+ //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount
+
+ if (idealSizeToMove + coldNodeWorkrate > coldNodeHighThreshold)
+ idealSizeToMove = coldNodeHighThreshold - coldNodeWorkrate;
+
+
+ double maxSizeToMoveIdeally = Math.min(
+ nodeWorkrate/2 + 0.00001,
+ //permit it to exceed node high if there is no alternative (this is 'max' not 'ideal'),
+ //so long as it still gives a significant benefit
+ // getConfiguration().nodeHighWaterMark - coldNodeWorkrate,
+ (nodeWorkrate - coldNodeWorkrate)*0.9);
+ double maxSizeToMoveIfNoSmallButLarger = nodeWorkrate*3/4;
+
+ Map<ItemType, Double> questionedNodeItems = getDataProvider().getItemWorkrates(node);
+ if (questionedNodeItems == null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug(MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
+ "("+node+", workrate {0,number,#.##}), abandoned; " +
+ "item report for " + questionedNodeName + " unavailable",
+ nodeWorkrate));
+ break;
+ }
+ ItemType itemToMove = findBestItemToMove(questionedNodeItems, idealSizeToMove, maxSizeToMoveIdeally,
+ maxSizeToMoveIfNoSmallButLarger, itemsMoved, coldNodeLocation);
+
+ if (itemToMove == null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug(MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
+ "("+node+", workrate {0,number,#.##}), ending; " +
+ "no suitable segment found " +
+ "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " +
+ "moving to coldest node "+coldNodeName+" ("+coldNode+", workrate {3,number,#.##}); available items: {4}",
+ nodeWorkrate, idealSizeToMove, maxSizeToMoveIdeally, coldNodeWorkrate, questionedNodeItems) );
+ break;
+ }
+
+ itemsMoved.add(itemToMove);
+ double itemWorkrate = questionedNodeItems.get(itemToMove);
+
+// if (LOG.isLoggable(Level.FINE))
+// LOG.fine( MessageFormat.format(
+// "policy "+getDataProvider().getAbbr()+" balancing hot node "+questionedNodeName+" " +
+// "(workrate {0,number,#.##}, too high), transitioning " + itemToMove +
+// " to "+coldNodeName+" (workrate {1,number,#.##}, now += {2,number,#.##})",
+// questionedNodeTotalWorkrate, coldNodeWorkrate, segmentRate) );
+
+ nodeWorkrate -= itemWorkrate;
+ coldNodeWorkrate += itemWorkrate;
+
+ moveItem(itemToMove, node, coldNode);
+ ++migrationCount;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ if (iterationCount == 0) {
+ if (LOG.isTraceEnabled())
+ LOG.trace( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing if hot finished at node "+node+"; " +
+ "workrate {0,number,#.##} not hot",
+ originalNodeWorkrate) );
+ }
+ else if (itemsMoved.isEmpty()) {
+ if (LOG.isTraceEnabled())
+ LOG.trace( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing finished at hot node "+node+" " +
+ "(workrate {0,number,#.##}); no way to improve it",
+ originalNodeWorkrate) );
+ } else {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing finished at hot node "+node+"; " +
+ "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " +
+ "by moving off {3}",
+ originalNodeWorkrate,
+ nodeWorkrate,
+ getDataProvider().getTotalWorkrate(node),
+ itemsMoved
+ ) );
+ }
+ }
+ return !itemsMoved.isEmpty();
+ }
+
+ protected boolean balanceItemsOnColdNode(NodeType questionedNode, double questionedNodeTotalWorkrate, boolean gonnaGrow) {
+ // Abort if the node has pending adjustments.
+ Map<ItemType, Double> items = getDataProvider().getItemWorkrates(questionedNode);
+ if (items == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
+ "(workrate {0,number,#.##}); workrate breakdown unavailable (probably reverting)",
+ questionedNodeTotalWorkrate) );
+ }
+ return false;
+ }
+ for (ItemType item : items.keySet()) {
+ if (!model.isItemMoveable(item)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
+ "(workrate {0,number,#.##}); at least one item ("+item+") is in flux",
+ questionedNodeTotalWorkrate) );
+ }
+ return false;
+ }
+ }
+
+ double originalQuestionedNodeTotalWorkrate = questionedNodeTotalWorkrate;
+ int numMigrations = 0;
+
+ Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>();
+ Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>();
+
+ int iters = 0;
+ Location questionedLocation = getDataProvider().getLocation(questionedNode);
+
+ double lowThreshold = model.getLowThreshold(questionedNode);
+ while (questionedNodeTotalWorkrate < lowThreshold) {
+ iters++;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" considering balancing cold node "+questionedNode+" " +
+ "(workrate {0,number,#.##}); iteration "+iters, questionedNodeTotalWorkrate));
+ }
+
+ // move from cold node, to hottest
+
+ NodeType hotNode = helper.findHottestContainer(nodesChecked);
+
+ if (hotNode == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
+ "(workrate {0,number,#.##}); no hottest node available", questionedNodeTotalWorkrate) );
+ }
+
+ break;
+ }
+ if (hotNode.equals(questionedNode)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
+ "(workrate {0,number,#.##}); it is also the hottest modfiable node", questionedNodeTotalWorkrate) );
+ }
+ break;
+ }
+
+
+ double hotNodeWorkrate = getDataProvider().getTotalWorkrate(hotNode);
+ double hotNodeLowThreshold = model.getLowThreshold(hotNode);
+ double hotNodeHighThreshold = model.getHighThreshold(hotNode);
+ boolean emergencyLoadBalancing = false; //doesn't apply to cold
+ if (hotNodeWorkrate == -1 || hotNodeLowThreshold == -1 || hotNodeHighThreshold == -1) {
+ // hotNode presumably has been removed; TODO log
+ break;
+ }
+ if (hotNodeWorkrate <= hotNodeLowThreshold && !emergencyLoadBalancing) {
+ //don't balance if all nodes are too low
+
+ //for now, stop warning if it is a recurring theme!
+// Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
+// LOG.log(level, MessageFormat.format(
+// "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
+// "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low" +
+// (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
+// questionedNodeTotalWorkrate, hotNodeWorkrate) );
+// loggedHottestTooLow = true;
+ break;
+ }
+ if (gonnaGrow && (hotNodeWorkrate <= hotNodeHighThreshold && !emergencyLoadBalancing)) {
+ //if we're growing the pool, refuse to balance unless the hot node is quite hot
+
+ //again, stop warning if it is a recurring theme!
+// Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
+// LOG.log(level, MessageFormat.format(
+// "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
+// "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low to accept while pool is growing"+
+// (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
+// questionedNodeTotalWorkrate, hotNodeWorkrate) );
+// loggedHottestTooLow = true;
+ break;
+ }
+
+ String questionedNodeName = getDataProvider().getName(questionedNode);
+ String hotNodeName = getDataProvider().getName(hotNode);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
+ "("+questionedNode+", workrate {0,number,#.##}), " +
+ "considering source "+hotNodeName+" ("+hotNode+", workrate {1,number,#.##})",
+ questionedNodeTotalWorkrate, hotNodeWorkrate) );
+ }
+
+ double idealSizeToMove = (hotNodeWorkrate - questionedNodeTotalWorkrate) / 2;
+ //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount
+ double targetNodeHighThreshold = model.getHighThreshold(questionedNode);
+ if (idealSizeToMove + questionedNodeTotalWorkrate > targetNodeHighThreshold)
+ idealSizeToMove = targetNodeHighThreshold - questionedNodeTotalWorkrate;
+ double maxSizeToMoveIdeally = Math.min(
+ hotNodeWorkrate/2,
+ //allow to swap order, but not very much (0.9 was allowed when balancing high)
+ (hotNodeWorkrate - questionedNodeTotalWorkrate)*0.6);
+ double maxSizeToMoveIfNoSmallButLarger = questionedNodeTotalWorkrate*3/4;
+
+ Map<ItemType, Double> hotNodeItems = getDataProvider().getItemWorkrates(hotNode);
+ if (hotNodeItems == null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug(MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
+ "("+questionedNode+", workrate {0,number,#.##}), " +
+ "excluding hot node "+hotNodeName+" because its item report unavailable",
+ questionedNodeTotalWorkrate));
+ nodesChecked.add(hotNode);
+ continue;
+ }
+
+ ItemType itemToMove = findBestItemToMove(hotNodeItems, idealSizeToMove, maxSizeToMoveIdeally,
+ maxSizeToMoveIfNoSmallButLarger, itemsMoved, questionedLocation);
+ if (itemToMove == null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug(MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
+ "("+questionedNode+", workrate {0,number,#.##}), " +
+ "excluding hot node "+hotNodeName+" because it has no appilcable items " +
+ "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " +
+ "moving from hot node "+hotNodeName+" ("+hotNode+", workrate {3,number,#.##}); available items: {4}",
+ questionedNodeTotalWorkrate, idealSizeToMove, maxSizeToMoveIdeally, hotNodeWorkrate, hotNodeItems) );
+
+ nodesChecked.add(hotNode);
+ continue;
+ }
+
+ itemsMoved.add(itemToMove);
+ double segmentRate = hotNodeItems.get(itemToMove);
+
+// if (LOG.isLoggable(Level.FINE))
+// LOG.fine( MessageFormat.format(
+// "policy "+getDataProvider().getAbbr()+" balancing cold node "+questionedNodeName+" " +
+// "(workrate {0,number,#.##}, too low), transitioning " + itemToMove +
+// " from "+hotNodeName+" (workrate {1,number,#.##}, now -= {2,number,#.##})",
+// questionedNodeTotalWorkrate, hotNodeWorkrate, segmentRate) );
+
+ questionedNodeTotalWorkrate += segmentRate;
+ hotNodeWorkrate -= segmentRate;
+
+ moveItem(itemToMove, hotNode, questionedNode);
+
+ if (++numMigrations >= getMaxMigrationsPerBalancingNode()) {
+ break;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ if (iters == 0) {
+ if (LOG.isTraceEnabled())
+ LOG.trace( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing if cold finished at node "+questionedNode+"; " +
+ "workrate {0,number,#.##} not cold",
+ originalQuestionedNodeTotalWorkrate) );
+ }
+ else if (itemsMoved.isEmpty()) {
+ if (LOG.isTraceEnabled())
+ LOG.trace( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+" " +
+ "(workrate {0,number,#.##}); no way to improve it",
+ originalQuestionedNodeTotalWorkrate) );
+ } else {
+ LOG.debug( MessageFormat.format(
+ "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+"; " +
+ "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " +
+ "by moving in {3}",
+ originalQuestionedNodeTotalWorkrate,
+ questionedNodeTotalWorkrate,
+ getDataProvider().getTotalWorkrate(questionedNode),
+ itemsMoved) );
+ }
+ }
+ return !itemsMoved.isEmpty();
+ }
+
+ protected void moveItem(ItemType item, NodeType oldNode, NodeType newNode) {
+ item.move(newNode);
+ model.onItemMoved(item, newNode);
+ }
+
+ /**
+ * "Best" is defined as nearest to the targetCost, without exceeding maxCost, unless maxCostIfNothingSmallerButLarger > 0
+ * which does just that (useful if the ideal and target are estimates and aren't quite right, typically it will take
+ * something larger than maxRate but less than half the total rate, which is only possible when the estimates don't agree)
+ */
+ protected ItemType findBestItemToMove(Map<ItemType, Double> costsPerItem, double targetCost, double maxCost,
+ double maxCostIfNothingSmallerButLarger, Set<ItemType> excludedItems, Location locationIfKnown) {
+
+ ItemType closestMatch = null;
+ ItemType smallestMoveable = null, largest = null;
+ double minDiff = Double.MAX_VALUE, smallestC = Double.MAX_VALUE, largestC = Double.MIN_VALUE;
+ boolean exclusions = false;
+
+ for (Entry<ItemType, Double> entry : costsPerItem.entrySet()) {
+ ItemType item = entry.getKey();
+ Double cost = entry.getValue();
+
+ if (cost == null) {
+ if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' has null workrate: skipping", item));
+ continue;
+ }
+
+ if (!model.isItemMoveable(item)) {
+ if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' cannot be moved: skipping", item));
+ continue;
+ }
+ if (cost < 0) {
+ if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' subject to recent adjustment: skipping", item));
+ continue;
+ }
+ if (excludedItems.contains(item)) {
+ exclusions = true;
+ continue;
+ }
+ if (cost < 0) { // FIXME: already tested above
+ exclusions = true;
+ continue;
+ }
+ if (cost <= 0) { // FIXME: overlaps previous condition
+ continue;
+ }
+ if (largest == null || cost > largestC) {
+ largest = item;
+ largestC = cost;
+ }
+ if (!model.isItemMoveable(item)) { // FIXME: already tested above
+ continue;
+ }
+ if (locationIfKnown != null && !model.isItemAllowedIn(item, locationIfKnown)) {
+ continue;
+ }
+ if (smallestMoveable == null || cost < smallestC) {
+ smallestMoveable = item;
+ smallestC = cost;
+ }
+ if (cost > maxCost) {
+ continue;
+ }
+ double diff = Math.abs(targetCost - cost);
+ if (closestMatch == null || diff < minDiff) {
+ closestMatch = item;
+ minDiff = diff;
+ }
+ }
+
+ if (closestMatch != null)
+ return closestMatch;
+
+ if (smallestC < maxCostIfNothingSmallerButLarger && smallestC < largestC && !exclusions)
+ return smallestMoveable;
+
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
new file mode 100644
index 0000000..5f6cabc
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.location.Location;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+
+/**
+ * Standard implementation of {@link BalanceablePoolModel}, providing essential arithmetic for item and container
+ * workrates and thresholds. See subclasses for specific requirements for migrating items.
+ */
+public class DefaultBalanceablePoolModel<ContainerType, ItemType> implements BalanceablePoolModel<ContainerType, ItemType> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultBalanceablePoolModel.class);
+
+ /*
+ * Performance comments.
+ * - Used hprof with LoadBalancingPolicySoakTest.testLoadBalancingManyManyItemsTest (1000 items)
+ * - Prior to adding containerToItems, it created a new set by iterating over all items.
+ * This was the biggest percentage of any brooklyn code.
+ * Hence it's worth duplicating the values, keyed by item and keyed by container.
+ * - Unfortunately changing threading model (so have a "rebalancer" thread, and a thread that
+ * processes events to update the model), get ConcurrentModificationException if don't take
+ * copy of containerToItems.get(node)...
+ */
+
+ // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item
+ private static final String NULL_CONTAINER = "null-container";
+
+ private final String name;
+ private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>());
+ private final Map<ContainerType, Double> containerToLowThreshold = new ConcurrentHashMap<ContainerType, Double>();
+ private final Map<ContainerType, Double> containerToHighThreshold = new ConcurrentHashMap<ContainerType, Double>();
+ private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>();
+ private final SetMultimap<ContainerType, ItemType> containerToItems = Multimaps.synchronizedSetMultimap(HashMultimap.<ContainerType, ItemType>create());
+ private final Map<ItemType, Double> itemToWorkrate = new ConcurrentHashMap<ItemType, Double>();
+ private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>());
+
+ private volatile double poolLowThreshold = 0;
+ private volatile double poolHighThreshold = 0;
+ private volatile double currentPoolWorkrate = 0;
+
+ public DefaultBalanceablePoolModel(String name) {
+ this.name = name;
+ }
+
+ public ContainerType getParentContainer(ItemType item) {
+ ContainerType result = itemToContainer.get(item);
+ return (result != NULL_CONTAINER) ? result : null;
+ }
+
+ public Set<ItemType> getItemsForContainer(ContainerType node) {
+ Set<ItemType> result = containerToItems.get(node);
+ synchronized (containerToItems) {
+ return (result != null) ? ImmutableSet.copyOf(result) : Collections.<ItemType>emptySet();
+ }
+ }
+
+ public Double getItemWorkrate(ItemType item) {
+ return itemToWorkrate.get(item);
+ }
+
+ @Override public double getPoolLowThreshold() { return poolLowThreshold; }
+ @Override public double getPoolHighThreshold() { return poolHighThreshold; }
+ @Override public double getCurrentPoolWorkrate() { return currentPoolWorkrate; }
+ @Override public boolean isHot() { return !containers.isEmpty() && currentPoolWorkrate > poolHighThreshold; }
+ @Override public boolean isCold() { return !containers.isEmpty() && currentPoolWorkrate < poolLowThreshold; }
+
+
+ // Provider methods.
+
+ @Override public String getName() { return name; }
+ @Override public int getPoolSize() { return containers.size(); }
+ @Override public Set<ContainerType> getPoolContents() { return containers; }
+ @Override public String getName(ContainerType container) { return container.toString(); } // TODO: delete?
+ @Override public Location getLocation(ContainerType container) { return null; } // TODO?
+
+ @Override public double getLowThreshold(ContainerType container) {
+ Double result = containerToLowThreshold.get(container);
+ return (result != null) ? result : -1;
+ }
+
+ @Override public double getHighThreshold(ContainerType container) {
+ Double result = containerToHighThreshold.get(container);
+ return (result != null) ? result : -1;
+ }
+
+ @Override public double getTotalWorkrate(ContainerType container) {
+ double totalWorkrate = 0;
+ for (ItemType item : getItemsForContainer(container)) {
+ Double workrate = itemToWorkrate.get(item);
+ if (workrate != null)
+ totalWorkrate += Math.abs(workrate);
+ }
+ return totalWorkrate;
+ }
+
+ @Override public Map<ContainerType, Double> getContainerWorkrates() {
+ Map<ContainerType, Double> result = new LinkedHashMap<ContainerType, Double>();
+ for (ContainerType node : containers)
+ result.put(node, getTotalWorkrate(node));
+ return result;
+ }
+
+ @Override public Map<ItemType, Double> getItemWorkrates(ContainerType node) {
+ Map<ItemType, Double> result = new LinkedHashMap<ItemType, Double>();
+ for (ItemType item : getItemsForContainer(node))
+ result.put(item, itemToWorkrate.get(item));
+ return result;
+ }
+
+ @Override public boolean isItemMoveable(ItemType item) {
+ // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable?
+ return itemToContainer.containsKey(item) && !immovableItems.contains(item);
+ }
+
+ @Override public boolean isItemAllowedIn(ItemType item, Location location) {
+ return true; // TODO?
+ }
+
+
+ // Mutators.
+
+ @Override
+ public void onItemMoved(ItemType item, ContainerType newNode) {
+ if (!itemToContainer.containsKey(item)) {
+ // Item may have been deleted; order of events received from different sources
+ // (i.e. item itself and for itemGroup membership) is non-deterministic.
+ LOG.info("Balanceable pool model ignoring onItemMoved for unknown item {} to container {}; " +
+ "if onItemAdded subsequently received will get new container then", item, newNode);
+ return;
+ }
+ ContainerType newNodeNonNull = toNonNullContainer(newNode);
+ ContainerType oldNode = itemToContainer.put(item, newNodeNonNull);
+ if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
+ if (newNode != null) containerToItems.put(newNode, item);
+ }
+
+ @Override
+ public void onContainerAdded(ContainerType newContainer, double lowThreshold, double highThreshold) {
+ boolean added = containers.add(newContainer);
+ if (!added) {
+ // See LoadBalancingPolicy.onContainerAdded for possible explanation of why can get duplicate calls
+ LOG.debug("Duplicate container-added event for {}; ignoring", newContainer);
+ return;
+ }
+ containerToLowThreshold.put(newContainer, lowThreshold);
+ containerToHighThreshold.put(newContainer, highThreshold);
+ poolLowThreshold += lowThreshold;
+ poolHighThreshold += highThreshold;
+ }
+
+ @Override
+ public void onContainerRemoved(ContainerType oldContainer) {
+ containers.remove(oldContainer);
+ Double containerLowThreshold = containerToLowThreshold.remove(oldContainer);
+ Double containerHighThresold = containerToHighThreshold.remove(oldContainer);
+ poolLowThreshold -= (containerLowThreshold != null ? containerLowThreshold : 0);
+ poolHighThreshold -= (containerHighThresold != null ? containerHighThresold : 0);
+
+ // TODO: assert no orphaned items
+ }
+
+ @Override
+ public void onItemAdded(ItemType item, ContainerType parentContainer) {
+ onItemAdded(item, parentContainer, false);
+ }
+
+ @Override
+ public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable) {
+ // Duplicate calls to onItemAdded do no harm, as long as most recent is most accurate!
+ // Important that it stays that way for now - See LoadBalancingPolicy.onContainerAdded for explanation.
+
+ if (immovable)
+ immovableItems.add(item);
+
+ ContainerType parentContainerNonNull = toNonNullContainer(parentContainer);
+ ContainerType oldNode = itemToContainer.put(item, parentContainerNonNull);
+ if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
+ if (parentContainer != null) containerToItems.put(parentContainer, item);
+ }
+
+ @Override
+ public void onItemRemoved(ItemType item) {
+ ContainerType oldNode = itemToContainer.remove(item);
+ if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
+ Double workrate = itemToWorkrate.remove(item);
+ if (workrate != null)
+ currentPoolWorkrate -= workrate;
+ immovableItems.remove(item);
+ }
+
+ @Override
+ public void onItemWorkrateUpdated(ItemType item, double newValue) {
+ if (hasItem(item)) {
+ Double oldValue = itemToWorkrate.put(item, newValue);
+ double delta = ( newValue - (oldValue != null ? oldValue : 0) );
+ currentPoolWorkrate += delta;
+ } else {
+ // Can happen when item removed - get notification of removal and workrate from group and item
+ // respectively, so can overtake each other
+ if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of workrate for unknown item {}, to {}", item, newValue);
+ }
+ }
+
+ private boolean hasItem(ItemType item) {
+ return itemToContainer.containsKey(item);
+ }
+
+
+ // Additional methods for tests.
+
+ /**
+ * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers.
+ */
+ @VisibleForTesting
+ public String itemDistributionToString() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ dumpItemDistribution(new PrintStream(baos));
+ return new String(baos.toByteArray());
+ }
+
+ @VisibleForTesting
+ public void dumpItemDistribution() {
+ dumpItemDistribution(System.out);
+ }
+
+ @VisibleForTesting
+ public void dumpItemDistribution(PrintStream out) {
+ for (ContainerType container : getPoolContents()) {
+ out.println("Container '"+container+"': ");
+ for (ItemType item : getItemsForContainer(container)) {
+ Double workrate = getItemWorkrate(item);
+ out.println("\t"+"Item '"+item+"' ("+workrate+")");
+ }
+ }
+ out.flush();
+ }
+
+ @SuppressWarnings("unchecked")
+ private ContainerType nullContainer() {
+ return (ContainerType) NULL_CONTAINER; // relies on erasure
+ }
+
+ private ContainerType toNonNullContainer(ContainerType container) {
+ return (container != null) ? container : nullContainer();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
new file mode 100644
index 0000000..2538bc6
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.event.basic.BasicConfigKey;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+/**
+ * A group of items that are contained within a given (dynamically changing) set of containers.
+ *
+ * The {@link setContainers(Group)} sets the group of containers. The membership of that group
+ * is dynamically tracked.
+ *
+ * When containers are added/removed, or when an items is added/removed, or when an {@link Moveable} item
+ * is moved then the membership of this group of items is automatically updated accordingly.
+ *
+ * For example: in Monterey, this could be used to track the actors that are within a given cluster of venues.
+ */
+@ImplementedBy(ItemsInContainersGroupImpl.class)
+public interface ItemsInContainersGroup extends DynamicGroup {
+
+ @SetFromFlag("itemFilter")
+ public static final ConfigKey<Predicate<? super Entity>> ITEM_FILTER = new BasicConfigKey(
+ Predicate.class, "itemsInContainerGroup.itemFilter", "Filter for which items within the containers will automatically be in group", Predicates.alwaysTrue());
+
+ public void setContainers(Group containerGroup);
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
new file mode 100644
index 0000000..225a2b6
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.AbstractGroup;
+import brooklyn.entity.basic.DynamicGroupImpl;
+
+import com.google.common.base.Predicate;
+
+/**
+ * A group of items that are contained within a given (dynamically changing) set of containers.
+ *
+ * The {@link setContainers(Group)} sets the group of containers. The membership of that group
+ * is dynamically tracked.
+ *
+ * When containers are added/removed, or when an items is added/removed, or when an {@link Moveable} item
+ * is moved then the membership of this group of items is automatically updated accordingly.
+ *
+ * For example: in Monterey, this could be used to track the actors that are within a given cluster of venues.
+ */
+public class ItemsInContainersGroupImpl extends DynamicGroupImpl implements ItemsInContainersGroup {
+
+ // TODO Inefficient: will not scale to many 1000s of items
+
+ private static final Logger LOG = LoggerFactory.getLogger(ItemsInContainersGroup.class);
+
+ private Group containerGroup;
+
+ private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
+ @Override
+ public void onEvent(SensorEvent<Object> event) {
+ Entity source = event.getSource();
+ Object value = event.getValue();
+ Sensor sensor = event.getSensor();
+
+ if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
+ onContainerAdded((Entity) value);
+ } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
+ onContainerRemoved((Entity) value);
+ } else if (sensor.equals(Movable.CONTAINER)) {
+ onItemMoved((Movable)source, (BalanceableContainer<?>) value);
+ } else {
+ throw new IllegalStateException("Unhandled event type "+sensor+": "+event);
+ }
+ }
+ };
+
+ public ItemsInContainersGroupImpl() {
+ }
+
+ @Override
+ public void init() {
+ super.init();
+ setEntityFilter(new Predicate<Entity>() {
+ @Override public boolean apply(Entity e) {
+ return acceptsEntity(e);
+ }});
+ }
+
+ protected Predicate<? super Entity> getItemFilter() {
+ return getConfig(ITEM_FILTER);
+ }
+
+ @Override
+ protected boolean acceptsEntity(Entity e) {
+ if (e instanceof Movable) {
+ return acceptsItem((Movable)e, ((Movable)e).getAttribute(Movable.CONTAINER));
+ }
+ return false;
+ }
+
+ boolean acceptsItem(Movable e, BalanceableContainer c) {
+ return (containerGroup != null && c != null) ? getItemFilter().apply(e) && containerGroup.hasMember(c) : false;
+ }
+
+ @Override
+ public void setContainers(Group containerGroup) {
+ this.containerGroup = containerGroup;
+ subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
+ subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
+ subscribe(null, Movable.CONTAINER, eventHandler);
+
+ if (LOG.isTraceEnabled()) LOG.trace("{} scanning entities on container group set", this);
+ rescanEntities();
+ }
+
+ private void onContainerAdded(Entity newContainer) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} rescanning entities on container {} added", this, newContainer);
+ rescanEntities();
+ }
+
+ private void onContainerRemoved(Entity oldContainer) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} rescanning entities on container {} removed", this, oldContainer);
+ rescanEntities();
+ }
+
+ protected void onEntityAdded(Entity item) {
+ if (acceptsEntity(item)) {
+ if (LOG.isDebugEnabled()) LOG.debug("{} adding new item {}", this, item);
+ addMember(item);
+ }
+ }
+
+ protected void onEntityRemoved(Entity item) {
+ if (removeMember(item)) {
+ if (LOG.isDebugEnabled()) LOG.debug("{} removing deleted item {}", this, item);
+ }
+ }
+
+ private void onItemMoved(Movable item, BalanceableContainer container) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} processing moved item {}, to container {}", new Object[] {this, item, container});
+ if (hasMember(item)) {
+ if (!acceptsItem(item, container)) {
+ if (LOG.isDebugEnabled()) LOG.debug("{} removing moved item {} from group, as new container {} is not a member", new Object[] {this, item, container});
+ removeMember(item);
+ }
+ } else {
+ if (acceptsItem(item, container)) {
+ if (LOG.isDebugEnabled()) LOG.debug("{} adding moved item {} to group, as new container {} is a member", new Object[] {this, item, container});
+ addMember(item);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
new file mode 100644
index 0000000..f437fc8
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static brooklyn.util.JavaGroovyEquivalents.elvis;
+import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.EntityInternal;
+
+import org.apache.brooklyn.policy.autoscaling.AutoScalerPolicy;
+
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * <p>Policy that is attached to a pool of "containers", each of which can host one or more migratable "items".
+ * The policy monitors the workrates of the items and effects migrations in an attempt to ensure that the containers
+ * are all sufficiently utilized without any of them being overloaded.
+ *
+ * <p>The particular sensor that defines the items' workrates is specified when the policy is constructed. High- and
+ * low-thresholds are defined as <strong>configuration keys</strong> on each of the container entities in the pool:
+ * for an item sensor named {@code foo.bar.sensorName}, the corresponding container config keys would be named
+ * {@code foo.bar.sensorName.threshold.low} and {@code foo.bar.sensorName.threshold.high}.
+ *
+ * <p>In addition to balancing items among the available containers, this policy causes the pool Entity to emit
+ * {@code POOL_COLD} and {@code POOL_HOT} events when it is determined that there is a surplus or shortfall
+ * of container resource in the pool respectively. These events may be consumed by a separate policy that is capable
+ * of resizing the container pool.
+ */
+ // removed from catalog because it cannot currently be configured via catalog mechanisms
+ // PolicySpec.create fails due to no no-arg constructor
+ // TODO make metric and model things which can be initialized from config then reinstate in catalog
+//@Catalog(name="Load Balancer", description="Policy that is attached to a pool of \"containers\", each of which "
+// + "can host one or more migratable \"items\". The policy monitors the workrates of the items and effects "
+// + "migrations in an attempt to ensure that the containers are all sufficiently utilized without any of "
+// + "them being overloaded.")
+public class LoadBalancingPolicy<NodeType extends Entity, ItemType extends Movable> extends AbstractPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicy.class);
+
+ @SetFromFlag(defaultVal="100")
+ private long minPeriodBetweenExecs;
+
+ private final AttributeSensor<? extends Number> metric;
+ private final String lowThresholdConfigKeyName;
+ private final String highThresholdConfigKeyName;
+ private final BalanceablePoolModel<NodeType, ItemType> model;
+ private final BalancingStrategy<NodeType, ItemType> strategy;
+ private BalanceableWorkerPool poolEntity;
+
+ private volatile ScheduledExecutorService executor;
+ private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+ private volatile long executorTime = 0;
+
+ private int lastEmittedDesiredPoolSize = 0;
+ private static enum TemperatureStates { COLD, HOT }
+ private TemperatureStates lastEmittedPoolTemperature = null; // "cold" or "hot"
+
+ private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void onEvent(SensorEvent<Object> event) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", LoadBalancingPolicy.this, event);
+ Entity source = event.getSource();
+ Object value = event.getValue();
+ Sensor sensor = event.getSensor();
+
+ if (sensor.equals(metric)) {
+ onItemMetricUpdate((ItemType)source, ((Number) value).doubleValue(), true);
+ } else if (sensor.equals(BalanceableWorkerPool.CONTAINER_ADDED)) {
+ onContainerAdded((NodeType) value, true);
+ } else if (sensor.equals(BalanceableWorkerPool.CONTAINER_REMOVED)) {
+ onContainerRemoved((NodeType) value, true);
+ } else if (sensor.equals(BalanceableWorkerPool.ITEM_ADDED)) {
+ BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
+ onItemAdded((ItemType)pair.item, (NodeType)pair.container, true);
+ } else if (sensor.equals(BalanceableWorkerPool.ITEM_REMOVED)) {
+ BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
+ onItemRemoved((ItemType)pair.item, (NodeType)pair.container, true);
+ } else if (sensor.equals(BalanceableWorkerPool.ITEM_MOVED)) {
+ BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
+ onItemMoved((ItemType)pair.item, (NodeType)pair.container, true);
+ }
+ }
+ };
+
+ public LoadBalancingPolicy() {
+ this(null, null);
+ }
+
+ public LoadBalancingPolicy(AttributeSensor<? extends Number> metric,
+ BalanceablePoolModel<NodeType, ItemType> model) {
+ this(MutableMap.of(), metric, model);
+ }
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public LoadBalancingPolicy(Map props, AttributeSensor<? extends Number> metric,
+ BalanceablePoolModel<NodeType, ItemType> model) {
+
+ super(props);
+ this.metric = metric;
+ this.lowThresholdConfigKeyName = metric.getName()+".threshold.low";
+ this.highThresholdConfigKeyName = metric.getName()+".threshold.high";
+ this.model = model;
+ this.strategy = new BalancingStrategy(getDisplayName(), model); // TODO: extract interface, inject impl
+
+ // TODO Should re-use the execution manager's thread pool, somehow
+ executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setEntity(EntityLocal entity) {
+ Preconditions.checkArgument(entity instanceof BalanceableWorkerPool, "Provided entity must be a BalanceableWorkerPool");
+ super.setEntity(entity);
+ this.poolEntity = (BalanceableWorkerPool) entity;
+
+ // Detect when containers are added to or removed from the pool.
+ subscribe(poolEntity, BalanceableWorkerPool.CONTAINER_ADDED, eventHandler);
+ subscribe(poolEntity, BalanceableWorkerPool.CONTAINER_REMOVED, eventHandler);
+ subscribe(poolEntity, BalanceableWorkerPool.ITEM_ADDED, eventHandler);
+ subscribe(poolEntity, BalanceableWorkerPool.ITEM_REMOVED, eventHandler);
+ subscribe(poolEntity, BalanceableWorkerPool.ITEM_MOVED, eventHandler);
+
+ // Take heed of any extant containers.
+ for (Entity container : poolEntity.getContainerGroup().getMembers()) {
+ onContainerAdded((NodeType)container, false);
+ }
+ for (Entity item : poolEntity.getItemGroup().getMembers()) {
+ onItemAdded((ItemType)item, (NodeType)item.getAttribute(Movable.CONTAINER), false);
+ }
+
+ scheduleRebalance();
+ }
+
+ @Override
+ public void suspend() {
+ // TODO unsubscribe from everything? And resubscribe on resume?
+ super.suspend();
+ if (executor != null) executor.shutdownNow();;
+ executorQueued.set(false);
+ }
+
+ @Override
+ public void resume() {
+ super.resume();
+ executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+ executorTime = 0;
+ executorQueued.set(false);
+ }
+
+ private ThreadFactory newThreadFactory() {
+ return new ThreadFactoryBuilder()
+ .setNameFormat("brooklyn-followthesunpolicy-%d")
+ .build();
+ }
+
+ private void scheduleRebalance() {
+ if (isRunning() && executorQueued.compareAndSet(false, true)) {
+ long now = System.currentTimeMillis();
+ long delay = Math.max(0, (executorTime + minPeriodBetweenExecs) - now);
+
+ executor.schedule(new Runnable() {
+ @SuppressWarnings("rawtypes")
+ public void run() {
+ try {
+ executorTime = System.currentTimeMillis();
+ executorQueued.set(false);
+ strategy.rebalance();
+
+ if (LOG.isDebugEnabled()) LOG.debug("{} post-rebalance: poolSize={}; workrate={}; lowThreshold={}; " +
+ "highThreshold={}", new Object[] {this, model.getPoolSize(), model.getCurrentPoolWorkrate(),
+ model.getPoolLowThreshold(), model.getPoolHighThreshold()});
+
+ if (model.isCold()) {
+ Map eventVal = ImmutableMap.of(
+ AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, model.getPoolSize(),
+ AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, model.getCurrentPoolWorkrate(),
+ AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, model.getPoolLowThreshold(),
+ AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, model.getPoolHighThreshold());
+
+ ((EntityLocal)poolEntity).emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, eventVal);
+
+ if (LOG.isInfoEnabled()) {
+ int desiredPoolSize = (int) Math.ceil(model.getCurrentPoolWorkrate() / (model.getPoolLowThreshold()/model.getPoolSize()));
+ if (desiredPoolSize != lastEmittedDesiredPoolSize || lastEmittedPoolTemperature != TemperatureStates.COLD) {
+ LOG.info("{} emitted COLD (suggesting {}): {}", new Object[] {this, desiredPoolSize, eventVal});
+ lastEmittedDesiredPoolSize = desiredPoolSize;
+ lastEmittedPoolTemperature = TemperatureStates.COLD;
+ }
+ }
+
+ } else if (model.isHot()) {
+ Map eventVal = ImmutableMap.of(
+ AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, model.getPoolSize(),
+ AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, model.getCurrentPoolWorkrate(),
+ AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, model.getPoolLowThreshold(),
+ AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, model.getPoolHighThreshold());
+
+ ((EntityLocal)poolEntity).emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, eventVal);
+
+ if (LOG.isInfoEnabled()) {
+ int desiredPoolSize = (int) Math.ceil(model.getCurrentPoolWorkrate() / (model.getPoolHighThreshold()/model.getPoolSize()));
+ if (desiredPoolSize != lastEmittedDesiredPoolSize || lastEmittedPoolTemperature != TemperatureStates.HOT) {
+ LOG.info("{} emitted HOT (suggesting {}): {}", new Object[] {this, desiredPoolSize, eventVal});
+ lastEmittedDesiredPoolSize = desiredPoolSize;
+ lastEmittedPoolTemperature = TemperatureStates.HOT;
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ if (isRunning()) {
+ LOG.error("Error rebalancing", e);
+ } else {
+ LOG.debug("Error rebalancing, but no longer running", e);
+ }
+ }
+ }},
+ delay,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ // TODO Can get duplicate onContainerAdded events.
+ // I presume it's because we subscribe and then iterate over the extant containers.
+ // Solution would be for subscription to give you events for existing / current value(s).
+ // Also current impl messes up single-threaded updates model: the setEntity is a different thread than for subscription events.
+ private void onContainerAdded(NodeType newContainer, boolean rebalanceNow) {
+ Preconditions.checkArgument(newContainer instanceof BalanceableContainer, "Added container must be a BalanceableContainer");
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of container {}", this, newContainer);
+ // Low and high thresholds for the metric we're interested in are assumed to be present
+ // in the container's configuration.
+ Number lowThreshold = (Number) findConfigValue(newContainer, lowThresholdConfigKeyName);
+ Number highThreshold = (Number) findConfigValue(newContainer, highThresholdConfigKeyName);
+ if (lowThreshold == null || highThreshold == null) {
+ LOG.warn(
+ "Balanceable container '"+newContainer+"' does not define low- and high- threshold configuration keys: '"+
+ lowThresholdConfigKeyName+"' and '"+highThresholdConfigKeyName+"', skipping");
+ return;
+ }
+
+ model.onContainerAdded(newContainer, lowThreshold.doubleValue(), highThreshold.doubleValue());
+
+ // Note: no need to scan the container for items; they will appear via the ITEM_ADDED events.
+ // Also, must abide by any item-filters etc defined in the pool.
+
+ if (rebalanceNow) scheduleRebalance();
+ }
+
+ private static Object findConfigValue(Entity entity, String configKeyName) {
+ Map<ConfigKey<?>, Object> config = ((EntityInternal)entity).getAllConfig();
+ for (Entry<ConfigKey<?>, Object> entry : config.entrySet()) {
+ if (configKeyName.equals(entry.getKey().getName()))
+ return entry.getValue();
+ }
+ return null;
+ }
+
+ // TODO Receiving duplicates of onContainerRemoved (e.g. when running LoadBalancingInmemorySoakTest)
+ private void onContainerRemoved(NodeType oldContainer, boolean rebalanceNow) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of container {}", this, oldContainer);
+ model.onContainerRemoved(oldContainer);
+ if (rebalanceNow) scheduleRebalance();
+ }
+
+ private void onItemAdded(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
+ Preconditions.checkArgument(item instanceof Movable, "Added item "+item+" must implement Movable");
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of item {} in container {}", new Object[] {this, item, parentContainer});
+
+ subscribe(item, metric, eventHandler);
+
+ // Update the model, including the current metric value (if any).
+ boolean immovable = (Boolean)elvis(item.getConfig(Movable.IMMOVABLE), false);
+ Number currentValue = item.getAttribute(metric);
+ model.onItemAdded(item, parentContainer, immovable);
+ if (currentValue != null)
+ model.onItemWorkrateUpdated(item, currentValue.doubleValue());
+
+ if (rebalanceNow) scheduleRebalance();
+ }
+
+ private void onItemRemoved(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of item {}", this, item);
+ unsubscribe(item);
+ model.onItemRemoved(item);
+ if (rebalanceNow) scheduleRebalance();
+ }
+
+ private void onItemMoved(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording moving of item {} to {}", new Object[] {this, item, parentContainer});
+ model.onItemMoved(item, parentContainer);
+ if (rebalanceNow) scheduleRebalance();
+ }
+
+ private void onItemMetricUpdate(ItemType item, double newValue, boolean rebalanceNow) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording metric update for item {}, new value {}", new Object[] {this, item, newValue});
+ model.onItemWorkrateUpdated(item, newValue);
+ if (rebalanceNow) scheduleRebalance();
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : "");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java
new file mode 100644
index 0000000..b58b8d2
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import org.apache.brooklyn.api.location.Location;
+
+/**
+ * Temporary stub to resolve dependencies in ported LoadBalancingPolicy.
+ */
+public class LocationConstraint {
+ public boolean isPermitted(Location l) { return true; } // TODO
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java
new file mode 100644
index 0000000..68ba43d
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.annotation.EffectorParam;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicConfigKey;
+
+
+/**
+ * Represents an item that can be migrated between balanceable containers.
+ */
+public interface Movable extends Entity {
+
+ @SetFromFlag("immovable")
+ public static ConfigKey<Boolean> IMMOVABLE = new BasicConfigKey<Boolean>(
+ Boolean.class, "movable.item.immovable", "Indicates whether this item instance is immovable, so cannot be moved by policies", false);
+
+ public static BasicAttributeSensor<BalanceableContainer> CONTAINER = new BasicAttributeSensor<BalanceableContainer>(
+ BalanceableContainer.class, "movable.item.container", "The container that this item is on");
+
+ public static final MethodEffector<Void> MOVE = new MethodEffector<Void>(Movable.class, "move");
+
+ public String getContainerId();
+
+ @Effector(description="Moves this entity to the given container")
+ public void move(@EffectorParam(name="destination") Entity destination);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
new file mode 100644
index 0000000..b5c6a28
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Set;
+
+/**
+ * Provides conveniences for searching for hot/cold containers in a provided pool model.
+ * Ported from Monterey v3, with irrelevant bits removed.
+ */
+public class PolicyUtilForPool<ContainerType, ItemType> {
+
+ private final BalanceablePoolModel<ContainerType, ItemType> model;
+
+
+ public PolicyUtilForPool (BalanceablePoolModel<ContainerType, ItemType> model) {
+ this.model = model;
+ }
+
+ public ContainerType findColdestContainer(Set<ContainerType> excludedContainers) {
+ return findColdestContainer(excludedContainers, null);
+ }
+
+ /**
+ * Identifies the container with the maximum spare capacity (highThreshold - currentWorkrate),
+ * returns null if none of the model's nodes has spare capacity.
+ */
+ public ContainerType findColdestContainer(Set<ContainerType> excludedContainers, LocationConstraint locationConstraint) {
+ double maxSpareCapacity = 0;
+ ContainerType coldest = null;
+
+ for (ContainerType c : model.getPoolContents()) {
+ if (excludedContainers.contains(c))
+ continue;
+ if (locationConstraint != null && !locationConstraint.isPermitted(model.getLocation(c)))
+ continue;
+
+ double highThreshold = model.getHighThreshold(c);
+ double totalWorkrate = model.getTotalWorkrate(c);
+ double spareCapacity = highThreshold - totalWorkrate;
+
+ if (highThreshold == -1 || totalWorkrate == -1) {
+ continue; // container presumably has been removed
+ }
+ if (spareCapacity > maxSpareCapacity) {
+ maxSpareCapacity = spareCapacity;
+ coldest = c;
+ }
+ }
+ return coldest;
+ }
+
+ /**
+ * Identifies the container with the maximum overshoot (currentWorkrate - highThreshold),
+ * returns null if none of the model's nodes has an overshoot.
+ */
+ public ContainerType findHottestContainer(Set<ContainerType> excludedContainers) {
+ double maxOvershoot = 0;
+ ContainerType hottest = null;
+
+ for (ContainerType c : model.getPoolContents()) {
+ if (excludedContainers.contains(c))
+ continue;
+
+ double totalWorkrate = model.getTotalWorkrate(c);
+ double highThreshold = model.getHighThreshold(c);
+ double overshoot = totalWorkrate - highThreshold;
+
+ if (highThreshold == -1 || totalWorkrate == -1) {
+ continue; // container presumably has been removed
+ }
+ if (overshoot > maxOvershoot) {
+ maxOvershoot = overshoot;
+ hottest = c;
+ }
+ }
+ return hottest;
+ }
+
+}
\ No newline at end of file