You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2016/10/18 12:43:31 UTC

[5/8] brooklyn-server git commit: InvokeEffectorOnCollectionSensorChange

InvokeEffectorOnCollectionSensorChange

Tracks a collection and invokes effectors when elements are added and
removed.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/414f881a
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/414f881a
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/414f881a

Branch: refs/heads/master
Commit: 414f881a68920f3781aab40f4d0a9f8ad43e2c9a
Parents: 2797e0c
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Tue Oct 18 12:12:05 2016 +0100
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Tue Oct 18 12:12:05 2016 +0100

----------------------------------------------------------------------
 .../InvokeEffectorOnCollectionSensorChange.java | 197 ++++++++++++++
 ...ectorOnCollectionSensorChangeRebindTest.java |  93 +++++++
 ...okeEffectorOnCollectionSensorChangeTest.java | 270 +++++++++++++++++++
 3 files changed, 560 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/414f881a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
new file mode 100644
index 0000000..fad9be6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Subscribes to events on a collection {@link AttributeSensor} and invokes the named
+ * effectors for each element that was added and removed.
+ * <p>
+ * The policy only detects <em>replacements</em> of the collection; it does not act on
+ * modifications. If the sensor has value <code>A</code> and an element is added &ndash;
+ * value <code>A'</code> &ndash; the on-added effector is not invoked. If the sensor is
+ * later set to <code>B</code> the delta is made between <code>A</code> and <code>B</code>,
+ * not <code>A'</code> and <code>B</code>.
+ * <p>
+ * To simplify the detection of additions and removals the collection is converted to a
+ * {@link Set}. This means that only a single event will fire for duplicate elements in
+ * the collection. Null values for the sensor are considered an empty collection.
+ * <p>
+ * The effectors are provided the elements that changed in their parameter map. If the
+ * sensor is a collection of maps the elements are provided with their keys coerced to
+ * strings and their values unchanged. Otherwise the elements are provided in a
+ * single-entry map keyed by the value for {@link #PARAMETER_NAME}.
+ * <p>
+ * Effectors are asynchronous. If elements are added and removed in quick succession
+ * there are no guarantees that the {@code onAdded} task will have finished before the
+ * corresponding {@code onRemoved} task is invoked.
+ */
+public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy implements SensorEventListener<Collection<?>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnCollectionSensorChange.class);
+
+    public static final ConfigKey<AttributeSensor<? extends Collection<?>>> TRIGGER_SENSOR = ConfigKeys.newConfigKey(
+            new TypeToken<AttributeSensor<? extends Collection<?>>>() {},
+            "sensor",
+            "Sensor to be monitored.");
+
+    public static final ConfigKey<String> ON_ADDED_EFFECTOR_NAME = ConfigKeys.newStringConfigKey(
+            "onAdded",
+            "Name of the effector to invoke when entries are added to the collection.");
+
+    public static final ConfigKey<String> ON_REMOVED_EFFECTOR_NAME = ConfigKeys.newStringConfigKey(
+            "onRemoved",
+            "Name of the effector to invoke when entries are removed from the collection.");
+
+    public static final ConfigKey<String> PARAMETER_NAME = ConfigKeys.newStringConfigKey(
+            "parameterName",
+            "The name of the parameter to supply to the effectors",
+            "value");
+
+    /** The previous version of the set against which events will be compared. */
+    private Set<Object> previous = Collections.emptySet();
+
+    /** Guards accesses of previous. */
+    private final Object[] updateLock = new Object[0];
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        /*
+         * A warning to future modifiers of this method: it is called on rebind
+         * so any changes must be compatible with existing persisted state.
+         */
+        super.setEntity(entity);
+        Sensor<? extends Collection<?>> sensor =
+                checkNotNull(getConfig(TRIGGER_SENSOR), "Value required for " + TRIGGER_SENSOR.getName());
+
+        checkArgument(Strings.isNonBlank(getConfig(PARAMETER_NAME)), "Value required for " + PARAMETER_NAME.getName());
+
+        // Fail straight away if neither effector can be found on the entity's type.
+        if (getEffector(getOnAddedEffector()).isAbsentOrNull() &&
+                getEffector(getOnRemovedEffector()).isAbsentOrNull()) {
+            throw new IllegalArgumentException("Value required for one or both of " + ON_ADDED_EFFECTOR_NAME.getName() +
+                    " and " + ON_REMOVED_EFFECTOR_NAME.getName());
+        }
+
+        // Initialise `previous` from the sensor's current value before subscribing.
+        Collection<?> current = entity.sensors().get(getTriggerSensor());
+        synchronized (updateLock) {
+            previous = (current != null) ? new HashSet<>(current) : Collections.emptySet();
+        }
+        subscriptions().subscribe(entity, sensor, this);
+    }
+
+    @Override
+    public void onEvent(SensorEvent<Collection<?>> event) {
+        final Set<Object> newValue = event.getValue() != null
+                ? new LinkedHashSet<>(event.getValue())
+                : ImmutableSet.of();
+        final Set<Object> added = new LinkedHashSet<>(), removed = new LinkedHashSet<>();
+        // Strictly, updateLock is only needed while calculating the difference, but
+        // holding it for the whole update guarantees that all the effectors for this
+        // event are queued before the next event is handled.
+        synchronized (updateLock) {
+            // Not using .immutableCopy() in case either set contains `null`.
+            Sets.difference(newValue, previous).copyInto(added);
+            Sets.difference(previous, newValue).copyInto(removed);
+            for (Object o : added) {
+                onAdded(o);
+            }
+            for (Object o : removed) {
+                onRemoved(o);
+            }
+            this.previous = Collections.unmodifiableSet(newValue);
+        }
+    }
+
+    private void onAdded(Object newElement) {
+        onEvent(getOnAddedEffector(), newElement);
+    }
+
+    private void onRemoved(Object removedElement) {
+        onEvent(getOnRemovedEffector(), removedElement); // no-op if no on-removed effector is found
+    }
+
+    private void onEvent(String effectorName, Object parameter) {
+        Maybe<Effector<?>> effector = getEffector(effectorName);
+        if (effector.isPresentAndNonNull()) {
+            final Map<String, Object> parameters;
+            if (parameter instanceof Map) {
+                Map<?, ?> param = (Map<?, ?>) parameter;
+                parameters = MutableMap.of();
+                for (Map.Entry<?, ?> entry : param.entrySet()) {
+                    String key = TypeCoercions.coerce(entry.getKey(), String.class);
+                    parameters.put(key, entry.getValue());
+                }
+            } else {
+                parameters = MutableMap.of(getConfig(PARAMETER_NAME), parameter);
+            }
+            // The value returned by invoke is discarded; this method does not wait on it.
+            LOG.debug("{} invoking {} on {} with parameters {}", new Object[]{this, effector.get(), entity, parameters});
+            entity.invoke(effector.get(), parameters);
+        }
+    }
+
+    private Maybe<Effector<?>> getEffector(String name) {
+        return entity.getEntityType().getEffectorByName(name);
+    }
+
+    private String getOnAddedEffector() {
+        return getConfig(ON_ADDED_EFFECTOR_NAME);
+    }
+
+    private String getOnRemovedEffector() {
+        return getConfig(ON_REMOVED_EFFECTOR_NAME);
+    }
+
+    private AttributeSensor<? extends Collection<?>> getTriggerSensor() {
+        return getConfig(TRIGGER_SENSOR);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/414f881a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeRebindTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeRebindTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeRebindTest.java
new file mode 100644
index 0000000..e4311d6
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeRebindTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.AddEffector;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+public class InvokeEffectorOnCollectionSensorChangeRebindTest extends RebindTestFixtureWithApp {
+
+    private static final AttributeSensor<Collection<Integer>> SENSOR = Sensors.newSensor(new TypeToken<Collection<Integer>>() {},
+            "invokeeffectoronsetchangerebindtest.sensor");
+
+    private static final AttributeSensor<Collection<Object>> REMOVED_EFFECTOR_VALUES = Sensors.newSensor(new TypeToken<Collection<Object>>() {},
+            "invokeeffectoronsetchangerebindtest.removedvalues");
+
+    @Test
+    public void testEffectorMaintainsPreviousCollectionThroughRebind() throws Exception {
+        final Set<Integer> input1 = ImmutableSet.of(1, 2);
+        final Set<Integer> input2 = ImmutableSet.of(2, 3);
+        final Set<Integer> input3 = ImmutableSet.of(3, 4);
+
+        Entity testEntity = app().createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class)
+                        .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, SENSOR)
+                        .configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, "on-removed-effector"))
+                .addInitializer(new AddEffector(Effectors.effector(Void.class, "on-removed-effector")
+                        .impl(new PublishingEffector())
+                        .build())));
+        testEntity.sensors().set(SENSOR, input1);
+        testEntity.sensors().set(SENSOR, input2);
+        EntityAsserts.assertAttributeEqualsEventually(testEntity, REMOVED_EFFECTOR_VALUES, ImmutableSet.<Object>of(1));
+
+        newApp = rebind();
+
+        testEntity = Iterables.getOnlyElement(newApp.getChildren());
+        testEntity.sensors().set(SENSOR, input3);
+        EntityAsserts.assertAttributeEqualsEventually(testEntity, REMOVED_EFFECTOR_VALUES, ImmutableSet.<Object>of(1, 2));
+    }
+
+
+    private static class PublishingEffector extends EffectorBody<Void> {
+        @Override
+        public Void call(ConfigBag parameters) {
+            synchronized (PublishingEffector.class) {
+                Collection<Object> values = entity().sensors().get(REMOVED_EFFECTOR_VALUES);
+                if (values == null) {
+                    values = Sets.newHashSet();
+                }
+                final Object v = parameters.getStringKey("value");
+                values.add(v);
+                entity().sensors().set(REMOVED_EFFECTOR_VALUES, values);
+                return null;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/414f881a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
new file mode 100644
index 0000000..ab72f71
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import static org.apache.brooklyn.policy.InvokeEffectorOnCollectionSensorChange.PARAMETER_NAME;
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.AddEffector;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.CollectionFunctionals;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.reflect.TypeToken;
+
+public class InvokeEffectorOnCollectionSensorChangeTest extends BrooklynAppUnitTestSupport {
+
+    private static final AttributeSensor<Collection<Integer>> DEFAULT_SENSOR = Sensors.newSensor(new TypeToken<Collection<Integer>>() {},
+            "invokeeffectoronsetchangetest.sensor");
+
+    LinkedBlockingQueue<ConfigBag> onAddedParameters;
+    LinkedBlockingQueue<ConfigBag> onRemovedParameters;
+    Effector<Void> onAddedEffector;
+    Effector<Void> onRemovedEffector;
+    TestEntity testEntity;
+
+    @Override
+    @BeforeMethod
+    public void setUp() throws Exception {
+        super.setUp();
+        onAddedParameters = new LinkedBlockingQueue<>();
+        onRemovedParameters = new LinkedBlockingQueue<>();
+        onAddedEffector = Effectors.effector(Void.class, "on-added-effector")
+                .impl(new RecordingEffector(onAddedParameters))
+                .build();
+        onRemovedEffector = Effectors.effector(Void.class, "on-removed-effector")
+                .impl(new RecordingEffector(onRemovedParameters))
+                .build();
+        testEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .addInitializer(new AddEffector(onAddedEffector))
+                .addInitializer(new AddEffector(onRemovedEffector)));
+    }
+
+    @Test
+    public void testOnAddedEffectorCalledWhenItemsAdded() throws Exception {
+        addSetChangePolicy(true, false);
+        final Set<Integer> values = ImmutableSet.of(1);
+        testEntity.sensors().set(DEFAULT_SENSOR, values);
+        ConfigBag params = onAddedParameters.poll(10, TimeUnit.SECONDS);
+        assertEquals(params.getStringKey(PARAMETER_NAME.getDefaultValue()), 1);
+    }
+
+    @Test
+    public void testOnRemovedEffectorNotCalledWhenItemsAdded() throws Exception {
+        addSetChangePolicy(false, true);
+        final Set<Integer> values = ImmutableSet.of(1);
+        testEntity.sensors().set(DEFAULT_SENSOR, values);
+        Asserts.continually(CollectionFunctionals.sizeSupplier(onRemovedParameters), Predicates.equalTo(0));
+    }
+
+    @Test
+    public void testOnRemovedEffectorCalledWhenItemRemoved() throws Exception {
+        testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(1, 2));
+        addSetChangePolicy(false, true);
+        final Set<Integer> values = ImmutableSet.of(1);
+        testEntity.sensors().set(DEFAULT_SENSOR, values);
+        ConfigBag params = onRemovedParameters.poll(10, TimeUnit.SECONDS);
+        assertEquals(params.getStringKey(PARAMETER_NAME.getDefaultValue()), 2);
+    }
+
+    @Test
+    public void testOnAddedEffectorNotCalledWhenItemRemoved() throws Exception {
+        testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(1));
+        addSetChangePolicy(true, false); // Only the on-added effector is configured.
+        testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.<Integer>of());
+        Asserts.continually(CollectionFunctionals.sizeSupplier(onAddedParameters), Predicates.equalTo(0));
+    }
+
+    @Test
+    public void testSeveralItemsAddedAndRemovedAtOnce() throws Exception {
+        testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(1, 2, 3));
+        addSetChangePolicy(true, true);
+        testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(3, 4, 5));
+
+        // 1 and 2 were removed, 4 and 5 were added.
+        Asserts.eventually(new ConfigBagValueKeySupplier(onRemovedParameters),
+                Predicates.<Collection<Object>>equalTo(ImmutableSet.<Object>of(1, 2)));
+        Asserts.eventually(new ConfigBagValueKeySupplier(onAddedParameters),
+                Predicates.<Collection<Object>>equalTo(ImmutableSet.<Object>of(4, 5)));
+    }
+
+    @Test
+    public void testNothingHappensWhenSensorRepublishedUnchanged() {
+        final ImmutableSet<Integer> input1 = ImmutableSet.of(1, 2, 3);
+        testEntity.sensors().set(DEFAULT_SENSOR, input1);
+        addSetChangePolicy(true, true);
+        testEntity.sensors().set(DEFAULT_SENSOR, input1);
+        // Neither effector should be invoked.
+        Asserts.continually(CollectionFunctionals.sizeSupplier(onAddedParameters), Predicates.equalTo(0));
+        Asserts.continually(CollectionFunctionals.sizeSupplier(onRemovedParameters), Predicates.equalTo(0));
+    }
+
+    @Test
+    public void testCollectionsAreConvertedToSets() {
+        final List<Integer> input1 = ImmutableList.of(
+                1, 1,
+                2, 3, 4, 5,
+                2, 3, 4, 5);
+        final List<Integer> input2 = ImmutableList.of(6, 5, 4, 3, 3);
+
+        addSetChangePolicy(true, true);
+
+        testEntity.sensors().set(DEFAULT_SENSOR, input1);
+        Asserts.eventually(new ConfigBagValueKeySupplier(onAddedParameters),
+                Predicates.<Collection<Object>>equalTo(ImmutableSet.<Object>of(1, 2, 3, 4, 5)));
+        Asserts.continually(CollectionFunctionals.sizeSupplier(onRemovedParameters), Predicates.equalTo(0));
+
+        onAddedParameters.clear();
+
+        testEntity.sensors().set(DEFAULT_SENSOR, input2);
+        Asserts.eventually(new ConfigBagValueKeySupplier(onAddedParameters),
+                Predicates.<Collection<Object>>equalTo(ImmutableSet.<Object>of(6)));
+        Asserts.eventually(new ConfigBagValueKeySupplier(onRemovedParameters),
+                Predicates.<Collection<Object>>equalTo(ImmutableSet.<Object>of(1, 2)));
+    }
+
+    @Test
+    public void testMapValueUsedAsArgumentDirectly() {
+        AttributeSensor<Collection<Map<String, String>>> sensor = Sensors.newSensor(new TypeToken<Collection<Map<String, String>>>() {},
+                "testMapValueUsedAsArgumentDirectly");
+        final Set<Map<String, String>> input1 = ImmutableSet.<Map<String, String>>of(
+                ImmutableMap.of("a", "1"),
+                ImmutableMap.of("b", "2"));
+        final Set<Map<String, String>> input2 = ImmutableSet.<Map<String, String>>of(
+                ImmutableMap.of("b", "2"),
+                ImmutableMap.of("c", "3"),
+                ImmutableMap.of("d", "4"));
+
+        testEntity.sensors().set(sensor, input1);
+        addSetChangePolicy(sensor, true, true);
+
+        testEntity.sensors().set(sensor, input2);
+        Asserts.eventually(new ConfigBagMapSupplier(onAddedParameters),
+                Predicates.<Collection<Map<String, Object>>>equalTo(ImmutableSet.<Map<String, Object>>of(
+                        ImmutableMap.<String, Object>of("c", "3"),
+                        ImmutableMap.<String, Object>of("d", "4"))));
+        Asserts.eventually(new ConfigBagMapSupplier(onRemovedParameters),
+                Predicates.<Collection<Map<String, Object>>>equalTo(ImmutableSet.<Map<String, Object>>of(
+                        ImmutableMap.<String, Object>of("a", "1"))));
+
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testErrorIfNeitherOnAddedNorOnRemovedAreSet() {
+        addSetChangePolicy(false, false);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testErrorIfTriggerSensorNotSet() {
+        testEntity.policies().add(PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class)
+                .configure(InvokeEffectorOnCollectionSensorChange.ON_ADDED_EFFECTOR_NAME, onAddedEffector.getName())
+                .configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, onRemovedEffector.getName()));
+    }
+
+    private void addSetChangePolicy(boolean includeOnAdded, boolean includeOnRemoved) {
+        addSetChangePolicy(DEFAULT_SENSOR, includeOnAdded, includeOnRemoved);
+    }
+
+    private void addSetChangePolicy(AttributeSensor<? extends Collection<?>> sensor, boolean includeOnAdded, boolean includeOnRemoved) {
+        PolicySpec<InvokeEffectorOnCollectionSensorChange> policySpec = PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class)
+                .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, sensor);
+        if (includeOnAdded) {
+            policySpec.configure(InvokeEffectorOnCollectionSensorChange.ON_ADDED_EFFECTOR_NAME, onAddedEffector.getName());
+        }
+        if (includeOnRemoved) {
+            policySpec.configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, onRemovedEffector.getName());
+        }
+        testEntity.policies().add(policySpec);
+    }
+
+    private static class RecordingEffector extends EffectorBody<Void> {
+        final Collection<ConfigBag> callParameters;
+
+        private RecordingEffector(Collection<ConfigBag> callParameters) {
+            this.callParameters = callParameters;
+        }
+
+        @Override
+        public Void call(ConfigBag config) {
+            callParameters.add(config);
+            return null;
+        }
+    }
+
+    private static class ConfigBagValueKeySupplier implements Supplier<Collection<Object>> {
+        private final Collection<ConfigBag> collection;
+
+        private ConfigBagValueKeySupplier(Collection<ConfigBag> collection) {
+            this.collection = collection;
+        }
+
+        @Override
+        public Collection<Object> get() {
+            Set<Object> set = new HashSet<>();
+            for (ConfigBag bag : collection) {
+                set.add(bag.getStringKey(PARAMETER_NAME.getDefaultValue()));
+            }
+            return set;
+        }
+    }
+
+    private static class ConfigBagMapSupplier implements Supplier<Collection<Map<String, Object>>> {
+        private final Collection<ConfigBag> collection;
+
+
+        private ConfigBagMapSupplier(Collection<ConfigBag> collection) {
+            this.collection = collection;
+        }
+
+        @Override
+        public Collection<Map<String, Object>> get() {
+            Set<Map<String, Object>> values = new HashSet<>(collection.size());
+            for (ConfigBag bag : collection) {
+                values.add(bag.getAllConfigRaw());
+            }
+            return values;
+        }
+    }
+
+}
\ No newline at end of file