You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2016/10/18 12:43:31 UTC
[5/8] brooklyn-server git commit:
InvokeEffectorOnCollectionSensorChange
InvokeEffectorOnCollectionSensorChange
Tracks a collection and invokes effectors when elements are added and
removed.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/414f881a
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/414f881a
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/414f881a
Branch: refs/heads/master
Commit: 414f881a68920f3781aab40f4d0a9f8ad43e2c9a
Parents: 2797e0c
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Tue Oct 18 12:12:05 2016 +0100
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Tue Oct 18 12:12:05 2016 +0100
----------------------------------------------------------------------
.../InvokeEffectorOnCollectionSensorChange.java | 197 ++++++++++++++
...ectorOnCollectionSensorChangeRebindTest.java | 93 +++++++
...okeEffectorOnCollectionSensorChangeTest.java | 270 +++++++++++++++++++
3 files changed, 560 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/414f881a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
new file mode 100644
index 0000000..fad9be6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Subscribes to events on a collection {@link AttributeSensor} and invokes the named
+ * effectors for each element that was added and removed.
+ * <p>
+ * The policy only detects <em>replacements</em> of the collection; it does not act on
+ * modifications. If the sensor has value <code>A</code> and an element is added –
+ * value <code>A'</code> – the on-added effector is not invoked. If the sensor is
+ * later set to <code>B</code> the delta is made between <code>A</code> and <code>B</code>,
+ * not <code>A'</code> and <code>B</code>.
+ * <p>
+ * To simplify the detection of additions and removals the collection is converted to a
+ * {@link Set}. This means that only a single event will fire for duplicate elements in
+ * the collection. Null values for the sensor are considered an empty collection.
+ * <p>
+ * The effectors are provided the elements that changed in their parameter map. If the
+ * sensor is a collection of maps the elements are provided with their keys coerced to
+ * strings and their values unchanged. Otherwise the elements are provided in a
+ * single-entry map keyed by the value for {@link #PARAMETER_NAME}.
+ * <p>
+ * Effectors are asynchronous. If elements are added and removed in quick succession
+ * there are no guarantees that the `onAdded' task will have finished before the
+ * corresponding `onRemoved' task is invoked.
+ */
+public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy implements SensorEventListener<Collection<?>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnCollectionSensorChange.class);
+
+ /** The collection-valued sensor to watch. Mandatory: {@link #setEntity} rejects a null value. */
+ public static final ConfigKey<AttributeSensor<? extends Collection<?>>> TRIGGER_SENSOR = ConfigKeys.newConfigKey(
+ new TypeToken<AttributeSensor<? extends Collection<?>>>() {},
+ "sensor",
+ "Sensor to be monitored.");
+
+ /**
+ * Name of the effector invoked once for each added element. {@link #setEntity} requires
+ * that at least one of this and {@link #ON_REMOVED_EFFECTOR_NAME} names an effector that
+ * is found on the entity.
+ */
+ public static final ConfigKey<String> ON_ADDED_EFFECTOR_NAME = ConfigKeys.newStringConfigKey(
+ "onAdded",
+ "Name of the effector to invoke when entries are added to the collection.");
+
+ /**
+ * Name of the effector invoked once for each removed element. {@link #setEntity} requires
+ * that at least one of this and {@link #ON_ADDED_EFFECTOR_NAME} names an effector that
+ * is found on the entity.
+ */
+ public static final ConfigKey<String> ON_REMOVED_EFFECTOR_NAME = ConfigKeys.newStringConfigKey(
+ "onRemoved",
+ "Name of the effector to invoke when entries are removed from the collection.");
+
+ /**
+ * Key under which an element that is not a map is handed to the effectors.
+ * Defaults to "value"; {@link #setEntity} rejects a blank value.
+ */
+ public static final ConfigKey<String> PARAMETER_NAME = ConfigKeys.newStringConfigKey(
+ "parameterName",
+ "The name of the parameter to supply to the effectors",
+ "value");
+
+ /** The previous version of the set against which events will be compared. */
+ private Set<Object> previous = Collections.emptySet();
+
+ /**
+ * Guards accesses of previous.
+ * NOTE(review): presumably a zero-length array rather than a plain Object so that the
+ * lock is serializable - confirm.
+ */
+ private final Object[] updateLock = new Object[0];
+
+ /**
+ * Validates the policy's configuration, records the sensor's current value as the
+ * baseline for later comparisons and then subscribes to the sensor.
+ *
+ * @param entity the entity the policy is attached to
+ * @throws NullPointerException if {@link #TRIGGER_SENSOR} is not configured
+ * @throws IllegalArgumentException if {@link #PARAMETER_NAME} is blank, or if neither the
+ * on-added nor the on-removed effector is found on the entity
+ */
+ @Override
+ public void setEntity(EntityLocal entity) {
+ /*
+ * A warning to future modifiers of this method: it is called on rebind
+ * so any changes must be compatible with existing persisted state.
+ */
+ super.setEntity(entity);
+ // Resolve the sensor once so that the baseline read and the subscription below
+ // are guaranteed to refer to the same sensor.
+ AttributeSensor<? extends Collection<?>> sensor =
+ checkNotNull(getTriggerSensor(), "Value required for " + TRIGGER_SENSOR.getName());
+
+ checkArgument(Strings.isNonBlank(getConfig(PARAMETER_NAME)), "Value required for " + PARAMETER_NAME.getName());
+
+ // Fail straight away if neither effector is found.
+ if (getEffector(getOnAddedEffector()).isAbsentOrNull() &&
+ getEffector(getOnRemovedEffector()).isAbsentOrNull()) {
+ throw new IllegalArgumentException("Value required for one or both of " + ON_ADDED_EFFECTOR_NAME.getName() +
+ " and " + ON_REMOVED_EFFECTOR_NAME.getName());
+ }
+
+ // Initialise `previous` before subscribing. A null sensor value counts as an empty collection.
+ Collection<?> current = entity.sensors().get(sensor);
+ synchronized (updateLock) {
+ previous = (current != null) ? new HashSet<>(current) : Collections.emptySet();
+ }
+ subscriptions().subscribe(entity, sensor, this);
+ }
+
+ /**
+ * Works out which elements were added to and removed from the collection since the
+ * last recorded value, invokes the matching effector for each of them and then records
+ * the event's value as the new baseline. A null event value is treated as an empty
+ * collection.
+ */
+ @Override
+ public void onEvent(SensorEvent<Collection<?>> event) {
+ final Collection<?> latest = event.getValue();
+ final Set<Object> current;
+ if (latest == null) {
+ current = ImmutableSet.<Object>of();
+ } else {
+ current = new LinkedHashSet<Object>(latest);
+ }
+ final Set<Object> additions = new LinkedHashSet<>();
+ final Set<Object> removals = new LinkedHashSet<>();
+ // Strictly the lock is only needed while the difference is computed. It is held
+ // for longer so that every effector is queued before the next event is handled.
+ synchronized (updateLock) {
+ // copyInto is used instead of immutableCopy because either set may contain `null`.
+ Sets.difference(current, previous).copyInto(additions);
+ Sets.difference(previous, current).copyInto(removals);
+ for (Object element : additions) {
+ onAdded(element);
+ }
+ for (Object element : removals) {
+ onRemoved(element);
+ }
+ previous = Collections.unmodifiableSet(current);
+ }
+ }
+
+ /** Invokes the configured on-added effector for an element that has joined the collection. */
+ private void onAdded(Object addedElement) {
+ final String effectorName = getOnAddedEffector();
+ onEvent(effectorName, addedElement);
+ }
+
+ /** Invokes the configured on-removed effector for an element that has left the collection. */
+ private void onRemoved(Object removedElement) {
+ final String effectorName = getOnRemovedEffector();
+ onEvent(effectorName, removedElement);
+ }
+
+ /**
+ * Invokes the named effector on the entity for a single changed element. Does nothing
+ * if the entity has no effector with that name.
+ *
+ * @param effectorName name of the effector to look up on the entity
+ * @param parameter the changed element. A map is passed as the effector's parameters
+ * with its keys coerced to strings; anything else is wrapped in a single-entry
+ * map keyed by the value of {@link #PARAMETER_NAME}.
+ */
+ private void onEvent(String effectorName, Object parameter) {
+ Maybe<Effector<?>> effector = getEffector(effectorName);
+ if (!effector.isPresentAndNonNull()) {
+ // The effector for this kind of change is not configured or not found: nothing to do.
+ return;
+ }
+
+ final Map<String, Object> parameters;
+ if (parameter instanceof Map) {
+ parameters = MutableMap.of();
+ // Wildcard cast rather than a raw type: the key and value types are unknown here.
+ for (Map.Entry<?, ?> entry : ((Map<?, ?>) parameter).entrySet()) {
+ String key = TypeCoercions.coerce(entry.getKey(), String.class);
+ parameters.put(key, entry.getValue());
+ }
+ } else {
+ parameters = MutableMap.of(getConfig(PARAMETER_NAME), parameter);
+ }
+
+ // Log the resolved effector itself rather than the Maybe wrapper around it.
+ LOG.debug("{} invoking {} on {} with parameters {}", new Object[]{this, effector.get(), entity, parameters});
+ entity.invoke(effector.get(), parameters);
+ }
+
+ /** Looks up an effector by name on the entity's type. */
+ private Maybe<Effector<?>> getEffector(String name) {
+ final Maybe<Effector<?>> match = entity.getEntityType().getEffectorByName(name);
+ return match;
+ }
+
+ /** Returns the configured value of {@link #ON_ADDED_EFFECTOR_NAME}. */
+ private String getOnAddedEffector() {
+ final String effectorName = getConfig(ON_ADDED_EFFECTOR_NAME);
+ return effectorName;
+ }
+
+ /** Returns the configured value of {@link #ON_REMOVED_EFFECTOR_NAME}. */
+ private String getOnRemovedEffector() {
+ final String effectorName = getConfig(ON_REMOVED_EFFECTOR_NAME);
+ return effectorName;
+ }
+
+ /** Returns the configured value of {@link #TRIGGER_SENSOR}. */
+ private AttributeSensor<? extends Collection<?>> getTriggerSensor() {
+ final AttributeSensor<? extends Collection<?>> trigger = getConfig(TRIGGER_SENSOR);
+ return trigger;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/414f881a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeRebindTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeRebindTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeRebindTest.java
new file mode 100644
index 0000000..e4311d6
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeRebindTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.AddEffector;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+public class InvokeEffectorOnCollectionSensorChangeRebindTest extends RebindTestFixtureWithApp {
+
+ /** The collection sensor that the policy under test is configured to monitor. */
+ private static final AttributeSensor<Collection<Integer>> SENSOR = Sensors.newSensor(new TypeToken<Collection<Integer>>() {},
+ "invokeeffectoronsetchangerebindtest.sensor");
+
+ /** Accumulates every value that {@link PublishingEffector} has been invoked with. */
+ private static final AttributeSensor<Collection<Object>> REMOVED_EFFECTOR_VALUES = Sensors.newSensor(new TypeToken<Collection<Object>>() {},
+ "invokeeffectoronsetchangerebindtest.removedvalues");
+
+ /**
+ * Checks that after a rebind the policy still compares new sensor values against the
+ * value the sensor held before the rebind.
+ */
+ @Test
+ public void testEffectorMaintainsPreviousCollectionThroughRebind() throws Exception {
+ final Set<Integer> first = ImmutableSet.of(1, 2);
+ final Set<Integer> second = ImmutableSet.of(2, 3);
+ final Set<Integer> third = ImmutableSet.of(3, 4);
+
+ final PolicySpec<InvokeEffectorOnCollectionSensorChange> policy = PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class)
+ .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, SENSOR)
+ .configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, "on-removed-effector");
+ final AddEffector onRemoved = new AddEffector(Effectors.effector(Void.class, "on-removed-effector")
+ .impl(new PublishingEffector())
+ .build());
+ Entity testEntity = app().createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(policy)
+ .addInitializer(onRemoved));
+
+ // Going from {1, 2} to {2, 3} removes 1.
+ testEntity.sensors().set(SENSOR, first);
+ testEntity.sensors().set(SENSOR, second);
+ EntityAsserts.assertAttributeEqualsEventually(testEntity, REMOVED_EFFECTOR_VALUES, ImmutableSet.<Object>of(1));
+
+ newApp = rebind();
+
+ // Going from {2, 3} to {3, 4} removes 2, but only if the pre-rebind value was retained.
+ testEntity = Iterables.getOnlyElement(newApp.getChildren());
+ testEntity.sensors().set(SENSOR, third);
+ EntityAsserts.assertAttributeEqualsEventually(testEntity, REMOVED_EFFECTOR_VALUES, ImmutableSet.<Object>of(1, 2));
+ }
+
+
+ /**
+ * Effector body that appends the value it was invoked with to
+ * {@code REMOVED_EFFECTOR_VALUES} on the entity it runs against.
+ */
+ private static class PublishingEffector extends EffectorBody<Void> {
+ @Override
+ public Void call(ConfigBag parameters) {
+ // The read-modify-write of the sensor must not interleave between invocations.
+ synchronized (PublishingEffector.class) {
+ final Collection<Object> published = entity().sensors().get(REMOVED_EFFECTOR_VALUES);
+ // Publish a fresh copy instead of mutating the collection that is already the
+ // sensor's value, which other readers may be holding a reference to.
+ final Set<Object> values = (published == null)
+ ? Sets.<Object>newHashSet()
+ : Sets.<Object>newHashSet(published);
+ // The policy is configured without a parameter name, so it uses the default one.
+ final Object v = parameters.getStringKey(
+ InvokeEffectorOnCollectionSensorChange.PARAMETER_NAME.getDefaultValue());
+ values.add(v);
+ entity().sensors().set(REMOVED_EFFECTOR_VALUES, values);
+ return null;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/414f881a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
new file mode 100644
index 0000000..ab72f71
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import static org.apache.brooklyn.policy.InvokeEffectorOnCollectionSensorChange.PARAMETER_NAME;
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.AddEffector;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.CollectionFunctionals;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.reflect.TypeToken;
+
+public class InvokeEffectorOnCollectionSensorChangeTest extends BrooklynAppUnitTestSupport {
+
+ /** Sensor monitored by the policy unless a test supplies its own. */
+ private static final AttributeSensor<Collection<Integer>> DEFAULT_SENSOR = Sensors.newSensor(new TypeToken<Collection<Integer>>() {},
+ "invokeeffectoronsetchangetest.sensor");
+
+ // The fields below are recreated for each test method in setUp.
+ // Parameters of each invocation of the on-added effector.
+ LinkedBlockingQueue<ConfigBag> onAddedParameters;
+ // Parameters of each invocation of the on-removed effector.
+ LinkedBlockingQueue<ConfigBag> onRemovedParameters;
+ Effector<Void> onAddedEffector;
+ Effector<Void> onRemovedEffector;
+ // Entity that carries both effectors and to which the tests attach the policy.
+ TestEntity testEntity;
+
+ /**
+ * Creates fresh recording queues, the two effectors that fill them and a test entity
+ * that offers both effectors.
+ */
+ @Override
+ @BeforeMethod
+ public void setUp() throws Exception {
+ super.setUp();
+
+ // Each queue must exist before the effector that records into it is built.
+ onAddedParameters = new LinkedBlockingQueue<>();
+ onAddedEffector = Effectors.effector(Void.class, "on-added-effector")
+ .impl(new RecordingEffector(onAddedParameters))
+ .build();
+
+ onRemovedParameters = new LinkedBlockingQueue<>();
+ onRemovedEffector = Effectors.effector(Void.class, "on-removed-effector")
+ .impl(new RecordingEffector(onRemovedParameters))
+ .build();
+
+ final EntitySpec<TestEntity> entitySpec = EntitySpec.create(TestEntity.class)
+ .addInitializer(new AddEffector(onAddedEffector))
+ .addInitializer(new AddEffector(onRemovedEffector));
+ testEntity = app.createAndManageChild(entitySpec);
+ }
+
+ /** Publishing a one-element collection must invoke the on-added effector with that element. */
+ @Test
+ public void testOnAddedEffectorCalledWhenItemsAdded() throws Exception {
+ addSetChangePolicy(true, false);
+ final Set<Integer> published = ImmutableSet.of(1);
+ testEntity.sensors().set(DEFAULT_SENSOR, published);
+ final ConfigBag invocation = onAddedParameters.poll(10, TimeUnit.SECONDS);
+ final Object received = invocation.getStringKey(PARAMETER_NAME.getDefaultValue());
+ assertEquals(received, 1);
+ }
+
+ /** Adding an element must not trigger the on-removed effector. */
+ @Test
+ public void testOnRemovedEffectorNotCalledWhenItemsAdded() throws Exception {
+ addSetChangePolicy(false, true);
+ final Set<Integer> published = ImmutableSet.of(1);
+ testEntity.sensors().set(DEFAULT_SENSOR, published);
+ Asserts.continually(
+ CollectionFunctionals.sizeSupplier(onRemovedParameters),
+ Predicates.equalTo(0));
+ }
+
+ /** Dropping an element from the collection must invoke the on-removed effector with it. */
+ @Test
+ public void testOnRemovedEffectorCalledWhenItemRemoved() throws Exception {
+ // The initial value is set before the policy is added so that it becomes the baseline.
+ testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(1, 2));
+ addSetChangePolicy(false, true);
+ final Set<Integer> remaining = ImmutableSet.of(1);
+ testEntity.sensors().set(DEFAULT_SENSOR, remaining);
+ final ConfigBag invocation = onRemovedParameters.poll(10, TimeUnit.SECONDS);
+ final Object received = invocation.getStringKey(PARAMETER_NAME.getDefaultValue());
+ assertEquals(received, 2);
+ }
+
+ /** Removing an element must not trigger the on-added effector. */
+ @Test
+ public void testOnAddedEffectorNotCalledWhenItemRemoved() throws Exception {
+ testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(1));
+ addSetChangePolicy(true, false);
+ testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.<Integer>of());
+ // Only the on-added effector is configured, so its queue is the one that must stay
+ // empty. The on-removed queue could never be filled and would make the check vacuous.
+ Asserts.continually(CollectionFunctionals.sizeSupplier(onAddedParameters), Predicates.equalTo(0));
+ }
+
+ /** A single replacement that both adds and removes elements must fire both effectors. */
+ @Test
+ public void testSeveralItemsAddedAndRemovedAtOnce() throws Exception {
+ final Set<Integer> before = ImmutableSet.of(1, 2, 3);
+ final Set<Integer> after = ImmutableSet.of(3, 4, 5);
+ // Going from `before` to `after` removes 1 and 2 and adds 4 and 5.
+ final Set<Object> expectedRemoved = ImmutableSet.<Object>of(1, 2);
+ final Set<Object> expectedAdded = ImmutableSet.<Object>of(4, 5);
+
+ testEntity.sensors().set(DEFAULT_SENSOR, before);
+ addSetChangePolicy(true, true);
+ testEntity.sensors().set(DEFAULT_SENSOR, after);
+
+ Asserts.eventually(new ConfigBagValueKeySupplier(onRemovedParameters),
+ Predicates.<Collection<Object>>equalTo(expectedRemoved));
+ Asserts.eventually(new ConfigBagValueKeySupplier(onAddedParameters),
+ Predicates.<Collection<Object>>equalTo(expectedAdded));
+ }
+
+ /** Republishing an identical collection must invoke neither effector. */
+ @Test
+ public void testNothingHappensWhenSensorRepublishedUnchanged() {
+ final ImmutableSet<Integer> unchanged = ImmutableSet.of(1, 2, 3);
+ testEntity.sensors().set(DEFAULT_SENSOR, unchanged);
+ addSetChangePolicy(true, true);
+ testEntity.sensors().set(DEFAULT_SENSOR, unchanged);
+ // Both queues must stay empty.
+ Asserts.continually(
+ CollectionFunctionals.sizeSupplier(onAddedParameters),
+ Predicates.equalTo(0));
+ Asserts.continually(
+ CollectionFunctionals.sizeSupplier(onRemovedParameters),
+ Predicates.equalTo(0));
+ }
+
+ /** Duplicate elements in the published collection must produce only one event each. */
+ @Test
+ public void testCollectionsAreConvertedToSets() {
+ final List<Integer> withDuplicates = ImmutableList.of(
+ 1, 1,
+ 2, 3, 4, 5,
+ 2, 3, 4, 5);
+ final List<Integer> replacement = ImmutableList.of(6, 5, 4, 3, 3);
+
+ addSetChangePolicy(true, true);
+
+ // First publication: every distinct element is new and nothing is removed.
+ testEntity.sensors().set(DEFAULT_SENSOR, withDuplicates);
+ final Set<Object> distinctElements = ImmutableSet.<Object>of(1, 2, 3, 4, 5);
+ Asserts.eventually(new ConfigBagValueKeySupplier(onAddedParameters),
+ Predicates.<Collection<Object>>equalTo(distinctElements));
+ Asserts.continually(CollectionFunctionals.sizeSupplier(onRemovedParameters), Predicates.equalTo(0));
+
+ onAddedParameters.clear();
+
+ // Second publication: 6 is new while 1 and 2 have gone.
+ testEntity.sensors().set(DEFAULT_SENSOR, replacement);
+ final Set<Object> newlyAdded = ImmutableSet.<Object>of(6);
+ final Set<Object> newlyRemoved = ImmutableSet.<Object>of(1, 2);
+ Asserts.eventually(new ConfigBagValueKeySupplier(onAddedParameters),
+ Predicates.<Collection<Object>>equalTo(newlyAdded));
+ Asserts.eventually(new ConfigBagValueKeySupplier(onRemovedParameters),
+ Predicates.<Collection<Object>>equalTo(newlyRemoved));
+ }
+
+ /** When the collection's elements are maps, each map must be passed as the effector's parameters. */
+ @Test
+ public void testMapValueUsedAsArgumentDirectly() {
+ AttributeSensor<Collection<Map<String, String>>> mapSensor = Sensors.newSensor(new TypeToken<Collection<Map<String, String>>>() {},
+ "testMapValueUsedAsArgumentDirectly");
+ final Set<Map<String, String>> before = ImmutableSet.<Map<String, String>>of(
+ ImmutableMap.of("a", "1"),
+ ImmutableMap.of("b", "2"));
+ final Set<Map<String, String>> after = ImmutableSet.<Map<String, String>>of(
+ ImmutableMap.of("b", "2"),
+ ImmutableMap.of("c", "3"),
+ ImmutableMap.of("d", "4"));
+ // Going from `before` to `after` adds the "c" and "d" maps and removes the "a" map.
+ final Set<Map<String, Object>> expectedAdded = ImmutableSet.<Map<String, Object>>of(
+ ImmutableMap.<String, Object>of("c", "3"),
+ ImmutableMap.<String, Object>of("d", "4"));
+ final Set<Map<String, Object>> expectedRemoved = ImmutableSet.<Map<String, Object>>of(
+ ImmutableMap.<String, Object>of("a", "1"));
+
+ testEntity.sensors().set(mapSensor, before);
+ addSetChangePolicy(mapSensor, true, true);
+
+ testEntity.sensors().set(mapSensor, after);
+ Asserts.eventually(new ConfigBagMapSupplier(onAddedParameters),
+ Predicates.<Collection<Map<String, Object>>>equalTo(expectedAdded));
+ Asserts.eventually(new ConfigBagMapSupplier(onRemovedParameters),
+ Predicates.<Collection<Map<String, Object>>>equalTo(expectedRemoved));
+
+ }
+
+ /** A policy configured with neither effector name must be rejected. */
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testErrorIfNeitherOnAddedNorOnRemovedAreSet() {
+ final boolean includeOnAdded = false;
+ final boolean includeOnRemoved = false;
+ addSetChangePolicy(includeOnAdded, includeOnRemoved);
+ }
+
+ /** A policy configured without a sensor to monitor must be rejected. */
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testErrorIfTriggerSensorNotSet() {
+ final PolicySpec<InvokeEffectorOnCollectionSensorChange> specWithoutSensor = PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class)
+ .configure(InvokeEffectorOnCollectionSensorChange.ON_ADDED_EFFECTOR_NAME, onAddedEffector.getName())
+ .configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, onRemovedEffector.getName());
+ testEntity.policies().add(specWithoutSensor);
+ }
+
+ /** Adds the policy to the test entity, monitoring {@code DEFAULT_SENSOR}. */
+ private void addSetChangePolicy(boolean includeOnAdded, boolean includeOnRemoved) {
+ final AttributeSensor<? extends Collection<?>> monitored = DEFAULT_SENSOR;
+ addSetChangePolicy(monitored, includeOnAdded, includeOnRemoved);
+ }
+
+ /**
+ * Adds the policy to the test entity.
+ *
+ * @param sensor the sensor the policy should monitor
+ * @param includeOnAdded whether to configure the on-added effector's name
+ * @param includeOnRemoved whether to configure the on-removed effector's name
+ */
+ private void addSetChangePolicy(AttributeSensor<? extends Collection<?>> sensor, boolean includeOnAdded, boolean includeOnRemoved) {
+ final PolicySpec<InvokeEffectorOnCollectionSensorChange> spec =
+ PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class);
+ spec.configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, sensor);
+ if (includeOnAdded) {
+ final String addedName = onAddedEffector.getName();
+ spec.configure(InvokeEffectorOnCollectionSensorChange.ON_ADDED_EFFECTOR_NAME, addedName);
+ }
+ if (includeOnRemoved) {
+ final String removedName = onRemovedEffector.getName();
+ spec.configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, removedName);
+ }
+ testEntity.policies().add(spec);
+ }
+
+ /** Effector body that stores the parameters of every invocation in a caller-supplied collection. */
+ private static class RecordingEffector extends EffectorBody<Void> {
+ final Collection<ConfigBag> callParameters;
+
+ private RecordingEffector(Collection<ConfigBag> sink) {
+ callParameters = sink;
+ }
+
+ @Override
+ public Void call(ConfigBag invocationParameters) {
+ this.callParameters.add(invocationParameters);
+ return null;
+ }
+ }
+
+ /**
+ * Supplies the set of values stored under the default parameter name in each of the
+ * given config bags.
+ */
+ private static class ConfigBagValueKeySupplier implements Supplier<Collection<Object>> {
+ private final Collection<ConfigBag> collection;
+
+ private ConfigBagValueKeySupplier(Collection<ConfigBag> bags) {
+ collection = bags;
+ }
+
+ @Override
+ public Collection<Object> get() {
+ final String key = PARAMETER_NAME.getDefaultValue();
+ final Set<Object> seen = new HashSet<>();
+ for (ConfigBag invocation : collection) {
+ final Object value = invocation.getStringKey(key);
+ seen.add(value);
+ }
+ return seen;
+ }
+ }
+
+ /** Supplies the set of raw config maps held by each of the given config bags. */
+ private static class ConfigBagMapSupplier implements Supplier<Collection<Map<String, Object>>> {
+ private final Collection<ConfigBag> collection;
+
+ private ConfigBagMapSupplier(Collection<ConfigBag> bags) {
+ collection = bags;
+ }
+
+ @Override
+ public Collection<Map<String, Object>> get() {
+ final int expectedSize = collection.size();
+ final Set<Map<String, Object>> rawConfigs = new HashSet<>(expectedSize);
+ for (ConfigBag invocation : collection) {
+ final Map<String, Object> raw = invocation.getAllConfigRaw();
+ rawConfigs.add(raw);
+ }
+ return rawConfigs;
+ }
+ }
+
+}
\ No newline at end of file