You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:04 UTC
[06/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming
package policy
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/AbstractLoadBalancingPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/AbstractLoadBalancingPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/AbstractLoadBalancingPolicyTest.java
new file mode 100644
index 0000000..6257bc9
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/AbstractLoadBalancingPolicyTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.entity.basic.Entities;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.Sensors;
+import org.apache.brooklyn.location.basic.SimulatedLocation;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class AbstractLoadBalancingPolicyTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractLoadBalancingPolicyTest.class);
+
+ protected static final long TIMEOUT_MS = 10*1000;
+ protected static final long SHORT_WAIT_MS = 250;
+
+ protected static final long CONTAINER_STARTUP_DELAY_MS = 100;
+
+ public static final AttributeSensor<Integer> TEST_METRIC =
+ Sensors.newIntegerSensor("test.metric", "Dummy workrate for test entities");
+
+ public static final ConfigKey<Double> LOW_THRESHOLD_CONFIG_KEY = new BasicConfigKey<Double>(Double.class, TEST_METRIC.getName()+".threshold.low", "desc", 0.0);
+ public static final ConfigKey<Double> HIGH_THRESHOLD_CONFIG_KEY = new BasicConfigKey<Double>(Double.class, TEST_METRIC.getName()+".threshold.high", "desc", 0.0);
+
+ protected TestApplication app;
+ protected SimulatedLocation loc;
+ protected BalanceableWorkerPool pool;
+ protected DefaultBalanceablePoolModel<Entity, Entity> model;
+ protected LoadBalancingPolicy policy;
+ protected Group containerGroup;
+ protected Group itemGroup;
+ protected Random random = new Random();
+
+ @BeforeMethod(alwaysRun=true)
+ public void before() {
+ LOG.debug("In AbstractLoadBalancingPolicyTest.before()");
+
+ MockItemEntityImpl.totalMoveCount.set(0);
+ MockItemEntityImpl.lastMoveTime.set(0);
+
+ loc = new SimulatedLocation(MutableMap.of("name", "loc"));
+
+ model = new DefaultBalanceablePoolModel<Entity, Entity>("pool-model");
+
+ app = TestApplication.Factory.newManagedInstanceForTests();
+ containerGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class)
+ .displayName("containerGroup")
+ .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockContainerEntity.class)));
+ itemGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class)
+ .displayName("itemGroup")
+ .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockItemEntity.class)));
+ pool = app.createAndManageChild(EntitySpec.create(BalanceableWorkerPool.class));
+ pool.setContents(containerGroup, itemGroup);
+ policy = new LoadBalancingPolicy(MutableMap.of("minPeriodBetweenExecs", 1), TEST_METRIC, model);
+ pool.addPolicy(policy);
+ app.start(ImmutableList.of(loc));
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void after() {
+ if (policy != null) policy.destroy();
+ if (app != null) Entities.destroyAll(app.getManagementContext());
+ }
+
+ // Using this utility, as it gives more info about the workrates of all containers rather than just the one that differs
+ protected void assertWorkrates(Collection<MockContainerEntity> containers, Collection<Double> expectedC, double precision) {
+ Iterable<Double> actual = Iterables.transform(containers, new Function<MockContainerEntity, Double>() {
+ public Double apply(MockContainerEntity input) {
+ return getContainerWorkrate(input);
+ }});
+
+ List<Double> expected = Lists.newArrayList(expectedC);
+ String errMsg = "actual="+actual+"; expected="+expected;
+ assertEquals(containers.size(), expected.size(), errMsg);
+ for (int i = 0; i < containers.size(); i++) {
+ assertEquals(Iterables.get(actual, i), expected.get(i), precision, errMsg);
+ }
+ }
+
+ protected void assertWorkratesEventually(Collection<MockContainerEntity> containers, Iterable<? extends Movable> items, Collection<Double> expected) {
+ assertWorkratesEventually(containers, items, expected, 0d);
+ }
+
+ /**
+ * Asserts that the given container have the given expected workrates (by querying the containers directly).
+ * Accepts an accuracy of "precision" for each container's workrate.
+ */
+ protected void assertWorkratesEventually(final Collection<MockContainerEntity> containers, final Iterable<? extends Movable> items, final Collection<Double> expected, final double precision) {
+ try {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ public void run() {
+ assertWorkrates(containers, expected, precision);
+ }});
+ } catch (AssertionError e) {
+ String errMsg = e.getMessage()+"; "+verboseDumpToString(containers, items);
+ throw new RuntimeException(errMsg, e);
+ }
+ }
+
+ // Using this utility, as it gives more info about the workrates of all containers rather than just the one that differs
+ protected void assertWorkratesContinually(List<MockContainerEntity> containers, Iterable<? extends Movable> items, List<Double> expected) {
+ assertWorkratesContinually(containers, items, expected, 0d);
+ }
+
+ /**
+ * Asserts that the given containers have the given expected workrates (by querying the containers directly)
+ * continuously for SHORT_WAIT_MS.
+ * Accepts an accuracy of "precision" for each container's workrate.
+ */
+ protected void assertWorkratesContinually(final List<MockContainerEntity> containers, Iterable<? extends Movable> items, final List<Double> expected, final double precision) {
+ try {
+ Asserts.succeedsContinually(MutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
+ public void run() {
+ assertWorkrates(containers, expected, precision);
+ }});
+ } catch (AssertionError e) {
+ String errMsg = e.getMessage()+"; "+verboseDumpToString(containers, items);
+ throw new RuntimeException(errMsg, e);
+ }
+ }
+
+ protected String verboseDumpToString(Iterable<MockContainerEntity> containers, Iterable<? extends Movable> items) {
+ Iterable<Double> containerRates = Iterables.transform(containers, new Function<MockContainerEntity, Double>() {
+ @Override public Double apply(MockContainerEntity input) {
+ return (double) input.getWorkrate();
+ }});
+
+ Map<MockContainerEntity, Set<Movable>> itemDistributionByContainer = Maps.newLinkedHashMap();
+ for (MockContainerEntity container : containers) {
+ itemDistributionByContainer.put(container, container.getBalanceableItems());
+ }
+
+ Map<Movable, BalanceableContainer<?>> itemDistributionByItem = Maps.newLinkedHashMap();
+ for (Movable item : items) {
+ itemDistributionByItem.put(item, item.getAttribute(Movable.CONTAINER));
+ }
+
+ String modelItemDistribution = model.itemDistributionToString();
+ return "containers="+containers+"; containerRates="+containerRates
+ +"; itemDistributionByContainer="+itemDistributionByContainer
+ +"; itemDistributionByItem="+itemDistributionByItem
+ +"; model="+modelItemDistribution
+ +"; totalMoves="+MockItemEntityImpl.totalMoveCount
+ +"; lastMoveTime="+Time.makeDateString(MockItemEntityImpl.lastMoveTime.get());
+ }
+
+ protected MockContainerEntity newContainer(TestApplication app, String name, double lowThreshold, double highThreshold) {
+ return newAsyncContainer(app, name, lowThreshold, highThreshold, 0);
+ }
+
+ /**
+ * Creates a new container that will take "delay" millis to complete its start-up.
+ */
+ protected MockContainerEntity newAsyncContainer(TestApplication app, String name, double lowThreshold, double highThreshold, long delay) {
+ MockContainerEntity container = app.createAndManageChild(EntitySpec.create(MockContainerEntity.class)
+ .displayName(name)
+ .configure(MockContainerEntity.DELAY, delay)
+ .configure(LOW_THRESHOLD_CONFIG_KEY, lowThreshold)
+ .configure(HIGH_THRESHOLD_CONFIG_KEY, highThreshold));
+ LOG.debug("Managed new container {}", container);
+ container.start(ImmutableList.of(loc));
+ return container;
+ }
+
+ protected static MockItemEntity newItem(TestApplication app, MockContainerEntity container, String name, double workrate) {
+ MockItemEntity item = app.createAndManageChild(EntitySpec.create(MockItemEntity.class)
+ .displayName(name));
+ LOG.debug("Managing new item {} on container {}", item, container);
+ item.move(container);
+ ((EntityLocal)item).setAttribute(TEST_METRIC, (int)workrate);
+ return item;
+ }
+
+ protected static MockItemEntity newLockedItem(TestApplication app, MockContainerEntity container, String name, double workrate) {
+ MockItemEntity item = app.createAndManageChild(EntitySpec.create(MockItemEntity.class)
+ .displayName(name)
+ .configure(Movable.IMMOVABLE, true));
+ LOG.debug("Managed new item {} on container {}", item, container);
+ item.move(container);
+ ((EntityLocal)item).setAttribute(TEST_METRIC, (int)workrate);
+ return item;
+ }
+
+ /**
+ * Asks the item directly for its workrate.
+ */
+ protected static double getItemWorkrate(MockItemEntity item) {
+ Object result = item.getAttribute(TEST_METRIC);
+ return (result == null ? 0 : ((Number) result).doubleValue());
+ }
+
+ /**
+ * Asks the container for its items, and then each of those items directly for their workrates; returns the total.
+ */
+ protected static double getContainerWorkrate(MockContainerEntity container) {
+ double result = 0.0;
+ Preconditions.checkNotNull(container, "container");
+ for (Movable item : container.getBalanceableItems()) {
+ Preconditions.checkNotNull(item, "item in container");
+ assertEquals(item.getContainerId(), container.getId());
+ result += getItemWorkrate((MockItemEntity)item);
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolTest.java
new file mode 100644
index 0000000..18f6847
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.AbstractGroup;
+import brooklyn.entity.basic.AbstractGroupImpl;
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.trait.Resizable;
+import org.apache.brooklyn.location.basic.SimulatedLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+
+public class BalanceableWorkerPoolTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BalanceableWorkerPoolTest.class);
+
+ protected static final long TIMEOUT_MS = 10*1000;
+ protected static final long SHORT_WAIT_MS = 250;
+
+ protected static final long CONTAINER_STARTUP_DELAY_MS = 100;
+
+ protected TestApplication app;
+ protected SimulatedLocation loc;
+ protected BalanceableWorkerPool pool;
+ protected Group containerGroup;
+ protected Group itemGroup;
+
+ @BeforeMethod(alwaysRun=true)
+ public void before() {
+ loc = new SimulatedLocation(MutableMap.of("name", "loc"));
+
+ app = ApplicationBuilder.newManagedApp(TestApplication.class);
+ containerGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class)
+ .displayName("containerGroup")
+ .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockContainerEntity.class)));
+ itemGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class)
+ .displayName("itemGroup")
+ .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockItemEntity.class)));
+ pool = app.createAndManageChild(EntitySpec.create(BalanceableWorkerPool.class));
+ pool.setContents(containerGroup, itemGroup);
+
+ app.start(ImmutableList.of(loc));
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void after() {
+ if (app != null) Entities.destroyAll(app.getManagementContext());
+ }
+
+ @Test
+ public void testDefaultResizeFailsIfContainerGroupNotResizable() throws Exception {
+ try {
+ pool.resize(1);
+ fail();
+ } catch (Exception e) {
+ if (Exceptions.getFirstThrowableOfType(e, UnsupportedOperationException.class) == null) throw e;
+ }
+ }
+
+ @Test
+ public void testDefaultResizeCallsResizeOnContainerGroup() {
+ LocallyResizableGroup resizable = app.createAndManageChild(EntitySpec.create(LocallyResizableGroup.class));
+
+ BalanceableWorkerPool pool2 = app.createAndManageChild(EntitySpec.create(BalanceableWorkerPool.class));
+ pool2.setContents(resizable, itemGroup);
+ Entities.manage(pool2);
+
+ pool2.resize(123);
+ assertEquals(resizable.getCurrentSize(), (Integer) 123);
+ }
+
+ @Test
+ public void testCustomResizableCalledWhenResizing() {
+ LocallyResizableGroup resizable = app.createAndManageChild(EntitySpec.create(LocallyResizableGroup.class));
+
+ pool.setResizable(resizable);
+
+ pool.resize(123);
+ assertEquals(resizable.getCurrentSize(), (Integer)123);
+ }
+
+ @ImplementedBy(LocallyResizableGroupImpl.class)
+ public static interface LocallyResizableGroup extends AbstractGroup, Resizable {
+ }
+
+ public static class LocallyResizableGroupImpl extends AbstractGroupImpl implements LocallyResizableGroup {
+ private int size = 0;
+
+ @Override
+ public Integer resize(Integer newSize) {
+ size = newSize;
+ return size;
+ }
+ @Override
+ public Integer getCurrentSize() {
+ return size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupTest.java
new file mode 100644
index 0000000..c256334
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.entity.basic.Entities;
+import org.apache.brooklyn.location.basic.SimulatedLocation;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class ItemsInContainersGroupTest {
+
+ // all tests are 20ms or less, but use a big timeout just in case very slow machine!
+ private static final long TIMEOUT_MS = 15000;
+
+ private TestApplication app;
+ private SimulatedLocation loc;
+ private Group containerGroup;
+ private ItemsInContainersGroup itemGroup;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ loc = new SimulatedLocation(MutableMap.of("name", "loc"));
+
+ app = ApplicationBuilder.newManagedApp(TestApplication.class);
+ containerGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class)
+ .displayName("containerGroup")
+ .configure(DynamicGroup.ENTITY_FILTER, new Predicate<Entity>() {
+ public boolean apply(Entity input) {
+ return input instanceof MockContainerEntity &&
+ input.getConfig(MockContainerEntity.MOCK_MEMBERSHIP) == "ingroup";
+ }}));
+ itemGroup = app.createAndManageChild(EntitySpec.create(ItemsInContainersGroup.class)
+ .displayName("itemGroup"));
+ itemGroup.setContainers(containerGroup);
+
+ app.start(ImmutableList.of(loc));
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (app != null) Entities.destroyAll(app.getManagementContext());
+ }
+
+ @Test
+ public void testSimpleMembership() throws Exception {
+ MockContainerEntity containerIn = newContainer(app, "A", "ingroup");
+ MockItemEntity item1 = newItem(app, containerIn, "1");
+ MockItemEntity item2 = newItem(app, containerIn, "2");
+
+ assertItemsEventually(item1, item2);
+ }
+
+ @Test
+ public void testFilterIsAppliedToItems() throws Exception {
+ itemGroup.stop();
+ Entities.unmanage(itemGroup);
+
+ itemGroup = app.createAndManageChild(EntitySpec.create(ItemsInContainersGroup.class)
+ .displayName("itemGroupWithDispName2")
+ .configure(ItemsInContainersGroup.ITEM_FILTER, new Predicate<Entity>() {
+ public boolean apply(Entity input) {
+ return "2".equals(input.getDisplayName());
+ }}));
+ itemGroup.setContainers(containerGroup);
+
+ MockContainerEntity containerIn = newContainer(app, "A", "ingroup");
+ MockItemEntity item1 = newItem(app, containerIn, "1");
+ MockItemEntity item2 = newItem(app, containerIn, "2");
+
+ assertItemsEventually(item2); // does not include item1
+ }
+
+ @Test
+ public void testItemsInOtherContainersIgnored() throws Exception {
+ MockContainerEntity containerOut = newContainer(app, "A", "outgroup");
+ MockItemEntity item1 = newItem(app, containerOut, "1");
+
+ assertItemsEventually();
+ }
+
+ @Test
+ public void testItemMovedInIsAdded() throws Exception {
+ MockContainerEntity containerIn = newContainer(app, "A", "ingroup");
+ MockContainerEntity containerOut = newContainer(app, "A", "outgroup");
+ MockItemEntity item1 = newItem(app, containerOut, "1");
+ item1.move(containerIn);
+
+ assertItemsEventually(item1);
+ }
+
+ @Test
+ public void testItemMovedOutIsRemoved() throws Exception {
+ MockContainerEntity containerIn = newContainer(app, "A", "ingroup");
+ MockContainerEntity containerOut = newContainer(app, "A", "outgroup");
+ MockItemEntity item1 = newItem(app, containerIn, "1");
+ assertItemsEventually(item1);
+
+ item1.move(containerOut);
+ assertItemsEventually();
+ }
+
+ /*
+ * Previously could fail if...
+ * ItemsInContainersGroupImpl listener got notified of Movable.CONTAINER after entity was unmanaged
+ * (because being done in concurrent threads).
+ * This called ItemsInContainersGroupImpl.onItemMoved, which called addMember to add it back in again.
+ * In AbstractGroup.addMember, we now check if the entity is still managed, to
+ * ensure there is synchronization for concurrent calls to add/remove member.
+ */
+ @Test
+ public void testItemUnmanagedIsRemoved() throws Exception {
+ MockContainerEntity containerIn = newContainer(app, "A", "ingroup");
+ MockItemEntity item1 = newItem(app, containerIn, "1");
+ assertItemsEventually(item1);
+
+ Entities.unmanage(item1);
+ assertItemsEventually();
+ }
+
+ // TODO How to test this? Will it be used?
+ // Adding a new container then adding items to it is tested in many other methods.
+ @Test(enabled=false)
+ public void testContainerAddedWillAddItsItems() throws Exception {
+ }
+
+ @Test
+ public void testContainerRemovedWillRemoveItsItems() throws Exception {
+ MockContainerEntity containerA = newContainer(app, "A", "ingroup");
+ MockItemEntity item1 = newItem(app, containerA, "1");
+ assertItemsEventually(item1);
+
+ Entities.unmanage(containerA);
+ assertItemsEventually();
+ }
+
+ private void assertItemsEventually(final MockItemEntity... expected) {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ public void run() {
+ assertEquals(ImmutableSet.copyOf(itemGroup.getMembers()), ImmutableSet.copyOf(expected));
+ }});
+ }
+
+ private MockContainerEntity newContainer(TestApplication app, String name, String membership) {
+ MockContainerEntity container = app.createAndManageChild(EntitySpec.create(MockContainerEntity.class)
+ .displayName(name)
+ .configure(MockContainerEntity.MOCK_MEMBERSHIP, membership));
+ container.start(ImmutableList.of(loc));
+ return container;
+ }
+
+ private static MockItemEntity newItem(TestApplication app, MockContainerEntity container, String name) {
+ MockItemEntity item = app.createAndManageChild(EntitySpec.create(MockItemEntity.class)
+ .displayName(name));
+ item.move(container);
+ return item;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingModelTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingModelTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingModelTest.java
new file mode 100644
index 0000000..d96f509
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingModelTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collections;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class LoadBalancingModelTest {
+
+ private static final double PRECISION = 0.00001;
+
+ private MockContainerEntity container1 = new MockContainerEntityImpl();
+ private MockContainerEntity container2 = new MockContainerEntityImpl();
+ private MockItemEntity item1 = new MockItemEntityImpl();
+ private MockItemEntity item2 = new MockItemEntityImpl();
+ private MockItemEntity item3 = new MockItemEntityImpl();
+
+ private DefaultBalanceablePoolModel<MockContainerEntity, MockItemEntity> model;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ model = new DefaultBalanceablePoolModel<MockContainerEntity, MockItemEntity>("myname");
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ // nothing to tear down; no management context created
+ }
+
+ @Test
+ public void testPoolRatesCorrectlySumContainers() throws Exception {
+ model.onContainerAdded(container1, 10d, 20d);
+ model.onContainerAdded(container2, 11d, 22d);
+
+ assertEquals(model.getPoolLowThreshold(), 10d+11d, PRECISION);
+ assertEquals(model.getPoolHighThreshold(), 20d+22d, PRECISION);
+ }
+
+ @Test
+ public void testPoolRatesCorrectlySumItems() throws Exception {
+ model.onContainerAdded(container1, 10d, 20d);
+ model.onItemAdded(item1, container1, true);
+ model.onItemAdded(item2, container1, true);
+
+ model.onItemWorkrateUpdated(item1, 1d);
+ assertEquals(model.getCurrentPoolWorkrate(), 1d, PRECISION);
+
+ model.onItemWorkrateUpdated(item2, 2d);
+ assertEquals(model.getCurrentPoolWorkrate(), 1d+2d, PRECISION);
+
+ model.onItemWorkrateUpdated(item2, 4d);
+ assertEquals(model.getCurrentPoolWorkrate(), 1d+4d, PRECISION);
+
+ model.onItemRemoved(item1);
+ assertEquals(model.getCurrentPoolWorkrate(), 4d, PRECISION);
+ }
+
+ @Test
+ public void testWorkrateUpdateAfterItemRemovalIsNotRecorded() throws Exception {
+ model.onContainerAdded(container1, 10d, 20d);
+ model.onItemAdded(item1, container1, true);
+ model.onItemRemoved(item1);
+ model.onItemWorkrateUpdated(item1, 123d);
+
+ assertEquals(model.getCurrentPoolWorkrate(), 0d, PRECISION);
+ assertEquals(model.getContainerWorkrates().get(container1), 0d, PRECISION);
+ assertEquals(model.getItemWorkrate(item1), null);
+ }
+
+ @Test
+ public void testItemMovedWillUpdateContainerWorkrates() throws Exception {
+ model.onContainerAdded(container1, 10d, 20d);
+ model.onContainerAdded(container2, 11d, 21d);
+ model.onItemAdded(item1, container1, false);
+ model.onItemWorkrateUpdated(item1, 123d);
+
+ model.onItemMoved(item1, container2);
+
+ assertEquals(model.getItemsForContainer(container1), Collections.emptySet());
+ assertEquals(model.getItemsForContainer(container2), ImmutableSet.of(item1));
+ assertEquals(model.getItemWorkrate(item1), 123d);
+ assertEquals(model.getTotalWorkrate(container1), 0d);
+ assertEquals(model.getTotalWorkrate(container2), 123d);
+ assertEquals(model.getItemWorkrates(container1), Collections.emptyMap());
+ assertEquals(model.getItemWorkrates(container2), ImmutableMap.of(item1, 123d));
+ assertEquals(model.getContainerWorkrates(), ImmutableMap.of(container1, 0d, container2, 123d));
+ assertEquals(model.getCurrentPoolWorkrate(), 123d);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyConcurrencyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyConcurrencyTest.java
new file mode 100644
index 0000000..5a3b328
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyConcurrencyTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+
+import com.google.common.collect.Lists;
+
+public class LoadBalancingPolicyConcurrencyTest extends AbstractLoadBalancingPolicyTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicyConcurrencyTest.class);
+
+ private static final double WORKRATE_JITTER = 2d;
+ private static final int NUM_CONTAINERS = 20;
+ private static final int WORKRATE_UPDATE_PERIOD_MS = 1000;
+
+ private ScheduledExecutorService scheduledExecutor;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void before() {
+ scheduledExecutor = Executors.newScheduledThreadPool(10);
+ super.before();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void after() {
+ if (scheduledExecutor != null) scheduledExecutor.shutdownNow();
+ super.after();
+ }
+
+ @Test
+ public void testSimplePeriodicWorkrateUpdates() {
+ List<MockItemEntity> items = Lists.newArrayList();
+ List<MockContainerEntity> containers = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ containers.add(newContainer(app, "container"+i, 10, 30));
+ }
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ newItemWithPeriodicWorkrates(app, containers.get(0), "item"+i, 20);
+ }
+
+ assertWorkratesEventually(containers, items, Collections.nCopies(NUM_CONTAINERS, 20d), WORKRATE_JITTER);
+ }
+
+ @Test
+ public void testConcurrentlyAddContainers() {
+ final Queue<MockContainerEntity> containers = new ConcurrentLinkedQueue<MockContainerEntity>();
+ final List<MockItemEntity> items = Lists.newArrayList();
+
+ containers.add(newContainer(app, "container-orig", 10, 30));
+
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ items.add(newItemWithPeriodicWorkrates(app, containers.iterator().next(), "item"+i, 20));
+ }
+ for (int i = 0; i < NUM_CONTAINERS-1; i++) {
+ final int index = i;
+ scheduledExecutor.submit(new Callable<Void>() {
+ @Override public Void call() {
+ containers.add(newContainer(app, "container"+index, 10, 30));
+ return null;
+ }});
+ }
+
+ assertWorkratesEventually(containers, items, Collections.nCopies(NUM_CONTAINERS, 20d), WORKRATE_JITTER);
+ }
+
+ @Test
+ public void testConcurrentlyAddItems() {
+ final Queue<MockItemEntity> items = new ConcurrentLinkedQueue<MockItemEntity>();
+ final List<MockContainerEntity> containers = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ containers.add(newContainer(app, "container"+i, 10, 30));
+ }
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ final int index = i;
+ scheduledExecutor.submit(new Callable<Void>() {
+ @Override public Void call() {
+ items.add(newItemWithPeriodicWorkrates(app, containers.get(0), "item"+index, 20));
+ return null;
+ }});
+ }
+ assertWorkratesEventually(containers, items, Collections.nCopies(NUM_CONTAINERS, 20d), WORKRATE_JITTER);
+ }
+
+ // TODO Got IndexOutOfBoundsException from containers.last()
+ @Test(groups="WIP", invocationCount=100)
+ public void testConcurrentlyRemoveContainers() {
+ List<MockItemEntity> items = Lists.newArrayList();
+ final List<MockContainerEntity> containers = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ containers.add(newContainer(app, "container"+i, 15, 45));
+ }
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ items.add(newItemWithPeriodicWorkrates(app, containers.get(i), "item"+i, 20));
+ }
+
+ final List<MockContainerEntity> containersToStop = Lists.newArrayList();
+ for (int i = 0; i < NUM_CONTAINERS/2; i++) {
+ containersToStop.add(containers.remove(0));
+ }
+ for (final MockContainerEntity containerToStop : containersToStop) {
+ scheduledExecutor.submit(new Callable<Void>() {
+ @Override public Void call() {
+ try {
+ containerToStop.offloadAndStop(containers.get(containers.size()-1));
+ Entities.unmanage(containerToStop);
+ } catch (Throwable t) {
+ LOG.error("Error stopping container "+containerToStop, t);
+ }
+ return null;
+ }});
+ }
+
+ assertWorkratesEventually(containers, items, Collections.nCopies((int)(NUM_CONTAINERS/2), 40d), WORKRATE_JITTER*2);
+ }
+
+ @Test(groups="WIP")
+ public void testConcurrentlyRemoveItems() {
+ List<MockItemEntity> items = Lists.newArrayList();
+ List<MockContainerEntity> containers = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ containers.add(newContainer(app, "container"+i, 15, 45));
+ }
+ for (int i = 0; i < NUM_CONTAINERS*2; i++) {
+ items.add(newItemWithPeriodicWorkrates(app, containers.get(i%NUM_CONTAINERS), "item"+i, 20));
+ }
+ // should now have item0 and item{0+NUM_CONTAINERS} on container0, etc
+
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ // not removing consecutive items as that would leave it balanced!
+ int indexToStop = (i < NUM_CONTAINERS/2) ? NUM_CONTAINERS : 0;
+ final MockItemEntity itemToStop = items.remove(indexToStop);
+ scheduledExecutor.submit(new Callable<Void>() {
+ @Override public Void call() {
+ try {
+ itemToStop.stop();
+ Entities.unmanage(itemToStop);
+ } catch (Throwable t) {
+ LOG.error("Error stopping item "+itemToStop, t);
+ }
+ return null;
+ }});
+ }
+
+ assertWorkratesEventually(containers, items, Collections.nCopies(NUM_CONTAINERS, 20d), WORKRATE_JITTER);
+ }
+
+ protected MockItemEntity newItemWithPeriodicWorkrates(TestApplication app, MockContainerEntity container, String name, double workrate) {
+ MockItemEntity item = newItem(app, container, name, workrate);
+ scheduleItemWorkrateUpdates(item, workrate, WORKRATE_JITTER);
+ return item;
+ }
+
+ private void scheduleItemWorkrateUpdates(final MockItemEntity item, final double workrate, final double jitter) {
+ final AtomicReference<Future<?>> futureRef = new AtomicReference<Future<?>>();
+ Future<?> future = scheduledExecutor.scheduleAtFixedRate(
+ new Runnable() {
+ @Override public void run() {
+ if (item.isStopped() && futureRef.get() != null) {
+ futureRef.get().cancel(true);
+ return;
+ }
+ double jitteredWorkrate = workrate + (random.nextDouble()*jitter*2 - jitter);
+ ((EntityLocal)item).setAttribute(TEST_METRIC, (int) Math.max(0, jitteredWorkrate));
+ }
+ },
+ 0, WORKRATE_UPDATE_PERIOD_MS, TimeUnit.MILLISECONDS);
+ futureRef.set(future);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicySoakTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicySoakTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicySoakTest.java
new file mode 100644
index 0000000..8da494c
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicySoakTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class LoadBalancingPolicySoakTest extends AbstractLoadBalancingPolicyTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicySoakTest.class);
+
+ private static final long TIMEOUT_MS = 40*1000;
+
+ @Test
+ public void testLoadBalancingQuickTest() {
+ RunConfig config = new RunConfig();
+ config.numCycles = 1;
+ config.numContainers = 5;
+ config.numItems = 5;
+ config.lowThreshold = 200;
+ config.highThreshold = 300;
+ config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold));
+
+ runLoadBalancingSoakTest(config);
+ }
+
+ @Test
+ public void testLoadBalancingManyItemsQuickTest() {
+ RunConfig config = new RunConfig();
+ config.numCycles = 1;
+ config.numContainers = 5;
+ config.numItems = 30;
+ config.lowThreshold = 200;
+ config.highThreshold = 300;
+ config.numContainerStopsPerCycle = 1;
+ config.numItemStopsPerCycle = 1;
+ config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold));
+
+ runLoadBalancingSoakTest(config);
+ }
+
+ @Test(groups={"Integration","Acceptance"}) // acceptance group, because it's slow to run many cycles
+ public void testLoadBalancingSoakTest() {
+ RunConfig config = new RunConfig();
+ config.numCycles = 100;
+ config.numContainers = 5;
+ config.numItems = 5;
+ config.lowThreshold = 200;
+ config.highThreshold = 300;
+ config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold));
+
+ runLoadBalancingSoakTest(config);
+ }
+
+ @Test(groups={"Integration","Acceptance"}) // acceptance group, because it's slow to run many cycles
+ public void testLoadBalancingManyItemsSoakTest() {
+ RunConfig config = new RunConfig();
+ config.numCycles = 100;
+ config.numContainers = 5;
+ config.numItems = 30;
+ config.lowThreshold = 200;
+ config.highThreshold = 300;
+ config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold));
+ config.numContainerStopsPerCycle = 3;
+ config.numItemStopsPerCycle = 10;
+
+ runLoadBalancingSoakTest(config);
+ }
+
+ @Test(groups={"Integration","Acceptance"})
+ public void testLoadBalancingManyManyItemsTest() {
+ RunConfig config = new RunConfig();
+ config.numCycles = 1;
+ config.numContainers = 5;
+ config.numItems = 1000;
+ config.lowThreshold = 2000;
+ config.highThreshold = 3000;
+ config.numContainerStopsPerCycle = 0;
+ config.numItemStopsPerCycle = 0;
+ config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold));
+ config.verbose = false;
+
+ runLoadBalancingSoakTest(config);
+ }
+
+ private void runLoadBalancingSoakTest(RunConfig config) {
+ final int numCycles = config.numCycles;
+ final int numContainers = config.numContainers;
+ final int numItems = config.numItems;
+ final double lowThreshold = config.lowThreshold;
+ final double highThreshold = config.highThreshold;
+ final int totalRate = config.totalRate;
+ final int numContainerStopsPerCycle = config.numContainerStopsPerCycle;
+ final int numItemStopsPerCycle = config.numItemStopsPerCycle;
+ final boolean verbose = config.verbose;
+
+ MockItemEntityImpl.totalMoveCount.set(0);
+
+ final List<MockContainerEntity> containers = new ArrayList<MockContainerEntity>();
+ final List<MockItemEntity> items = new ArrayList<MockItemEntity>();
+
+ for (int i = 1; i <= numContainers; i++) {
+ MockContainerEntity container = newContainer(app, "container-"+i, lowThreshold, highThreshold);
+ containers.add(container);
+ }
+ for (int i = 1; i <= numItems; i++) {
+ MockItemEntity item = newItem(app, containers.get(0), "item-"+i, 5);
+ items.add(item);
+ }
+
+ for (int i = 1; i <= numCycles; i++) {
+ LOG.info(LoadBalancingPolicySoakTest.class.getSimpleName()+": cycle "+i);
+
+ // Stop items, and start others
+ for (int j = 1; j <= numItemStopsPerCycle; j++) {
+ int itemIndex = random.nextInt(numItems);
+ MockItemEntity itemToStop = items.get(itemIndex);
+ itemToStop.stop();
+ LOG.debug("Unmanaging item {}", itemToStop);
+ Entities.unmanage(itemToStop);
+ items.set(itemIndex, newItem(app, containers.get(0), "item-"+(itemIndex+1)+"."+i+"."+j, 5));
+ }
+
+ // Repartition the load across the items
+ final List<Integer> itemRates = randomlyDivideLoad(numItems, totalRate, 0, (int)highThreshold);
+
+ for (int j = 0; j < numItems; j++) {
+ MockItemEntity item = items.get(j);
+ ((EntityLocal)item).setAttribute(MockItemEntity.TEST_METRIC, itemRates.get(j));
+ }
+
+ // Stop containers, and start others
+ for (int j = 1; j <= numContainerStopsPerCycle; j++) {
+ int containerIndex = random.nextInt(numContainers);
+ MockContainerEntity containerToStop = containers.get(containerIndex);
+ containerToStop.offloadAndStop(containers.get((containerIndex+1)%numContainers));
+ LOG.debug("Unmanaging container {}", containerToStop);
+ Entities.unmanage(containerToStop);
+
+ MockContainerEntity containerToAdd = newContainer(app, "container-"+(containerIndex+1)+"."+i+"."+j, lowThreshold, highThreshold);
+ containers.set(containerIndex, containerToAdd);
+ }
+
+ // Assert that the items become balanced again
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ @Override public void run() {
+ Iterable<Double> containerRates = Iterables.transform(containers, new Function<MockContainerEntity, Double>() {
+ @Override public Double apply(MockContainerEntity input) {
+ return (double) input.getWorkrate();
+ }});
+
+ String errMsg;
+ if (verbose) {
+ errMsg = verboseDumpToString(containers, items)+"; itemRates="+itemRates;
+ } else {
+ errMsg = containerRates+"; totalMoves="+MockItemEntityImpl.totalMoveCount;
+ }
+
+ // Check that haven't lost any items
+ // (as observed in one jenkins build failure: 2014-03-18; but that could also be
+ // explained by errMsg generated in the middle of a move)
+ List<Entity> itemsFromModel = Lists.newArrayList();
+ List<Entity> itemsFromContainers = Lists.newArrayList();
+ for (Entity container : model.getPoolContents()) {
+ itemsFromModel.addAll(model.getItemsForContainer(container));
+ }
+ for (MockContainerEntity container : containers) {
+ itemsFromContainers.addAll(container.getBalanceableItems());
+ }
+ Asserts.assertEqualsIgnoringOrder(itemsFromModel, items, true, errMsg);
+ Asserts.assertEqualsIgnoringOrder(itemsFromContainers, items, true, errMsg);
+
+ // Check overall container rates are balanced
+ assertEquals(sum(containerRates), sum(itemRates), errMsg);
+ for (double containerRate : containerRates) {
+ assertTrue(containerRate >= lowThreshold, errMsg);
+ assertTrue(containerRate <= highThreshold, errMsg);
+ }
+ }});
+ }
+ }
+
+ private static class RunConfig {
+ int numCycles = 1;
+ int numContainers = 5;
+ int numItems = 5;
+ double lowThreshold = 200;
+ double highThreshold = 300;
+ int totalRate = (int) (5*(0.95*highThreshold));
+ int numContainerStopsPerCycle = 1;
+ int numItemStopsPerCycle = 1;
+ boolean verbose = true;
+ }
+
+ // Testing conveniences.
+
+ private double sum(Iterable<? extends Number> vals) {
+ double total = 0;;
+ for (Number val : vals) {
+ total += val.doubleValue();
+ }
+ return total;
+ }
+
+ /**
+ * Distributes a given load across a number of items randomly. The variability in load for an item is dictated by the variance,
+ * but the total will always equal totalLoad.
+ *
+ * The distribution of load is skewed: one side of the list will have bigger values than the other.
+ * Which side is skewed will vary, so when balancing a policy will find that things have entirely changed.
+ *
+ * TODO This is not particularly good at distributing load, but it's random and skewed enough to force rebalancing.
+ */
+ private List<Integer> randomlyDivideLoad(int numItems, int totalLoad, int min, int max) {
+ List<Integer> result = new ArrayList<Integer>(numItems);
+ int totalRemaining = totalLoad;
+ int variance = 3;
+ int skew = 3;
+
+ for (int i = 0; i < numItems; i++) {
+ int itemsRemaining = numItems-i;
+ int itemFairShare = (totalRemaining/itemsRemaining);
+ double skewFactor = ((double)i/numItems)*2 - 1; // a number between -1 and 1, depending how far through the item set we are
+ int itemSkew = (int) (random.nextInt(skew)*skewFactor);
+ int itemLoad = itemFairShare + (random.nextInt(variance*2)-variance) + itemSkew;
+ itemLoad = Math.max(min, itemLoad);
+ itemLoad = Math.min(totalRemaining, itemLoad);
+ itemLoad = Math.min(max, itemLoad);
+ result.add(itemLoad);
+ totalRemaining -= itemLoad;
+ }
+
+ if (random.nextBoolean()) Collections.reverse(result);
+
+ assertTrue(sum(result) <= totalLoad, "totalLoad="+totalLoad+"; result="+result);
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyTest.java
new file mode 100644
index 0000000..a0166f9
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class LoadBalancingPolicyTest extends AbstractLoadBalancingPolicyTest {
+
+ // Expect no balancing to occur as container A isn't above the high threshold.
+ @Test
+ public void testNoopWhenWithinThresholds() {
+ MockContainerEntity containerA = newContainer(app, "A", 10, 100);
+ MockContainerEntity containerB = newContainer(app, "B", 20, 60);
+ MockItemEntity item1 = newItem(app, containerA, "1", 10);
+ MockItemEntity item2 = newItem(app, containerA, "2", 10);
+ MockItemEntity item3 = newItem(app, containerA, "3", 10);
+ MockItemEntity item4 = newItem(app, containerA, "4", 10);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4),
+ ImmutableList.of(40d, 0d));
+ }
+
+ @Test
+ public void testNoopWhenAlreadyBalanced() {
+ MockContainerEntity containerA = newContainer(app, "A", 20, 80);
+ MockContainerEntity containerB = newContainer(app, "B", 20, 80);
+ MockItemEntity item1 = newItem(app, containerA, "1", 10);
+ MockItemEntity item2 = newItem(app, containerA, "2", 30);
+ MockItemEntity item3 = newItem(app, containerB, "3", 20);
+ MockItemEntity item4 = newItem(app, containerB, "4", 20);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4),
+ ImmutableList.of(40d, 40d));
+ assertEquals(containerA.getBalanceableItems(), ImmutableSet.of(item1, item2));
+ assertEquals(containerB.getBalanceableItems(), ImmutableSet.of(item3, item4));
+ }
+
+ // Expect 20 units of workload to be migrated from hot container (A) to cold (B).
+ @Test
+ public void testSimpleBalancing() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 25);
+ MockContainerEntity containerB = newContainer(app, "B", 20, 60);
+ MockItemEntity item1 = newItem(app, containerA, "1", 10);
+ MockItemEntity item2 = newItem(app, containerA, "2", 10);
+ MockItemEntity item3 = newItem(app, containerA, "3", 10);
+ MockItemEntity item4 = newItem(app, containerA, "4", 10);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4),
+ ImmutableList.of(20d, 20d));
+ }
+
+ @Test
+ public void testSimpleBalancing2() {
+ MockContainerEntity containerA = newContainer(app, "A", 20, 40);
+ MockContainerEntity containerB = newContainer(app, "B", 20, 40);
+ MockItemEntity item1 = newItem(app, containerA, "1", 0);
+ MockItemEntity item2 = newItem(app, containerB, "2", 40);
+ MockItemEntity item3 = newItem(app, containerB, "3", 20);
+ MockItemEntity item4 = newItem(app, containerB, "4", 20);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4),
+ ImmutableList.of(40d, 40d));
+ }
+
+// @Test
+// public void testAdjustedItemNotMoved() {
+// MockBalancingModel pool = new MockBalancingModel(
+// containers(
+// containerA, 20, 50,
+// containerB, 20, 50),
+// items(
+// "item1", containerA, 0,
+// "item2", containerB, -40,
+// "item3", containerB, 20,
+// "item4", containerB, 20)
+// );
+//
+// BalancingStrategy<String, String> policy = new BalancingStrategy<String, String>("Test", pool);
+// policy.rebalance();
+//
+// assertEquals((Object)pool.getItemsForContainer(containerA), ImmutableSet.of("item1", "item3", "item4"), pool.itemDistributionToString());
+// assertEquals((Object)pool.getItemsForContainer(containerB), ImmutableSet.of("item2"), pool.itemDistributionToString());
+// }
+
+ @Test
+ public void testMultiMoveBalancing() {
+ MockContainerEntity containerA = newContainer(app, "A", 20, 50);
+ MockContainerEntity containerB = newContainer(app, "B", 20, 50);
+ MockItemEntity item1 = newItem(app, containerA, "1", 10);
+ MockItemEntity item2 = newItem(app, containerA, "2", 10);
+ MockItemEntity item3 = newItem(app, containerA, "3", 10);
+ MockItemEntity item4 = newItem(app, containerA, "4", 10);
+ MockItemEntity item5 = newItem(app, containerA, "5", 10);
+ MockItemEntity item6 = newItem(app, containerA, "6", 10);
+ MockItemEntity item7 = newItem(app, containerA, "7", 10);
+ MockItemEntity item8 = newItem(app, containerA, "8", 10);
+ MockItemEntity item9 = newItem(app, containerA, "9", 10);
+ MockItemEntity item10 = newItem(app, containerA, "10", 10);
+
+ // non-deterministic which items will be moved; but can assert how many (given they all have same workrate)
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4, item5, item6, item7, item8, item9, item10),
+ ImmutableList.of(50d, 50d));
+ assertEquals(containerA.getBalanceableItems().size(), 5);
+ assertEquals(containerB.getBalanceableItems().size(), 5);
+ }
+
+ @Test
+ public void testRebalanceWhenWorkratesChange() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 50);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 50);
+ MockItemEntity item1 = newItem(app, containerA, "1", 0);
+ MockItemEntity item2 = newItem(app, containerA, "2", 0);
+
+ ((EntityLocal)item1).setAttribute(MockItemEntity.TEST_METRIC, 40);
+ ((EntityLocal)item2).setAttribute(MockItemEntity.TEST_METRIC, 40);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2),
+ ImmutableList.of(40d, 40d));
+ }
+
+ // Expect no balancing to occur in hot pool (2 containers over-threshold at 40).
+ // On addition of new container, expect hot containers to offload 10 each.
+ @Test
+ public void testAddContainerWhenHot() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 30);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 30);
+ MockItemEntity item1 = newItem(app, containerA, "1", 10);
+ MockItemEntity item2 = newItem(app, containerA, "2", 10);
+ MockItemEntity item3 = newItem(app, containerA, "3", 10);
+ MockItemEntity item4 = newItem(app, containerA, "4", 10);
+ MockItemEntity item5 = newItem(app, containerB, "5", 10);
+ MockItemEntity item6 = newItem(app, containerB, "6", 10);
+ MockItemEntity item7 = newItem(app, containerB, "7", 10);
+ MockItemEntity item8 = newItem(app, containerB, "8", 10);
+ // Both containers are over-threshold at this point; should not rebalance.
+
+ MockContainerEntity containerC = newAsyncContainer(app, "C", 10, 30, CONTAINER_STARTUP_DELAY_MS);
+ // New container allows hot ones to offload work.
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB, containerC),
+ ImmutableList.of(item1, item2, item3, item4, item5, item6, item7, item8),
+ ImmutableList.of(30d, 30d, 20d));
+ }
+
+ // On addition of new container, expect no rebalancing to occur as no existing container is hot.
+ @Test
+ public void testAddContainerWhenCold() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 50);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 50);
+ MockItemEntity item1 = newItem(app, containerA, "1", 10);
+ MockItemEntity item2 = newItem(app, containerA, "2", 10);
+ MockItemEntity item3 = newItem(app, containerA, "3", 10);
+ MockItemEntity item4 = newItem(app, containerA, "4", 10);
+ MockItemEntity item5 = newItem(app, containerB, "5", 10);
+ MockItemEntity item6 = newItem(app, containerB, "6", 10);
+ MockItemEntity item7 = newItem(app, containerB, "7", 10);
+ MockItemEntity item8 = newItem(app, containerB, "8", 10);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4, item5, item6, item7, item8),
+ ImmutableList.of(40d, 40d));
+
+ MockContainerEntity containerC = newAsyncContainer(app, "C", 10, 50, CONTAINER_STARTUP_DELAY_MS);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB, containerC),
+ ImmutableList.of(item1, item2, item3, item4, item5, item6, item7, item8),
+ ImmutableList.of(40d, 40d, 0d));
+ }
+
+ // Expect no balancing to occur in cool pool (2 containers under-threshold at 30).
+ // On addition of new item, expect over-threshold container (A) to offload 20 to B.
+ @Test
+ public void testAddItem() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 50);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 50);
+ MockItemEntity item1 = newItem(app, containerA, "1", 10);
+ MockItemEntity item2 = newItem(app, containerA, "2", 10);
+ MockItemEntity item3 = newItem(app, containerA, "3", 10);
+ MockItemEntity item4 = newItem(app, containerB, "4", 10);
+ MockItemEntity item5 = newItem(app, containerB, "5", 10);
+ MockItemEntity item6 = newItem(app, containerB, "6", 10);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4, item5, item6),
+ ImmutableList.of(30d, 30d));
+
+ newItem(app, containerA, "7", 40);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4, item5, item6),
+ ImmutableList.of(50d, 50d));
+ }
+
+ // FIXME Failed in build repeatedly (e.g. #1035), but couldn't reproduce locally yet with invocationCount=100
+ @Test(groups="WIP")
+ public void testRemoveContainerCausesRebalancing() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 30);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 30);
+ MockContainerEntity containerC = newContainer(app, "C", 10, 30);
+ MockItemEntity item1 = newItem(app, containerA, "1", 10);
+ MockItemEntity item2 = newItem(app, containerA, "2", 10);
+ MockItemEntity item3 = newItem(app, containerB, "3", 10);
+ MockItemEntity item4 = newItem(app, containerB, "4", 10);
+ MockItemEntity item5 = newItem(app, containerC, "5", 10);
+ MockItemEntity item6 = newItem(app, containerC, "6", 10);
+
+ Entities.unmanage(containerC);
+ item5.move(containerA);
+ item6.move(containerA);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4, item5, item6),
+ ImmutableList.of(30d, 30d));
+ }
+
+ @Test
+ public void testRemoveItemCausesRebalancing() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 30);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 30);
+ MockItemEntity item1 = newItem(app, containerA, "1", 30);
+ MockItemEntity item2 = newItem(app, containerB, "2", 20);
+ MockItemEntity item3 = newItem(app, containerB, "3", 20);
+
+ item1.stop();
+ Entities.unmanage(item1);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3),
+ ImmutableList.of(20d, 20d));
+ }
+
+ @Test
+ public void testRebalancesAfterManualMove() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 50);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 50);
+ MockItemEntity item1 = newItem(app, containerA, "1", 20);
+ MockItemEntity item2 = newItem(app, containerA, "2", 20);
+ MockItemEntity item3 = newItem(app, containerB, "3", 20);
+ MockItemEntity item4 = newItem(app, containerB, "4", 20);
+
+ // Move everything onto containerA, and expect it to be automatically re-balanced
+ item3.move(containerA);
+ item4.move(containerA);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3, item4),
+ ImmutableList.of(40d, 40d));
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testModelIncludesItemsAndContainersStartedBeforePolicyCreated() {
+ pool.removePolicy(policy);
+ policy.destroy();
+
+ // Set-up containers and items.
+ final MockContainerEntity containerA = newContainer(app, "A", 10, 100);
+ newItem(app, containerA, "1", 10);
+
+ policy = new LoadBalancingPolicy(MutableMap.of(), TEST_METRIC, model);
+ pool.addPolicy(policy);
+
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ public void run() {
+ assertEquals(model.getContainerWorkrates(), ImmutableMap.of(containerA, 10d));
+ }
+ });
+ }
+
+ @Test
+ public void testLockedItemsNotMoved() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 50);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 50);
+ MockItemEntity item1 = newLockedItem(app, containerA, "1", 40);
+ MockItemEntity item2 = newLockedItem(app, containerA, "2", 40);
+
+ assertWorkratesContinually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2),
+ ImmutableList.of(80d, 0d));
+ }
+
+ @Test
+ public void testLockedItemsContributeToOverloadedMeasurements() {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 50);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 50);
+ MockItemEntity item1 = newLockedItem(app, containerA, "1", 40);
+ MockItemEntity item2 = newItem(app, containerA, "2", 25);
+ MockItemEntity item3 = newItem(app, containerA, "3", 25);
+
+ assertWorkratesEventually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3),
+ ImmutableList.of(40d, 50d));
+ }
+
+ @Test
+ public void testOverloadedLockedItemsPreventMoreWorkEnteringContainer() throws Exception {
+ // Set-up containers and items.
+ MockContainerEntity containerA = newContainer(app, "A", 10, 50);
+ MockContainerEntity containerB = newContainer(app, "B", 10, 50);
+ MockItemEntity item1 = newLockedItem(app, containerA, "1", 50);
+ Thread.sleep(1); // increase chances of item1's workrate having been received first
+ MockItemEntity item2 = newItem(app, containerB, "2", 30);
+ MockItemEntity item3 = newItem(app, containerB, "3", 30);
+
+ assertWorkratesContinually(
+ ImmutableList.of(containerA, containerB),
+ ImmutableList.of(item1, item2, item3),
+ ImmutableList.of(50d, 60d));
+ }
+
+ @Test
+ public void testPolicyUpdatesModel() {
+ final MockContainerEntity containerA = newContainer(app, "A", 10, 20);
+ final MockContainerEntity containerB = newContainer(app, "B", 11, 21);
+ final MockItemEntity item1 = newItem(app, containerA, "1", 12);
+ final MockItemEntity item2 = newItem(app, containerB, "2", 13);
+
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ public void run() {
+ assertEquals(model.getPoolSize(), 2);
+ assertEquals(model.getPoolContents(), ImmutableSet.of(containerA, containerB));
+ assertEquals(model.getItemWorkrate(item1), 12d);
+ assertEquals(model.getItemWorkrate(item2), 13d);
+
+ assertEquals(model.getParentContainer(item1), containerA);
+ assertEquals(model.getParentContainer(item2), containerB);
+ assertEquals(model.getContainerWorkrates(), ImmutableMap.of(containerA, 12d, containerB, 13d));
+
+ assertEquals(model.getPoolLowThreshold(), 10+11d);
+ assertEquals(model.getPoolHighThreshold(), 20+21d);
+ assertEquals(model.getCurrentPoolWorkrate(), 12+13d);
+ assertFalse(model.isHot());
+ assertFalse(model.isCold());
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntity.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntity.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntity.java
new file mode 100644
index 0000000..1937d9a
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntity.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.Effector;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.annotation.EffectorParam;
+import brooklyn.entity.basic.AbstractGroup;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.basic.BasicConfigKey;
+
+@ImplementedBy(MockContainerEntityImpl.class)
+public interface MockContainerEntity extends AbstractGroup, BalanceableContainer<Movable>, Startable {
+
+ @SetFromFlag("membership")
+ public static final ConfigKey<String> MOCK_MEMBERSHIP = new BasicConfigKey<String>(
+ String.class, "mock.container.membership", "For testing ItemsInContainersGroup");
+
+ @SetFromFlag("delay")
+ public static final ConfigKey<Long> DELAY = new BasicConfigKey<Long>(
+ Long.class, "mock.container.delay", "", 0L);
+
+ public static final Effector<Void> OFFLOAD_AND_STOP = new MethodEffector<Void>(MockContainerEntity.class, "offloadAndStop");
+
+ public void lock();
+
+ public void unlock();
+
+ public int getWorkrate();
+
+ public Map<Entity, Double> getItemUsage();
+
+ public void addItem(Entity item);
+
+ public void removeItem(Entity item);
+
+ public void offloadAndStop(@EffectorParam(name="otherContianer") MockContainerEntity otherContainer);
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntityImpl.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntityImpl.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntityImpl.java
new file mode 100644
index 0000000..de9da37
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntityImpl.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.api.location.Location;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.AbstractGroupImpl;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+
+public class MockContainerEntityImpl extends AbstractGroupImpl implements MockContainerEntity {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MockContainerEntity.class);
+
+ volatile boolean offloading;
+ volatile boolean running;
+
+ ReentrantLock _lock = new ReentrantLock();
+
+ @Override
+ public <T> T setAttribute(AttributeSensor<T> attribute, T val) {
+ if (LOG.isDebugEnabled()) LOG.debug("Mocks: container {} setting {} to {}", new Object[] {this, attribute, val});
+ return super.setAttribute(attribute, val);
+ }
+
+ @Override
+ public void lock() {
+ _lock.lock();
+ if (!running) {
+ _lock.unlock();
+ throw new IllegalStateException("Container lock "+this+"; it is not running");
+ }
+ }
+
+ @Override
+ public void unlock() {
+ _lock.unlock();
+ }
+
+ @Override
+ public int getWorkrate() {
+ int result = 0;
+ for (Entity member : getMembers()) {
+ Integer memberMetric = member.getAttribute(MockItemEntity.TEST_METRIC);
+ result += ((memberMetric != null) ? memberMetric : 0);
+ }
+ return result;
+ }
+
+ @Override
+ public Map<Entity, Double> getItemUsage() {
+ Map<Entity, Double> result = Maps.newLinkedHashMap();
+ for (Entity member : getMembers()) {
+ Map<Entity, Double> memberItemUsage = member.getAttribute(MockItemEntity.ITEM_USAGE_METRIC);
+ if (memberItemUsage != null) {
+ for (Map.Entry<Entity, Double> entry : memberItemUsage.entrySet()) {
+ double val = (result.containsKey(entry.getKey()) ? result.get(entry.getKey()) : 0d);
+ val += ((entry.getValue() != null) ? entry.getValue() : 0);
+ result.put(entry.getKey(), val);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void addItem(Entity item) {
+ if (LOG.isDebugEnabled()) LOG.debug("Mocks: adding item {} to container {}", item, this);
+ if (!running || offloading) throw new IllegalStateException("Container "+getDisplayName()+" is not running; cannot add item "+item);
+ addMember(item);
+ emit(ITEM_ADDED, item);
+ }
+
+ @Override
+ public void removeItem(Entity item) {
+ if (LOG.isDebugEnabled()) LOG.debug("Mocks: removing item {} from container {}", item, this);
+ if (!running) throw new IllegalStateException("Container "+getDisplayName()+" is not running; cannot remove item "+item);
+ removeMember(item);
+ emit(ITEM_REMOVED, item);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public Set<Movable> getBalanceableItems() {
+ return (Set) Sets.newLinkedHashSet(getMembers());
+ }
+
+ public String toString() {
+ return "MockContainer["+getDisplayName()+"]";
+ }
+
+ private long getDelay() {
+ return getConfig(DELAY);
+ }
+
+ @Override
+ public void start(Collection<? extends Location> locs) {
+ if (LOG.isDebugEnabled()) LOG.debug("Mocks: starting container {}", this);
+ _lock.lock();
+ try {
+ Time.sleep(getDelay());
+ running = true;
+ addLocations(locs);
+ emit(Attributes.LOCATION_CHANGED, null);
+ setAttribute(SERVICE_UP, true);
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (LOG.isDebugEnabled()) LOG.debug("Mocks: stopping container {}", this);
+ _lock.lock();
+ try {
+ running = false;
+ Time.sleep(getDelay());
+ setAttribute(SERVICE_UP, false);
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ private void stopWithoutLock() {
+ running = false;
+ Time.sleep(getDelay());
+ setAttribute(SERVICE_UP, false);
+ }
+
+ public void offloadAndStop(final MockContainerEntity otherContainer) {
+ if (LOG.isDebugEnabled()) LOG.debug("Mocks: offloading container {} to {} (items {})", new Object[] {this, otherContainer, getBalanceableItems()});
+ runWithLock(ImmutableList.of(this, otherContainer), new Runnable() {
+ public void run() {
+ offloading = false;
+ for (Movable item : getBalanceableItems()) {
+ ((MockItemEntity)item).moveNonEffector(otherContainer);
+ }
+ if (LOG.isDebugEnabled()) LOG.debug("Mocks: stopping offloaded container {}", this);
+ stopWithoutLock();
+ }});
+ }
+
+ @Override
+ public void restart() {
+ if (LOG.isDebugEnabled()) LOG.debug("Mocks: restarting {}", this);
+ throw new UnsupportedOperationException();
+ }
+
+ public static void runWithLock(List<MockContainerEntity> entitiesToLock, Runnable r) {
+ List<MockContainerEntity> entitiesToLockCopy = MutableList.copyOf(Iterables.filter(entitiesToLock, Predicates.notNull()));
+ List<MockContainerEntity> entitiesLocked = Lists.newArrayList();
+ Collections.sort(entitiesToLockCopy, new Comparator<MockContainerEntity>() {
+ public int compare(MockContainerEntity o1, MockContainerEntity o2) {
+ return o1.getId().compareTo(o2.getId());
+ }});
+
+ try {
+ for (MockContainerEntity it : entitiesToLockCopy) {
+ it.lock();
+ entitiesLocked.add(it);
+ }
+
+ r.run();
+
+ } finally {
+ for (MockContainerEntity it : entitiesLocked) {
+ it.unlock();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockItemEntity.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockItemEntity.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockItemEntity.java
new file mode 100644
index 0000000..83a54c0
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockItemEntity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.loadbalancing;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+
+import brooklyn.event.basic.Sensors;
+
+import com.google.common.reflect.TypeToken;
+
+@ImplementedBy(MockItemEntityImpl.class)
+public interface MockItemEntity extends Entity, Movable {
+
+ public static final AttributeSensor<Integer> TEST_METRIC = Sensors.newIntegerSensor(
+ "test.metric", "Dummy workrate for test entities");
+
+ @SuppressWarnings("serial")
+ public static final AttributeSensor<Map<Entity, Double>> ITEM_USAGE_METRIC = Sensors.newSensor(
+ new TypeToken<Map<Entity, Double>>() {}, "test.itemUsage.metric", "Dummy item usage for test entities");
+
+ public boolean isStopped();
+
+ public void moveNonEffector(Entity rawDestination);
+
+ public void stop();
+}