You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:11 UTC

[13/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming package policy

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java
new file mode 100644
index 0000000..9d3968b
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an abstract algorithm for optimally balancing worker "items" among several "containers" based on the workloads
+ * of the items, and corresponding high- and low-thresholds on the containers.
+ * 
+ * TODO: extract interface, provide default implementation
+ * TODO: remove legacy code comments
+ */
+public class BalancingStrategy<NodeType extends Entity, ItemType extends Movable> {
+
+    // FIXME Bad idea to use MessageFormat.format in this way; if toString of entity contains
+    // special characters interpreted by MessageFormat, then it will all break!
+    
+    // This is a modified version of the watermark elasticity policy from Monterey v3.
+    
+    private static final Logger LOG = LoggerFactory.getLogger(BalancingStrategy.class);
+    
+    private static final int MAX_MIGRATIONS_PER_BALANCING_NODE = 20; // arbitrary (Splodge)
+    private static final boolean BALANCE_COLD_PULLS_IN_SAME_RUN_AS_HOT_PUSHES = false;
+    
+    private final String name;
+    private final BalanceablePoolModel<NodeType, ItemType> model;
+    private final PolicyUtilForPool<NodeType, ItemType> helper;
+//    private boolean loggedColdestTooHigh = false;
+//    private boolean loggedHottestTooLow = false;
+    
+    
+    public BalancingStrategy(String name, BalanceablePoolModel<NodeType, ItemType> model) {
+        this.name = name;
+        this.model = model;
+        this.helper = new PolicyUtilForPool<NodeType, ItemType>(model);
+    }
+    
+    public String getName() {
+        return name;
+    }
+    
+    public void rebalance() {
+        checkAndApplyOn(model.getPoolContents());
+    }
+    
+    public int getMaxMigrationsPerBalancingNode() {
+        return MAX_MIGRATIONS_PER_BALANCING_NODE;
+    }
+    
+    public BalanceablePoolModel<NodeType, ItemType> getDataProvider() {
+        return model;
+    }
+    
+    // This was the entry point for the legacy policy.
+    private void checkAndApplyOn(final Collection<NodeType> dirtyNodesSupplied) {
+        Collection<NodeType> dirtyNodes = dirtyNodesSupplied;
+        
+//        if (startTime + FORCE_ALL_NODES_IF_DELAYED_FOR_MILLIS < System.currentTimeMillis()) {
+//            Set<NodeType> allNodes = new LinkedHashSet<NodeType>();
+//            allNodes.addAll(dirtyNodes);
+//            dirtyNodes = allNodes;
+//            allNodes.addAll(getDataProvider().getPoolContents());
+//            if (LOG.isDebugEnabled())
+//                LOG.debug("policy "+getDataProvider().getAbbr()+" running after delay ("+
+//                        TimeUtils.makeTimeString(System.currentTimeMillis() - startTime)+", analysing all nodes: "+
+//                        dirtyNodes);
+//        }
+        
+//        nodeFinder.optionalCachedNodesWithBacklogDetected.clear();
+//        boolean gonnaGrow = growPool(dirtyNodes);
+//        getDataProvider().waitForAllTransitionsComplete();
+        boolean gonnaGrow = false;
+        
+        Set<NodeType> nonFrozenDirtyNodes = new LinkedHashSet<NodeType>(dirtyNodes);
+//        boolean gonnaShrink = false;
+//        if (!gonnaGrow && !DONT_SHRINK_UNLESS_BALANCED) {
+//            gonnaShrink = shrinkPool(nonFrozenDirtyNodes);
+//            getDataProvider().waitForAllTransitionsComplete();
+//        }
+        
+        if (getDataProvider().getPoolSize() >= 2) {
+            boolean didBalancing = false;
+            for (NodeType a : nonFrozenDirtyNodes) {
+                didBalancing |= balanceItemsOnNodesInQuestion(a, gonnaGrow);
+//                getMutator().waitForAllTransitionsComplete();
+            }
+            if (didBalancing) {
+                return;
+            }
+        }
+        
+//        if (!gonnaGrow && DONT_SHRINK_UNLESS_BALANCED) {
+//            gonnaShrink = shrinkPool(nonFrozenDirtyNodes);
+//            getDataProvider().waitForAllTransitionsComplete();
+//        }
+        
+//        if (gonnaGrow || gonnaShrink)
+//        //don't log 'doing nothing' message
+//        return;
+        
+//        if (LOG.isDebugEnabled()) {
+//            double poolTotal = getDataProvider().getPoolPredictedWorkrateTotal();
+//            int poolSize = getDataProvider().getPoolPredictedSize();
+//            LOG.debug(MessageFormat.format("policy "+getDataProvider().getAbbr()+" did nothing; pool workrate is {0,number,#.##} x {1} nodes",
+//                    1.0*poolTotal/poolSize, poolSize));
+//        }
+    }
+    
+    protected boolean balanceItemsOnNodesInQuestion(NodeType questionedNode, boolean gonnaGrow) {
+        double questionedNodeTotalWorkrate = getDataProvider().getTotalWorkrate(questionedNode);
+        
+        boolean balanced = balanceItemsOnHotNode(questionedNode, questionedNodeTotalWorkrate, gonnaGrow);
+//        getMutator().waitForAllTransitionsComplete();
+        
+        if (!balanced || BALANCE_COLD_PULLS_IN_SAME_RUN_AS_HOT_PUSHES) {
+            balanced |= balanceItemsOnColdNode(questionedNode, questionedNodeTotalWorkrate, gonnaGrow);
+//            getMutator().waitForAllTransitionsComplete();
+        }
+        if (balanced)
+            return true;
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug( MessageFormat.format(
+                    "policy "+getDataProvider().getName()+" not balancing "+questionedNode+"; " +
+                    "its workrate {0,number,#.##} is acceptable (or cannot be balanced)", questionedNodeTotalWorkrate) );
+        }
+        return false;
+    }
+    
+    protected boolean balanceItemsOnHotNode(NodeType node, double nodeWorkrate, boolean gonnaGrow) {
+        double originalNodeWorkrate = nodeWorkrate;
+        int migrationCount = 0;
+        int iterationCount = 0;
+        
+        Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>();
+        Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>();
+        
+//        if (nodeFinder.config.COUNT_BACKLOG_AS_EXTRA_WORKRATE) {
+//            int backlog = nodeFinder.getBacklogQueueLength(questionedNode);
+//            if (backlog>0) {
+//                Level l = backlog>1000 ? Level.INFO : backlog>10 ? Level.FINE : Level.FINER;
+//                if (LOG.isLoggable(l)) {
+//                    LOG.log( l, MessageFormat.format(
+//                            "policy "+getDataProvider().getAbbr()+" detected queue at node "+questionedNode+", " +
+//                            "inflating workrate {0,number,#.##} + "+backlog, questionedNodeTotalWorkrate) );
+//                }
+//                questionedNodeTotalWorkrate += backlog;
+//            }
+//        }
+        
+        Double highThreshold = model.getHighThreshold(node);
+        if (highThreshold == -1) {
+            // node presumably has been removed; TODO log
+            return false;
+        }
+        
+        while (nodeWorkrate > highThreshold && migrationCount < getMaxMigrationsPerBalancingNode()) {
+            iterationCount++;
+            
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(MessageFormat.format(
+                        "policy "+getDataProvider().getName()+" considering balancing hot node "+node+" " +
+                        "(workrate {0,number,#.##}); iteration "+iterationCount, nodeWorkrate) );
+            }
+            
+            // move from hot node, to coldest
+            
+            NodeType coldNode = helper.findColdestContainer(nodesChecked);
+            
+            if (coldNode == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" not balancing hot node "+node+" " +
+                            "(workrate {0,number,#.##}); no coldest node available", nodeWorkrate) );
+                }
+                break;
+            }
+            
+            if (coldNode.equals(node)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" not balancing hot node "+node+" " +
+                            "(workrate {0,number,#.##}); it is also the coldest modifiable node", nodeWorkrate) );
+                }
+                break;
+            }
+            
+            double coldNodeWorkrate = getDataProvider().getTotalWorkrate(coldNode);
+            boolean emergencyLoadBalancing = coldNodeWorkrate < nodeWorkrate*2/3;
+            double coldNodeHighThreshold = model.getHighThreshold(coldNode);
+            if (coldNodeWorkrate >= coldNodeHighThreshold && !emergencyLoadBalancing) {
+                //don't balance if all nodes are approx equally hot (and very hot)
+                
+                //for now, stop warning if it is a recurring theme!
+//                Level level = loggedColdestTooHigh ? Level.FINER : Level.INFO;
+//                LOG.log(level, MessageFormat.format(
+//                        "policy "+getDataProvider().getAbbr()+" not balancing hot node "+questionedNode+" " +
+//                        "(workrate {0,number,#.##}); coldest node "+coldNode+" has workrate {1,number,#.##} also too high"+
+//                        (loggedColdestTooHigh ? "" : " (future cases will be logged at finer)"),
+//                        questionedNodeTotalWorkrate, coldNodeWorkrate) );
+//                loggedColdestTooHigh = true;
+                break;
+            }
+            double poolLowWatermark = Double.MAX_VALUE; // TODO
+            if (gonnaGrow && (coldNodeWorkrate >= poolLowWatermark && !emergencyLoadBalancing)) {
+                //if we're growing the pool, refuse to balance unless the cold node is indeed very cold, or hot node very hot
+                
+                //for now, stop warning if it is a recurring theme!
+//                Level level = loggedColdestTooHigh ? Level.FINER : Level.INFO;
+//                LOG.log(level, MessageFormat.format(
+//                        "policy "+getDataProvider().getAbbr()+" not balancing hot node "+questionedNode+" " +
+//                        "(workrate {0,number,#.##}); coldest node "+coldNode+" has workrate {1,number,#.##} also too high to accept while pool is growing" +
+//                        (loggedColdestTooHigh ? "" : " (future cases will be logged at finer)"),
+//                        questionedNodeTotalWorkrate, coldNodeWorkrate) );
+//                loggedColdestTooHigh = true;
+                break;
+            }
+            
+            String questionedNodeName = getDataProvider().getName(node);
+            String coldNodeName = getDataProvider().getName(coldNode);
+            Location coldNodeLocation = getDataProvider().getLocation(coldNode);
+            
+            if (LOG.isDebugEnabled()) {
+                LOG.debug( MessageFormat.format(
+                        "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
+                        "("+node+", workrate {0,number,#.##}), " +
+                        "considering target "+coldNodeName+" ("+coldNode+", workrate {1,number,#.##})",
+                        nodeWorkrate, coldNodeWorkrate) );
+            }
+            
+            double idealSizeToMove = (nodeWorkrate - coldNodeWorkrate) / 2;
+            //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount
+            
+            if (idealSizeToMove + coldNodeWorkrate > coldNodeHighThreshold)
+                idealSizeToMove = coldNodeHighThreshold - coldNodeWorkrate;
+            
+            
+            double maxSizeToMoveIdeally = Math.min(
+                    nodeWorkrate/2 + 0.00001,
+                    //permit it to exceed node high if there is no alternative (this is 'max' not 'ideal'),
+                    //so long as it still gives a significant benefit
+                    //                      getConfiguration().nodeHighWaterMark - coldNodeWorkrate,
+                    (nodeWorkrate - coldNodeWorkrate)*0.9);
+            double maxSizeToMoveIfNoSmallButLarger = nodeWorkrate*3/4;
+            
+            Map<ItemType, Double> questionedNodeItems = getDataProvider().getItemWorkrates(node);
+            if (questionedNodeItems == null) {
+                if (LOG.isDebugEnabled())
+                    LOG.debug(MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
+                            "("+node+", workrate {0,number,#.##}), abandoned; " +
+                            "item report for " + questionedNodeName + " unavailable",
+                            nodeWorkrate));
+                break;
+            }
+            ItemType itemToMove = findBestItemToMove(questionedNodeItems, idealSizeToMove, maxSizeToMoveIdeally,
+                    maxSizeToMoveIfNoSmallButLarger, itemsMoved, coldNodeLocation);
+            
+            if (itemToMove == null) {
+                if (LOG.isDebugEnabled())
+                    LOG.debug(MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " +
+                            "("+node+", workrate {0,number,#.##}), ending; " +
+                            "no suitable segment found " +
+                            "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " +
+                            "moving to coldest node "+coldNodeName+" ("+coldNode+", workrate {3,number,#.##}); available items: {4}",
+                            nodeWorkrate, idealSizeToMove, maxSizeToMoveIdeally, coldNodeWorkrate, questionedNodeItems) );
+                break;
+            }
+            
+            itemsMoved.add(itemToMove);
+            double itemWorkrate = questionedNodeItems.get(itemToMove);
+            
+//            if (LOG.isLoggable(Level.FINE))
+//                LOG.fine( MessageFormat.format(
+//                        "policy "+getDataProvider().getAbbr()+" balancing hot node "+questionedNodeName+" " +
+//                        "(workrate {0,number,#.##}, too high), transitioning " + itemToMove +
+//                        " to "+coldNodeName+" (workrate {1,number,#.##}, now += {2,number,#.##})",
+//                        questionedNodeTotalWorkrate, coldNodeWorkrate, segmentRate) );
+            
+            nodeWorkrate -= itemWorkrate;
+            coldNodeWorkrate += itemWorkrate;
+            
+            moveItem(itemToMove, node, coldNode);
+            ++migrationCount;
+        }
+        
+        if (LOG.isDebugEnabled()) {
+            if (iterationCount == 0) {
+                if (LOG.isTraceEnabled())
+                    LOG.trace( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" balancing if hot finished at node "+node+"; " +
+                            "workrate {0,number,#.##} not hot",
+                            originalNodeWorkrate) );
+            }
+            else if (itemsMoved.isEmpty()) {
+                if (LOG.isTraceEnabled())
+                    LOG.trace( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" balancing finished at hot node "+node+" " +
+                            "(workrate {0,number,#.##}); no way to improve it",
+                            originalNodeWorkrate) );
+            } else {
+                LOG.debug( MessageFormat.format(
+                        "policy "+getDataProvider().getName()+" balancing finished at hot node "+node+"; " +
+                        "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " +
+                        "by moving off {3}",
+                        originalNodeWorkrate,
+                        nodeWorkrate,
+                        getDataProvider().getTotalWorkrate(node),
+                        itemsMoved
+                        ) );
+            }
+        }
+        return !itemsMoved.isEmpty();
+    }
+    
+    protected boolean balanceItemsOnColdNode(NodeType questionedNode, double questionedNodeTotalWorkrate, boolean gonnaGrow) {
+        // Abort if the node has pending adjustments.
+        Map<ItemType, Double> items = getDataProvider().getItemWorkrates(questionedNode);
+        if (items == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug( MessageFormat.format(
+                        "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
+                        "(workrate {0,number,#.##}); workrate breakdown unavailable (probably reverting)",
+                        questionedNodeTotalWorkrate) );
+            }
+            return false;
+        }
+        for (ItemType item : items.keySet()) {
+            if (!model.isItemMoveable(item)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
+                            "(workrate {0,number,#.##}); at least one item ("+item+") is in flux",
+                            questionedNodeTotalWorkrate) );
+                }
+                return false;
+            }
+        }
+        
+        double originalQuestionedNodeTotalWorkrate = questionedNodeTotalWorkrate;
+        int numMigrations = 0;
+        
+        Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>();
+        Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>();
+        
+        int iters = 0;
+        Location questionedLocation = getDataProvider().getLocation(questionedNode);
+        
+        double lowThreshold = model.getLowThreshold(questionedNode);
+        while (questionedNodeTotalWorkrate < lowThreshold) {
+            iters++;
+            
+            if (LOG.isDebugEnabled()) {
+                LOG.debug( MessageFormat.format(
+                        "policy "+getDataProvider().getName()+" considering balancing cold node "+questionedNode+" " +
+                        "(workrate {0,number,#.##}); iteration "+iters, questionedNodeTotalWorkrate));
+            }
+            
+            // move from cold node, to hottest
+            
+            NodeType hotNode = helper.findHottestContainer(nodesChecked);
+            
+            if (hotNode == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
+                            "(workrate {0,number,#.##}); no hottest node available", questionedNodeTotalWorkrate) );
+                }
+                
+                break;
+            }
+            if (hotNode.equals(questionedNode)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
+                            "(workrate {0,number,#.##}); it is also the hottest modfiable node", questionedNodeTotalWorkrate) );
+                }
+                break;
+            }
+            
+            
+            double hotNodeWorkrate = getDataProvider().getTotalWorkrate(hotNode);
+            double hotNodeLowThreshold = model.getLowThreshold(hotNode);
+            double hotNodeHighThreshold = model.getHighThreshold(hotNode);
+            boolean emergencyLoadBalancing = false;  //doesn't apply to cold
+            if (hotNodeWorkrate == -1 || hotNodeLowThreshold == -1 || hotNodeHighThreshold == -1) {
+                // hotNode presumably has been removed; TODO log
+                break;
+            }
+            if (hotNodeWorkrate <= hotNodeLowThreshold && !emergencyLoadBalancing) {
+                //don't balance if all nodes are too low
+                
+                //for now, stop warning if it is a recurring theme!
+//                Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
+//                LOG.log(level, MessageFormat.format(
+//                        "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
+//                        "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low" +
+//                        (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
+//                        questionedNodeTotalWorkrate, hotNodeWorkrate) );
+//                loggedHottestTooLow = true;
+                break;
+            }
+            if (gonnaGrow && (hotNodeWorkrate <= hotNodeHighThreshold && !emergencyLoadBalancing)) {
+                //if we're growing the pool, refuse to balance unless the hot node is quite hot
+                
+                //again, stop warning if it is a recurring theme!
+//                Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
+//                LOG.log(level, MessageFormat.format(
+//                        "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
+//                        "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low to accept while pool is growing"+
+//                        (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
+//                        questionedNodeTotalWorkrate, hotNodeWorkrate) );
+//                loggedHottestTooLow = true;
+                break;
+            }
+            
+            String questionedNodeName = getDataProvider().getName(questionedNode);
+            String hotNodeName = getDataProvider().getName(hotNode);
+            
+            if (LOG.isDebugEnabled()) {
+                LOG.debug( MessageFormat.format(
+                        "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
+                        "("+questionedNode+", workrate {0,number,#.##}), " +
+                        "considering source "+hotNodeName+" ("+hotNode+", workrate {1,number,#.##})",
+                        questionedNodeTotalWorkrate, hotNodeWorkrate) );
+            }
+            
+            double idealSizeToMove = (hotNodeWorkrate - questionedNodeTotalWorkrate) / 2;
+            //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount
+            double targetNodeHighThreshold = model.getHighThreshold(questionedNode);
+            if (idealSizeToMove + questionedNodeTotalWorkrate > targetNodeHighThreshold)
+                idealSizeToMove = targetNodeHighThreshold - questionedNodeTotalWorkrate;
+            double maxSizeToMoveIdeally = Math.min(
+                    hotNodeWorkrate/2,
+                    //allow to swap order, but not very much (0.9 was allowed when balancing high)
+                    (hotNodeWorkrate - questionedNodeTotalWorkrate)*0.6);
+            double maxSizeToMoveIfNoSmallButLarger = questionedNodeTotalWorkrate*3/4;
+            
+            Map<ItemType, Double> hotNodeItems = getDataProvider().getItemWorkrates(hotNode);
+            if (hotNodeItems == null) {
+                if (LOG.isDebugEnabled())
+                    LOG.debug(MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
+                            "("+questionedNode+", workrate {0,number,#.##}), " +
+                            "excluding hot node "+hotNodeName+" because its item report unavailable",
+                            questionedNodeTotalWorkrate));
+                nodesChecked.add(hotNode);
+                continue;
+            }
+            
+            ItemType itemToMove = findBestItemToMove(hotNodeItems, idealSizeToMove, maxSizeToMoveIdeally,
+                    maxSizeToMoveIfNoSmallButLarger, itemsMoved, questionedLocation);
+            if (itemToMove == null) {
+                if (LOG.isDebugEnabled())
+                    LOG.debug(MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
+                            "("+questionedNode+", workrate {0,number,#.##}), " +
+                            "excluding hot node "+hotNodeName+" because it has no appilcable items " +
+                            "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " +
+                            "moving from hot node "+hotNodeName+" ("+hotNode+", workrate {3,number,#.##}); available items: {4}",
+                            questionedNodeTotalWorkrate, idealSizeToMove, maxSizeToMoveIdeally, hotNodeWorkrate, hotNodeItems) );
+                
+                nodesChecked.add(hotNode);
+                continue;
+            }
+            
+            itemsMoved.add(itemToMove);
+            double segmentRate = hotNodeItems.get(itemToMove);
+            
+//            if (LOG.isLoggable(Level.FINE))
+//                LOG.fine( MessageFormat.format(
+//                        "policy "+getDataProvider().getAbbr()+" balancing cold node "+questionedNodeName+" " +
+//                        "(workrate {0,number,#.##}, too low), transitioning " + itemToMove +
+//                        " from "+hotNodeName+" (workrate {1,number,#.##}, now -= {2,number,#.##})",
+//                        questionedNodeTotalWorkrate, hotNodeWorkrate, segmentRate) );
+            
+            questionedNodeTotalWorkrate += segmentRate;
+            hotNodeWorkrate -= segmentRate;
+            
+            moveItem(itemToMove, hotNode, questionedNode);
+            
+            if (++numMigrations >= getMaxMigrationsPerBalancingNode()) {
+                break;
+            }
+        }
+        
+        if (LOG.isDebugEnabled()) {
+            if (iters == 0) {
+                if (LOG.isTraceEnabled())
+                    LOG.trace( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" balancing if cold finished at node "+questionedNode+"; " +
+                            "workrate {0,number,#.##} not cold",
+                            originalQuestionedNodeTotalWorkrate) );
+            }
+            else if (itemsMoved.isEmpty()) {
+                if (LOG.isTraceEnabled())
+                    LOG.trace( MessageFormat.format(
+                            "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+" " +
+                            "(workrate {0,number,#.##}); no way to improve it",
+                            originalQuestionedNodeTotalWorkrate) );
+            } else {
+                LOG.debug( MessageFormat.format(
+                        "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+"; " +
+                        "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " +
+                        "by moving in {3}",
+                        originalQuestionedNodeTotalWorkrate,
+                        questionedNodeTotalWorkrate,
+                        getDataProvider().getTotalWorkrate(questionedNode),
+                        itemsMoved) );
+            }
+        }
+        return !itemsMoved.isEmpty();
+    }
+    
+    protected void moveItem(ItemType item, NodeType oldNode, NodeType newNode) {
+        item.move(newNode);
+        model.onItemMoved(item, newNode);
+    }
+    
+    /**
+     * "Best" is defined as nearest to the targetCost, without exceeding maxCost, unless maxCostIfNothingSmallerButLarger > 0
+     * which does just that (useful if the ideal and target are estimates and aren't quite right, typically it will take
+     * something larger than maxRate but less than half the total rate, which is only possible when the estimates don't agree)
+     */
+    protected ItemType findBestItemToMove(Map<ItemType, Double> costsPerItem, double targetCost, double maxCost,
+            double maxCostIfNothingSmallerButLarger, Set<ItemType> excludedItems, Location locationIfKnown) {
+        
+        ItemType closestMatch = null;
+        ItemType smallestMoveable = null, largest = null;
+        double minDiff = Double.MAX_VALUE, smallestC = Double.MAX_VALUE, largestC = Double.MIN_VALUE;
+        boolean exclusions = false;
+        
+        for (Entry<ItemType, Double> entry : costsPerItem.entrySet()) {
+            ItemType item = entry.getKey();
+            Double cost = entry.getValue();
+            
+            if (cost == null) {
+                if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' has null workrate: skipping", item));
+                continue;
+            }
+            
+            if (!model.isItemMoveable(item)) {
+                if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' cannot be moved: skipping", item));
+                continue;
+            }
+            if (cost < 0) {
+                if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' subject to recent adjustment: skipping", item));
+                continue;
+            }
+            if (excludedItems.contains(item)) {
+                exclusions = true;
+                continue;
+            }
+            if (cost < 0) { // FIXME: already tested above
+                exclusions = true;
+                continue;
+            }
+            if (cost <= 0) { // FIXME: overlaps previous condition
+                continue;
+            }
+            if (largest == null || cost > largestC) {
+                largest = item;
+                largestC = cost;
+            }
+            if (!model.isItemMoveable(item)) { // FIXME: already tested above
+                continue;
+            }
+            if (locationIfKnown != null && !model.isItemAllowedIn(item, locationIfKnown)) {
+                continue;
+            }
+            if (smallestMoveable == null || cost < smallestC) {
+                smallestMoveable = item;
+                smallestC = cost;
+            }
+            if (cost > maxCost) {
+                continue;
+            }
+            double diff = Math.abs(targetCost - cost);
+            if (closestMatch == null || diff < minDiff) {
+                closestMatch = item;
+                minDiff = diff;
+            }
+        }
+        
+        if (closestMatch != null)
+            return closestMatch;
+        
+        if (smallestC < maxCostIfNothingSmallerButLarger && smallestC < largestC && !exclusions)
+            return smallestMoveable;
+        
+        return null;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
new file mode 100644
index 0000000..5f6cabc
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.location.Location;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+
+/**
+ * Standard implementation of {@link BalanceablePoolModel}, providing essential arithmetic for item and container
+ * workrates and thresholds. See subclasses for specific requirements for migrating items.
+ */
+public class DefaultBalanceablePoolModel<ContainerType, ItemType> implements BalanceablePoolModel<ContainerType, ItemType> {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBalanceablePoolModel.class);
+    
+    /*
+     * Performance comments.
+     *  - Used hprof with LoadBalancingPolicySoakTest.testLoadBalancingManyManyItemsTest (1000 items)
+     *  - Prior to adding containerToItems, it created a new set by iterating over all items.
+     *    This was the biggest percentage of any brooklyn code.
+     *    Hence it's worth duplicating the values, keyed by item and keyed by container.
+     *  - Unfortunately changing threading model (so have a "rebalancer" thread, and a thread that 
+     *    processes events to update the model), get ConcurrentModificationException if don't take
+     *    copy of containerToItems.get(node)...
+     */
+    
+    // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item 
+    private static final String NULL_CONTAINER = "null-container";
+    
+    private final String name;
+    private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>());
+    private final Map<ContainerType, Double> containerToLowThreshold = new ConcurrentHashMap<ContainerType, Double>();
+    private final Map<ContainerType, Double> containerToHighThreshold = new ConcurrentHashMap<ContainerType, Double>();
+    private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>();
+    private final SetMultimap<ContainerType, ItemType> containerToItems =  Multimaps.synchronizedSetMultimap(HashMultimap.<ContainerType, ItemType>create());
+    private final Map<ItemType, Double> itemToWorkrate = new ConcurrentHashMap<ItemType, Double>();
+    private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>());
+    
+    private volatile double poolLowThreshold = 0;
+    private volatile double poolHighThreshold = 0;
+    private volatile double currentPoolWorkrate = 0;
+    
+    public DefaultBalanceablePoolModel(String name) {
+        this.name = name;
+    }
+    
+    public ContainerType getParentContainer(ItemType item) {
+        ContainerType result = itemToContainer.get(item);
+        return (result != NULL_CONTAINER) ? result : null;
+    }
+    
+    public Set<ItemType> getItemsForContainer(ContainerType node) {
+        Set<ItemType> result = containerToItems.get(node);
+        synchronized (containerToItems) {
+            return (result != null) ? ImmutableSet.copyOf(result) : Collections.<ItemType>emptySet();
+        }
+    }
+    
+    public Double getItemWorkrate(ItemType item) {
+        return itemToWorkrate.get(item);
+    }
+    
+    @Override public double getPoolLowThreshold() { return poolLowThreshold; }
+    @Override public double getPoolHighThreshold() { return poolHighThreshold; }
+    @Override public double getCurrentPoolWorkrate() { return currentPoolWorkrate; }
+    @Override public boolean isHot() { return !containers.isEmpty() && currentPoolWorkrate > poolHighThreshold; }
+    @Override public boolean isCold() { return !containers.isEmpty() && currentPoolWorkrate < poolLowThreshold; }
+    
+    
+    // Provider methods.
+    
+    @Override public String getName() { return name; }
+    @Override public int getPoolSize() { return containers.size(); }
+    @Override public Set<ContainerType> getPoolContents() { return containers; }
+    @Override public String getName(ContainerType container) { return container.toString(); } // TODO: delete?
+    @Override public Location getLocation(ContainerType container) { return null; } // TODO?
+    
+    @Override public double getLowThreshold(ContainerType container) {
+        Double result = containerToLowThreshold.get(container);
+        return (result != null) ? result : -1;
+    }
+    
+    @Override public double getHighThreshold(ContainerType container) {
+        Double result = containerToHighThreshold.get(container);
+        return (result != null) ? result : -1;
+    }
+    
+    @Override public double getTotalWorkrate(ContainerType container) {
+        double totalWorkrate = 0;
+        for (ItemType item : getItemsForContainer(container)) {
+            Double workrate = itemToWorkrate.get(item);
+            if (workrate != null)
+                totalWorkrate += Math.abs(workrate);
+        }
+        return totalWorkrate;
+    }
+    
+    @Override public Map<ContainerType, Double> getContainerWorkrates() {
+        Map<ContainerType, Double> result = new LinkedHashMap<ContainerType, Double>();
+        for (ContainerType node : containers)
+            result.put(node, getTotalWorkrate(node));
+        return result;
+    }
+    
+    @Override public Map<ItemType, Double> getItemWorkrates(ContainerType node) {
+        Map<ItemType, Double> result = new LinkedHashMap<ItemType, Double>();
+        for (ItemType item : getItemsForContainer(node))
+            result.put(item, itemToWorkrate.get(item));
+        return result;
+    }
+    
+    @Override public boolean isItemMoveable(ItemType item) {
+        // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable?
+        return itemToContainer.containsKey(item) && !immovableItems.contains(item);
+    }
+    
+    @Override public boolean isItemAllowedIn(ItemType item, Location location) {
+        return true; // TODO?
+    }
+    
+    
+    // Mutators.
+    
+    @Override
+    public void onItemMoved(ItemType item, ContainerType newNode) {
+        if (!itemToContainer.containsKey(item)) {
+            // Item may have been deleted; order of events received from different sources 
+            // (i.e. item itself and for itemGroup membership) is non-deterministic.
+            LOG.info("Balanceable pool model ignoring onItemMoved for unknown item {} to container {}; " +
+                    "if onItemAdded subsequently received will get new container then", item, newNode);
+            return;
+        }
+        ContainerType newNodeNonNull = toNonNullContainer(newNode);
+        ContainerType oldNode = itemToContainer.put(item, newNodeNonNull);
+        if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
+        if (newNode != null) containerToItems.put(newNode, item);
+    }
+    
+    @Override
+    public void onContainerAdded(ContainerType newContainer, double lowThreshold, double highThreshold) {
+        boolean added = containers.add(newContainer);
+        if (!added) {
+            // See LoadBalancingPolicy.onContainerAdded for possible explanation of why can get duplicate calls
+            LOG.debug("Duplicate container-added event for {}; ignoring", newContainer);
+            return;
+        }
+        containerToLowThreshold.put(newContainer, lowThreshold);
+        containerToHighThreshold.put(newContainer, highThreshold);
+        poolLowThreshold += lowThreshold;
+        poolHighThreshold += highThreshold;
+    }
+    
+    @Override
+    public void onContainerRemoved(ContainerType oldContainer) {
+        containers.remove(oldContainer);
+        Double containerLowThreshold = containerToLowThreshold.remove(oldContainer);
+        Double containerHighThresold = containerToHighThreshold.remove(oldContainer);
+        poolLowThreshold -= (containerLowThreshold != null ? containerLowThreshold : 0);
+        poolHighThreshold -= (containerHighThresold != null ? containerHighThresold : 0);
+        
+        // TODO: assert no orphaned items
+    }
+    
+    @Override
+    public void onItemAdded(ItemType item, ContainerType parentContainer) {
+        onItemAdded(item, parentContainer, false);
+    }
+    
+    @Override
+    public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable) {
+        // Duplicate calls to onItemAdded do no harm, as long as most recent is most accurate!
+        // Important that it stays that way for now - See LoadBalancingPolicy.onContainerAdded for explanation.
+
+        if (immovable)
+            immovableItems.add(item);
+        
+        ContainerType parentContainerNonNull = toNonNullContainer(parentContainer);
+        ContainerType oldNode = itemToContainer.put(item, parentContainerNonNull);
+        if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
+        if (parentContainer != null) containerToItems.put(parentContainer, item);
+    }
+    
+    @Override
+    public void onItemRemoved(ItemType item) {
+        ContainerType oldNode = itemToContainer.remove(item);
+        if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item);
+        Double workrate = itemToWorkrate.remove(item);
+        if (workrate != null)
+            currentPoolWorkrate -= workrate;
+        immovableItems.remove(item);
+    }
+    
+    @Override
+    public void onItemWorkrateUpdated(ItemType item, double newValue) {
+        if (hasItem(item)) {
+            Double oldValue = itemToWorkrate.put(item, newValue);
+            double delta = ( newValue - (oldValue != null ? oldValue : 0) );
+            currentPoolWorkrate += delta;
+        } else {
+            // Can happen when item removed - get notification of removal and workrate from group and item
+            // respectively, so can overtake each other
+            if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of workrate for unknown item {}, to {}", item, newValue);
+        }
+    }
+    
+    private boolean hasItem(ItemType item) {
+        return itemToContainer.containsKey(item);
+    }
+    
+    
+    // Additional methods for tests.
+
+    /**
+     * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers. 
+     */
+    @VisibleForTesting
+    public String itemDistributionToString() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        dumpItemDistribution(new PrintStream(baos));
+        return new String(baos.toByteArray());
+    }
+
+    @VisibleForTesting
+    public void dumpItemDistribution() {
+        dumpItemDistribution(System.out);
+    }
+    
+    @VisibleForTesting
+    public void dumpItemDistribution(PrintStream out) {
+        for (ContainerType container : getPoolContents()) {
+            out.println("Container '"+container+"': ");
+            for (ItemType item : getItemsForContainer(container)) {
+                Double workrate = getItemWorkrate(item);
+                out.println("\t"+"Item '"+item+"' ("+workrate+")");
+            }
+        }
+        out.flush();
+    }
+    
+    @SuppressWarnings("unchecked")
+    private ContainerType nullContainer() {
+        return (ContainerType) NULL_CONTAINER; // relies on erasure
+    }
+    
+    private ContainerType toNonNullContainer(ContainerType container) {
+        return (container != null) ? container : nullContainer();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
new file mode 100644
index 0000000..2538bc6
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.event.basic.BasicConfigKey;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+/**
+ * A group of items that are contained within a given (dynamically changing) set of containers.
+ * 
+ * The {@link setContainers(Group)} sets the group of containers. The membership of that group
+ * is dynamically tracked.
+ * 
+ * When containers are added/removed, or when an items is added/removed, or when an {@link Moveable} item 
+ * is moved then the membership of this group of items is automatically updated accordingly.
+ * 
+ * For example: in Monterey, this could be used to track the actors that are within a given cluster of venues.
+ */
+@ImplementedBy(ItemsInContainersGroupImpl.class)
+public interface ItemsInContainersGroup extends DynamicGroup {
+
+    @SetFromFlag("itemFilter")
+    public static final ConfigKey<Predicate<? super Entity>> ITEM_FILTER = new BasicConfigKey(
+            Predicate.class, "itemsInContainerGroup.itemFilter", "Filter for which items within the containers will automatically be in group", Predicates.alwaysTrue());
+
+    public void setContainers(Group containerGroup);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
new file mode 100644
index 0000000..225a2b6
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.AbstractGroup;
+import brooklyn.entity.basic.DynamicGroupImpl;
+
+import com.google.common.base.Predicate;
+
+/**
+ * A group of items that are contained within a given (dynamically changing) set of containers.
+ * 
+ * The {@link setContainers(Group)} sets the group of containers. The membership of that group
+ * is dynamically tracked.
+ * 
+ * When containers are added/removed, or when an items is added/removed, or when an {@link Moveable} item 
+ * is moved then the membership of this group of items is automatically updated accordingly.
+ * 
+ * For example: in Monterey, this could be used to track the actors that are within a given cluster of venues.
+ */
+public class ItemsInContainersGroupImpl extends DynamicGroupImpl implements ItemsInContainersGroup {
+
+    // TODO Inefficient: will not scale to many 1000s of items
+
+    private static final Logger LOG = LoggerFactory.getLogger(ItemsInContainersGroup.class);
+    
+    private Group containerGroup;
+    
+    private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
+        @Override
+        public void onEvent(SensorEvent<Object> event) {
+            Entity source = event.getSource();
+            Object value = event.getValue();
+            Sensor sensor = event.getSensor();
+            
+            if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
+                onContainerAdded((Entity) value);
+            } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
+                onContainerRemoved((Entity) value);
+            } else if (sensor.equals(Movable.CONTAINER)) {
+                onItemMoved((Movable)source, (BalanceableContainer<?>) value);
+            } else {
+                throw new IllegalStateException("Unhandled event type "+sensor+": "+event);
+            }
+        }
+    };
+
+    public ItemsInContainersGroupImpl() {
+    }
+    
+    @Override
+    public void init() {
+        super.init();
+        setEntityFilter(new Predicate<Entity>() {
+            @Override public boolean apply(Entity e) {
+                return acceptsEntity(e);
+            }});
+    }
+    
+    protected Predicate<? super Entity> getItemFilter() {
+        return getConfig(ITEM_FILTER);
+    }
+    
+    @Override
+    protected boolean acceptsEntity(Entity e) {
+        if (e instanceof Movable) {
+            return acceptsItem((Movable)e, ((Movable)e).getAttribute(Movable.CONTAINER));
+        }
+        return false;
+    }
+
+    boolean acceptsItem(Movable e, BalanceableContainer c) {
+        return (containerGroup != null && c != null) ? getItemFilter().apply(e) && containerGroup.hasMember(c) : false;
+    }
+
+    @Override
+    public void setContainers(Group containerGroup) {
+        this.containerGroup = containerGroup;
+        subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
+        subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
+        subscribe(null, Movable.CONTAINER, eventHandler);
+        
+        if (LOG.isTraceEnabled()) LOG.trace("{} scanning entities on container group set", this);
+        rescanEntities();
+    }
+    
+    private void onContainerAdded(Entity newContainer) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} rescanning entities on container {} added", this, newContainer);
+        rescanEntities();
+    }
+    
+    private void onContainerRemoved(Entity oldContainer) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} rescanning entities on container {} removed", this, oldContainer);
+        rescanEntities();
+    }
+    
+    protected void onEntityAdded(Entity item) {
+        if (acceptsEntity(item)) {
+            if (LOG.isDebugEnabled()) LOG.debug("{} adding new item {}", this, item);
+            addMember(item);
+        }
+    }
+    
+    protected void onEntityRemoved(Entity item) {
+        if (removeMember(item)) {
+            if (LOG.isDebugEnabled()) LOG.debug("{} removing deleted item {}", this, item);
+        }
+    }
+    
+    private void onItemMoved(Movable item, BalanceableContainer container) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} processing moved item {}, to container {}", new Object[] {this, item, container});
+        if (hasMember(item)) {
+            if (!acceptsItem(item, container)) {
+                if (LOG.isDebugEnabled()) LOG.debug("{} removing moved item {} from group, as new container {} is not a member", new Object[] {this, item, container});
+                removeMember(item);
+            }
+        } else {
+            if (acceptsItem(item, container)) {
+                if (LOG.isDebugEnabled()) LOG.debug("{} adding moved item {} to group, as new container {} is a member", new Object[] {this, item, container});
+                addMember(item);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
new file mode 100644
index 0000000..f437fc8
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static brooklyn.util.JavaGroovyEquivalents.elvis;
+import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.EntityInternal;
+
+import org.apache.brooklyn.policy.autoscaling.AutoScalerPolicy;
+
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * <p>Policy that is attached to a pool of "containers", each of which can host one or more migratable "items".
+ * The policy monitors the workrates of the items and effects migrations in an attempt to ensure that the containers
+ * are all sufficiently utilized without any of them being overloaded.
+ * 
+ * <p>The particular sensor that defines the items' workrates is specified when the policy is constructed. High- and
+ * low-thresholds are defined as <strong>configuration keys</strong> on each of the container entities in the pool:
+ * for an item sensor named {@code foo.bar.sensorName}, the corresponding container config keys would be named
+ * {@code foo.bar.sensorName.threshold.low} and {@code foo.bar.sensorName.threshold.high}.
+ * 
+ * <p>In addition to balancing items among the available containers, this policy causes the pool Entity to emit
+ * {@code POOL_COLD} and {@code POOL_HOT} events when it is determined that there is a surplus or shortfall
+ * of container resource in the pool respectively. These events may be consumed by a separate policy that is capable
+ * of resizing the container pool.
+ */
+    // removed from catalog because it cannot currently be configured via catalog mechanisms
+    // PolicySpec.create fails due to no no-arg constructor
+    // TODO make metric and model things which can be initialized from config then reinstate in catalog
+//@Catalog(name="Load Balancer", description="Policy that is attached to a pool of \"containers\", each of which "
+//        + "can host one or more migratable \"items\". The policy monitors the workrates of the items and effects "
+//        + "migrations in an attempt to ensure that the containers are all sufficiently utilized without any of "
+//        + "them being overloaded.")
+public class LoadBalancingPolicy<NodeType extends Entity, ItemType extends Movable> extends AbstractPolicy {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicy.class);
+    
+    @SetFromFlag(defaultVal="100")
+    private long minPeriodBetweenExecs;
+    
+    private final AttributeSensor<? extends Number> metric;
+    private final String lowThresholdConfigKeyName;
+    private final String highThresholdConfigKeyName;
+    private final BalanceablePoolModel<NodeType, ItemType> model;
+    private final BalancingStrategy<NodeType, ItemType> strategy;
+    private BalanceableWorkerPool poolEntity;
+    
+    private volatile ScheduledExecutorService executor;
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    private volatile long executorTime = 0;
+
+    private int lastEmittedDesiredPoolSize = 0;
+    private static enum TemperatureStates { COLD, HOT }
+    private TemperatureStates lastEmittedPoolTemperature = null; // "cold" or "hot"
+    
+    private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public void onEvent(SensorEvent<Object> event) {
+            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", LoadBalancingPolicy.this, event);
+            Entity source = event.getSource();
+            Object value = event.getValue();
+            Sensor sensor = event.getSensor();
+            
+            if (sensor.equals(metric)) {
+                onItemMetricUpdate((ItemType)source, ((Number) value).doubleValue(), true);
+            } else if (sensor.equals(BalanceableWorkerPool.CONTAINER_ADDED)) {
+                onContainerAdded((NodeType) value, true);
+            } else if (sensor.equals(BalanceableWorkerPool.CONTAINER_REMOVED)) {
+                onContainerRemoved((NodeType) value, true);
+            } else if (sensor.equals(BalanceableWorkerPool.ITEM_ADDED)) {
+                BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
+                onItemAdded((ItemType)pair.item, (NodeType)pair.container, true);
+            } else if (sensor.equals(BalanceableWorkerPool.ITEM_REMOVED)) {
+                BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
+                onItemRemoved((ItemType)pair.item, (NodeType)pair.container, true);
+            } else if (sensor.equals(BalanceableWorkerPool.ITEM_MOVED)) {
+                BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
+                onItemMoved((ItemType)pair.item, (NodeType)pair.container, true);
+            }
+        }
+    };
+
+    public LoadBalancingPolicy() {
+        this(null, null);
+    }
+    
+    public LoadBalancingPolicy(AttributeSensor<? extends Number> metric,
+            BalanceablePoolModel<NodeType, ItemType> model) {
+        this(MutableMap.of(), metric, model);
+    }
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public LoadBalancingPolicy(Map props, AttributeSensor<? extends Number> metric,
+            BalanceablePoolModel<NodeType, ItemType> model) {
+        
+        super(props);
+        this.metric = metric;
+        this.lowThresholdConfigKeyName = metric.getName()+".threshold.low";
+        this.highThresholdConfigKeyName = metric.getName()+".threshold.high";
+        this.model = model;
+        this.strategy = new BalancingStrategy(getDisplayName(), model); // TODO: extract interface, inject impl
+        
+        // TODO Should re-use the execution manager's thread pool, somehow
+        executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setEntity(EntityLocal entity) {
+        Preconditions.checkArgument(entity instanceof BalanceableWorkerPool, "Provided entity must be a BalanceableWorkerPool");
+        super.setEntity(entity);
+        this.poolEntity = (BalanceableWorkerPool) entity;
+        
+        // Detect when containers are added to or removed from the pool.
+        subscribe(poolEntity, BalanceableWorkerPool.CONTAINER_ADDED, eventHandler);
+        subscribe(poolEntity, BalanceableWorkerPool.CONTAINER_REMOVED, eventHandler);
+        subscribe(poolEntity, BalanceableWorkerPool.ITEM_ADDED, eventHandler);
+        subscribe(poolEntity, BalanceableWorkerPool.ITEM_REMOVED, eventHandler);
+        subscribe(poolEntity, BalanceableWorkerPool.ITEM_MOVED, eventHandler);
+        
+        // Take heed of any extant containers.
+        for (Entity container : poolEntity.getContainerGroup().getMembers()) {
+            onContainerAdded((NodeType)container, false);
+        }
+        for (Entity item : poolEntity.getItemGroup().getMembers()) {
+            onItemAdded((ItemType)item, (NodeType)item.getAttribute(Movable.CONTAINER), false);
+        }
+
+        scheduleRebalance();
+    }
+    
+    @Override
+    public void suspend() {
+        // TODO unsubscribe from everything? And resubscribe on resume?
+        super.suspend();
+        if (executor != null) executor.shutdownNow();;
+        executorQueued.set(false);
+    }
+    
+    @Override
+    public void resume() {
+        super.resume();
+        executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+        executorTime = 0;
+        executorQueued.set(false);
+    }
+    
+    private ThreadFactory newThreadFactory() {
+        return new ThreadFactoryBuilder()
+                .setNameFormat("brooklyn-followthesunpolicy-%d")
+                .build();
+    }
+
+    private void scheduleRebalance() {
+        if (isRunning() && executorQueued.compareAndSet(false, true)) {
+            long now = System.currentTimeMillis();
+            long delay = Math.max(0, (executorTime + minPeriodBetweenExecs) - now);
+            
+            executor.schedule(new Runnable() {
+                @SuppressWarnings("rawtypes")
+                public void run() {
+                    try {
+                        executorTime = System.currentTimeMillis();
+                        executorQueued.set(false);
+                        strategy.rebalance();
+
+                        if (LOG.isDebugEnabled()) LOG.debug("{} post-rebalance: poolSize={}; workrate={}; lowThreshold={}; " + 
+                                "highThreshold={}", new Object[] {this, model.getPoolSize(), model.getCurrentPoolWorkrate(), 
+                                model.getPoolLowThreshold(), model.getPoolHighThreshold()});
+                        
+                        if (model.isCold()) {
+                            Map eventVal = ImmutableMap.of(
+                                    AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, model.getPoolSize(),
+                                    AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, model.getCurrentPoolWorkrate(),
+                                    AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, model.getPoolLowThreshold(),
+                                    AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, model.getPoolHighThreshold());
+            
+                            ((EntityLocal)poolEntity).emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, eventVal);
+                            
+                            if (LOG.isInfoEnabled()) {
+                                int desiredPoolSize = (int) Math.ceil(model.getCurrentPoolWorkrate() / (model.getPoolLowThreshold()/model.getPoolSize()));
+                                if (desiredPoolSize != lastEmittedDesiredPoolSize || lastEmittedPoolTemperature != TemperatureStates.COLD) {
+                                    LOG.info("{} emitted COLD (suggesting {}): {}", new Object[] {this, desiredPoolSize, eventVal});
+                                    lastEmittedDesiredPoolSize = desiredPoolSize;
+                                    lastEmittedPoolTemperature = TemperatureStates.COLD;
+                                }
+                            }
+                        
+                        } else if (model.isHot()) {
+                            Map eventVal = ImmutableMap.of(
+                                    AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, model.getPoolSize(),
+                                    AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, model.getCurrentPoolWorkrate(),
+                                    AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, model.getPoolLowThreshold(),
+                                    AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, model.getPoolHighThreshold());
+                            
+                            ((EntityLocal)poolEntity).emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, eventVal);
+                            
+                            if (LOG.isInfoEnabled()) {
+                                int desiredPoolSize = (int) Math.ceil(model.getCurrentPoolWorkrate() / (model.getPoolHighThreshold()/model.getPoolSize()));
+                                if (desiredPoolSize != lastEmittedDesiredPoolSize || lastEmittedPoolTemperature != TemperatureStates.HOT) {
+                                    LOG.info("{} emitted HOT (suggesting {}): {}", new Object[] {this, desiredPoolSize, eventVal});
+                                    lastEmittedDesiredPoolSize = desiredPoolSize;
+                                    lastEmittedPoolTemperature = TemperatureStates.HOT;
+                                }
+                            }
+                        }
+
+                    } catch (Exception e) {
+                        if (isRunning()) {
+                            LOG.error("Error rebalancing", e);
+                        } else {
+                            LOG.debug("Error rebalancing, but no longer running", e);
+                        }
+                    }
+                }},
+                delay,
+                TimeUnit.MILLISECONDS);
+        }
+    }
+    
+    // TODO Can get duplicate onContainerAdded events.
+    //      I presume it's because we subscribe and then iterate over the extant containers.
+    //      Solution would be for subscription to give you events for existing / current value(s).
+    //      Also current impl messes up single-threaded updates model: the setEntity is a different thread than for subscription events.
+    private void onContainerAdded(NodeType newContainer, boolean rebalanceNow) {
+        Preconditions.checkArgument(newContainer instanceof BalanceableContainer, "Added container must be a BalanceableContainer");
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of container {}", this, newContainer);
+        // Low and high thresholds for the metric we're interested in are assumed to be present
+        // in the container's configuration.
+        Number lowThreshold = (Number) findConfigValue(newContainer, lowThresholdConfigKeyName);
+        Number highThreshold = (Number) findConfigValue(newContainer, highThresholdConfigKeyName);
+        if (lowThreshold == null || highThreshold == null) {
+            LOG.warn(
+                "Balanceable container '"+newContainer+"' does not define low- and high- threshold configuration keys: '"+
+                lowThresholdConfigKeyName+"' and '"+highThresholdConfigKeyName+"', skipping");
+            return;
+        }
+        
+        model.onContainerAdded(newContainer, lowThreshold.doubleValue(), highThreshold.doubleValue());
+        
+        // Note: no need to scan the container for items; they will appear via the ITEM_ADDED events.
+        // Also, must abide by any item-filters etc defined in the pool.
+        
+        if (rebalanceNow) scheduleRebalance();
+    }
+    
+    private static Object findConfigValue(Entity entity, String configKeyName) {
+        Map<ConfigKey<?>, Object> config = ((EntityInternal)entity).getAllConfig();
+        for (Entry<ConfigKey<?>, Object> entry : config.entrySet()) {
+            if (configKeyName.equals(entry.getKey().getName()))
+                return entry.getValue();
+        }
+        return null;
+    }
+    
+    // TODO Receiving duplicates of onContainerRemoved (e.g. when running LoadBalancingInmemorySoakTest)
+    private void onContainerRemoved(NodeType oldContainer, boolean rebalanceNow) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of container {}", this, oldContainer);
+        model.onContainerRemoved(oldContainer);
+        if (rebalanceNow) scheduleRebalance();
+    }
+    
+    private void onItemAdded(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
+        Preconditions.checkArgument(item instanceof Movable, "Added item "+item+" must implement Movable");
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of item {} in container {}", new Object[] {this, item, parentContainer});
+        
+        subscribe(item, metric, eventHandler);
+        
+        // Update the model, including the current metric value (if any).
+        boolean immovable = (Boolean)elvis(item.getConfig(Movable.IMMOVABLE), false);
+        Number currentValue = item.getAttribute(metric);
+        model.onItemAdded(item, parentContainer, immovable);
+        if (currentValue != null)
+            model.onItemWorkrateUpdated(item, currentValue.doubleValue());
+        
+        if (rebalanceNow) scheduleRebalance();
+    }
+    
+    private void onItemRemoved(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of item {}", this, item);
+        unsubscribe(item);
+        model.onItemRemoved(item);
+        if (rebalanceNow) scheduleRebalance();
+    }
+    
+    private void onItemMoved(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording moving of item {} to {}", new Object[] {this, item, parentContainer});
+        model.onItemMoved(item, parentContainer);
+        if (rebalanceNow) scheduleRebalance();
+    }
+    
+    private void onItemMetricUpdate(ItemType item, double newValue, boolean rebalanceNow) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording metric update for item {}, new value {}", new Object[] {this, item, newValue});
+        model.onItemWorkrateUpdated(item, newValue);
+        if (rebalanceNow) scheduleRebalance();
+    }
+    
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java
new file mode 100644
index 0000000..b58b8d2
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import org.apache.brooklyn.api.location.Location;
+
+/**
+ * Temporary stub to resolve dependencies in ported LoadBalancingPolicy.
+ */
+public class LocationConstraint {
+    public boolean isPermitted(Location l) { return true; } // TODO
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java
new file mode 100644
index 0000000..68ba43d
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.annotation.EffectorParam;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicConfigKey;
+
+
+/**
+ * Represents an item that can be migrated between balanceable containers.
+ */
+public interface Movable extends Entity {
+    
+    @SetFromFlag("immovable")
+    public static ConfigKey<Boolean> IMMOVABLE = new BasicConfigKey<Boolean>(
+        Boolean.class, "movable.item.immovable", "Indicates whether this item instance is immovable, so cannot be moved by policies", false);
+    
+    public static BasicAttributeSensor<BalanceableContainer> CONTAINER = new BasicAttributeSensor<BalanceableContainer>(
+        BalanceableContainer.class, "movable.item.container", "The container that this item is on");
+    
+    public static final MethodEffector<Void> MOVE = new MethodEffector<Void>(Movable.class, "move");
+    
+    public String getContainerId();
+    
+    @Effector(description="Moves this entity to the given container")
+    public void move(@EffectorParam(name="destination") Entity destination);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
new file mode 100644
index 0000000..b5c6a28
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Set;
+
+/**
+ * Provides conveniences for searching for hot/cold containers in a provided pool model.
+ * Ported from Monterey v3, with irrelevant bits removed.
+ */
+public class PolicyUtilForPool<ContainerType, ItemType> {
+    
+    private final BalanceablePoolModel<ContainerType, ItemType> model;
+    
+    
+    public PolicyUtilForPool (BalanceablePoolModel<ContainerType, ItemType> model) {
+        this.model = model;
+    }
+    
+    public ContainerType findColdestContainer(Set<ContainerType> excludedContainers) {
+        return findColdestContainer(excludedContainers, null);
+    }
+    
+    /**
+     * Identifies the container with the maximum spare capacity (highThreshold - currentWorkrate),
+     * returns null if none of the model's nodes has spare capacity.
+     */
+    public ContainerType findColdestContainer(Set<ContainerType> excludedContainers, LocationConstraint locationConstraint) {
+        double maxSpareCapacity = 0;
+        ContainerType coldest = null;
+        
+        for (ContainerType c : model.getPoolContents()) {
+            if (excludedContainers.contains(c))
+                continue;
+            if (locationConstraint != null && !locationConstraint.isPermitted(model.getLocation(c)))
+                continue;
+            
+            double highThreshold = model.getHighThreshold(c);
+            double totalWorkrate = model.getTotalWorkrate(c);
+            double spareCapacity = highThreshold - totalWorkrate;
+            
+            if (highThreshold == -1 || totalWorkrate == -1) {
+                continue; // container presumably has been removed
+            }
+            if (spareCapacity > maxSpareCapacity) {
+                maxSpareCapacity = spareCapacity;
+                coldest = c;
+            }
+        }
+        return coldest;
+    }
+    
+    /**
+     * Identifies the container with the maximum overshoot (currentWorkrate - highThreshold),
+     * returns null if none of the model's  nodes has an overshoot.
+     */
+    public ContainerType findHottestContainer(Set<ContainerType> excludedContainers) {
+        double maxOvershoot = 0;
+        ContainerType hottest = null;
+        
+        for (ContainerType c : model.getPoolContents()) {
+            if (excludedContainers.contains(c))
+                continue;
+            
+            double totalWorkrate = model.getTotalWorkrate(c);
+            double highThreshold = model.getHighThreshold(c);
+            double overshoot = totalWorkrate - highThreshold;
+            
+            if (highThreshold == -1 || totalWorkrate == -1) {
+                continue; // container presumably has been removed
+            }
+            if (overshoot > maxOvershoot) {
+                maxOvershoot = overshoot;
+                hottest = c;
+            }
+        }
+        return hottest;
+    }
+    
+}
\ No newline at end of file