You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sv...@apache.org on 2016/10/31 11:11:08 UTC

[2/3] brooklyn-server git commit: Add AbstractInvokeEffectorPolicy

Add AbstractInvokeEffectorPolicy

Supports tracking the number of effector tasks that are ongoing and
publishing that as an "is busy" sensor on the entity the policy is
attached to.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/a413cedf
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/a413cedf
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/a413cedf

Branch: refs/heads/master
Commit: a413cedf5cffec87a343ee3041b37db538d6a03a
Parents: 92da714
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Fri Oct 28 11:52:55 2016 +0100
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Fri Oct 28 11:52:55 2016 +0100

----------------------------------------------------------------------
 .../policy/AbstractInvokeEffectorPolicy.java    | 145 +++++++++++++++++++
 .../InvokeEffectorOnCollectionSensorChange.java |  14 +-
 .../policy/InvokeEffectorOnSensorChange.java    |  54 +++++--
 .../AbstractInvokeEffectorPolicyTest.java       | 105 ++++++++++++++
 ...okeEffectorOnCollectionSensorChangeTest.java |  25 +++-
 ...keEffectorOnSensorChangeIntegrationTest.java |  87 +++++++++++
 6 files changed, 415 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java b/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java
new file mode 100644
index 0000000..d0b5e67
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
+
+    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
+            "isBusySensor",
+            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
+                    "If unset running tasks will not be tracked.");
+
+    private final AtomicInteger taskCounter = new AtomicInteger();
+
+    /**
+     * Indicates that onEvent was notified of an value of is not the latest sensor value.
+     */
+    private boolean moreUpdatesComing;
+    /**
+     * The timestamp of the event that informed moreUpdatesComing. Subsequent notifications
+     * of earlier events will not cause updates of moreUpdatesComing.
+     */
+    private long mostRecentUpdate = 0;
+    /**
+     * Guards {@link #moreUpdatesComing} and {@link #mostRecentUpdate}.
+     */
+    private final Object[] moreUpdatesLock = new Object[0];
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        if (isBusySensorEnabled()) {
+            // Republishes when the entity rebinds.
+            publishIsBusy();
+        }
+    }
+
+    /**
+     * Invoke effector with parameters on the entity that the policy is attached to.
+     */
+    protected <T> Task<T> invoke(Effector<T> effector, Map<String, ?> parameters) {
+        if (isBusySensorEnabled()) {
+            getTaskCounter().incrementAndGet();
+            publishIsBusy();
+        }
+        Task<T> task = entity.invoke(effector, parameters);
+        if (isBusySensorEnabled()) {
+            task.addListener(new EffectorListener(), MoreExecutors.sameThreadExecutor());
+        }
+        return task;
+    }
+
+    protected boolean isBusy() {
+        synchronized (moreUpdatesLock) {
+            return getTaskCounter().get() != 0 || moreUpdatesComing;
+        }
+    }
+
+    protected boolean isBusySensorEnabled() {
+        return Strings.isNonBlank(getIsBusySensorName());
+    }
+
+    protected Maybe<Effector<?>> getEffectorNamed(String name) {
+        return entity.getEntityType().getEffectorByName(name);
+    }
+
+    @Nonnull
+    protected String getIsBusySensorName() {
+        return getConfig(IS_BUSY_SENSOR_NAME);
+    }
+
+    /**
+     * Indicates that when the policy was notified of eventValue, occurring at time
+     * eventTimestamp, it observed the current sensor value to be current. This
+     * informs the value for {@link #moreUpdatesComing}.
+     */
+    protected <T> void setMoreUpdatesComing(long eventTimestamp, T eventValue, T current) {
+        if (eventTimestamp >= mostRecentUpdate) {
+            synchronized (moreUpdatesLock) {
+                if (eventTimestamp >= mostRecentUpdate) {
+                    moreUpdatesComing = !Objects.equal(eventValue, current);
+                    mostRecentUpdate = eventTimestamp;
+                }
+            }
+        }
+    }
+
+    private AtomicInteger getTaskCounter() {
+        return taskCounter;
+    }
+
+    private void publishIsBusy() {
+        final boolean busy = isBusy();
+        LOG.trace("{} taskCount={}, isBusy={}", new Object[]{this, getTaskCounter().get(), busy});
+        AttributeSensor<Boolean> sensor = Sensors.newBooleanSensor(getIsBusySensorName());
+        entity.sensors().set(sensor, busy);
+    }
+
+    private class EffectorListener implements Runnable {
+        @Override
+        public void run() {
+            getTaskCounter().decrementAndGet();
+            publishIsBusy();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
index fad9be6..352d40d 100644
--- a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
+++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
@@ -37,7 +37,6 @@ import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.guava.Maybe;
@@ -72,7 +71,7 @@ import com.google.common.reflect.TypeToken;
  * there are no guarantees that the `onAdded' task will have finished before the
  * corresponding `onRemoved' task is invoked.
  */
-public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy implements SensorEventListener<Collection<?>> {
+public class InvokeEffectorOnCollectionSensorChange extends AbstractInvokeEffectorPolicy implements SensorEventListener<Collection<?>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnCollectionSensorChange.class);
 
@@ -132,6 +131,15 @@ public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy imple
         final Set<Object> newValue = event.getValue() != null
                 ? new LinkedHashSet<>(event.getValue())
                 : ImmutableSet.of();
+        if (isBusySensorEnabled()) {
+            // There are more events coming that this policy hasn't been notified of if the
+            // value received in the event does not match the current value of the sensor.
+            final Collection<?> sensorVal = entity.sensors().get(getTriggerSensor());
+            final Set<Object> sensorValSet = sensorVal != null
+                ? new LinkedHashSet<>(sensorVal)
+                : ImmutableSet.of();
+            setMoreUpdatesComing(event.getTimestamp(), newValue, sensorValSet);
+        }
         final Set<Object> added = new LinkedHashSet<>(), removed = new LinkedHashSet<>();
         // It's only necessary to hold updateLock just to calculate the difference but
         // it is useful to guarantee that all the effectors are queued before the next
@@ -174,7 +182,7 @@ public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy imple
             }
 
             LOG.debug("{} invoking {} on {} with parameters {}", new Object[]{this, effector, entity, parameters});
-            entity.invoke(effector.get(), parameters);
+            invoke(effector.get(), parameters);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
index ebb6da8..662fd1d 100644
--- a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
+++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
@@ -18,19 +18,23 @@
  */
 package org.apache.brooklyn.policy;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.text.StringPredicates;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
 
 /**
  * Invokes the given effector when the policy changes.
@@ -40,28 +44,56 @@ import com.google.common.base.Preconditions;
  * * support conditions
  * * allow to be triggered by sensors on members
  */
-public class InvokeEffectorOnSensorChange extends AbstractPolicy implements SensorEventListener<Object> {
+public class InvokeEffectorOnSensorChange extends AbstractInvokeEffectorPolicy implements SensorEventListener<Object> {
     
     private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnSensorChange.class);
 
-    public static final ConfigKey<Object> SENSOR = ConfigKeys.newConfigKey(Object.class, 
-            "sensor", "Sensor to be monitored, as string or sensor type");
+    public static final ConfigKey<Object> SENSOR = ConfigKeys.builder(Object.class)
+            .name("sensor")
+            .description("Sensor to be monitored, as string or sensor type")
+            .constraint(Predicates.notNull())
+            .build();
 
-    public static final ConfigKey<String> EFFECTOR = ConfigKeys.newStringConfigKey(
-            "effector", "Name of effector to invoke");
+    public static final ConfigKey<String> EFFECTOR = ConfigKeys.builder(String.class)
+            .name("effector")
+            .description("Name of effector to invoke")
+            .constraint(StringPredicates.isNonBlank())
+            .build();
+
+    private AttributeSensor<Object> sensor;
 
     @Override
     public void setEntity(EntityLocal entity) {
         super.setEntity(entity);
         Preconditions.checkNotNull(getConfig(EFFECTOR), EFFECTOR);
-        Object sensor = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR);
-        if (sensor instanceof String) sensor = Sensors.newSensor(Object.class, (String)sensor);
-        subscriptions().subscribe(entity, (Sensor<?>)sensor, this);
+        sensor = getSensor();
+        subscriptions().subscribe(entity, sensor, this);
+        LOG.debug("{} subscribed to {} events on {}", new Object[]{this, sensor, entity});
     }
 
     @Override
     public void onEvent(SensorEvent<Object> event) {
-        entity.invoke(entity.getEntityType().getEffectorByName(getConfig(EFFECTOR)).get(), MutableMap.<String, Object>of());
+        final Effector<?> eff = getEffectorNamed(getConfig(EFFECTOR)).get();
+        if (isBusySensorEnabled()) {
+            final Object currentSensorValue = entity.sensors().get(sensor);
+            setMoreUpdatesComing(event.getTimestamp(), event.getValue(), currentSensorValue);
+        }
+        invoke(eff, MutableMap.<String, Object>of());
+    }
+
+    private AttributeSensor<Object> getSensor() {
+        final Object configVal = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR);
+        final AttributeSensor<Object> sensor;
+        if (configVal == null) {
+            throw new NullPointerException("Value for " + SENSOR.getName() + " is null");
+        } else if (configVal instanceof String) {
+            sensor = Sensors.newSensor(Object.class, (String) configVal);
+        } else if (configVal instanceof AttributeSensor) {
+            sensor = (AttributeSensor<Object>) configVal;
+        } else {
+            sensor = TypeCoercions.tryCoerce(configVal, new TypeToken<AttributeSensor<Object>>() {}).get();
+        }
+        return sensor;
     }
     
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java b/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java
new file mode 100644
index 0000000..15f3b48
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+
+public class AbstractInvokeEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+
+    @Test
+    public void testCountReflectsNumberOfExecutingEffectors() {
+        final CountDownLatch effectorLatch = new CountDownLatch(1);
+        final AttributeSensor<Boolean> policyIsBusy = Sensors.newBooleanSensor(
+                "policyIsBusy");
+        final Effector<Void> blockingEffector = Effectors.effector(Void.class, "abstract-invoke-effector-policy-test")
+                .impl(new BlockingEffector(effectorLatch))
+                .build();
+        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        final TestAbstractInvokeEffectorPolicy policy = entity.policies().add(
+                PolicySpec.create(TestAbstractInvokeEffectorPolicy.class)
+                        .configure(AbstractInvokeEffectorPolicy.IS_BUSY_SENSOR_NAME, policyIsBusy.getName()));
+        final Task<?> effectorTask = policy.invoke(blockingEffector, ImmutableMap.<String, Object>of());
+
+        // expect isbusy on entity, effector incomplete.
+        Supplier<Boolean> effectorTaskDoneSupplier = new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+                return effectorTask.isDone();
+            }
+        };
+        Asserts.continually(effectorTaskDoneSupplier, Predicates.equalTo(false));
+        EntityAsserts.assertAttributeEqualsEventually(entity, policyIsBusy, true);
+
+        effectorLatch.countDown();
+
+        Asserts.eventually(effectorTaskDoneSupplier, Predicates.equalTo(true));
+        EntityAsserts.assertAttributeEqualsEventually(entity, policyIsBusy, false);
+    }
+
+    public static class TestAbstractInvokeEffectorPolicy extends AbstractInvokeEffectorPolicy {
+        public TestAbstractInvokeEffectorPolicy() {
+        }
+
+        @Override
+        public void setEntity(EntityLocal entity) {
+            super.setEntity(entity);
+        }
+    }
+
+    private static class BlockingEffector extends EffectorBody<Void> {
+        final CountDownLatch latch;
+
+        private BlockingEffector(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public Void call(ConfigBag config) {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
index ab72f71..49d8c33 100644
--- a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
+++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -34,6 +35,8 @@ import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.policy.PolicySpec;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.effector.AddEffector;
 import org.apache.brooklyn.core.effector.EffectorBody;
 import org.apache.brooklyn.core.effector.Effectors;
@@ -48,6 +51,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -58,6 +62,9 @@ public class InvokeEffectorOnCollectionSensorChangeTest extends BrooklynAppUnitT
     private static final AttributeSensor<Collection<Integer>> DEFAULT_SENSOR = Sensors.newSensor(new TypeToken<Collection<Integer>>() {},
             "invokeeffectoronsetchangetest.sensor");
 
+    private static final AttributeSensor<Boolean> IS_BUSY_SENSOR = Sensors.newBooleanSensor(
+            "invokeeffectoronsetchangetest.isBusy");
+
     LinkedBlockingQueue<ConfigBag> onAddedParameters;
     LinkedBlockingQueue<ConfigBag> onRemovedParameters;
     Effector<Void> onAddedEffector;
@@ -202,13 +209,29 @@ public class InvokeEffectorOnCollectionSensorChangeTest extends BrooklynAppUnitT
                 .configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, onRemovedEffector.getName()));
     }
 
+    @Test
+    public void testPublishesIsBusySensor() {
+        final List<Boolean> isBusyValues = new CopyOnWriteArrayList<>();
+        testEntity.subscriptions().subscribe(testEntity, IS_BUSY_SENSOR, new SensorEventListener<Boolean>() {
+            @Override
+            public void onEvent(SensorEvent<Boolean> event) {
+                isBusyValues.add(event.getValue());
+            }
+        });
+        addSetChangePolicy(true, false);
+        testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(1));
+        List<Boolean> expected = ImmutableList.of(false, true, false);
+        Asserts.eventually(Suppliers.ofInstance(isBusyValues), Predicates.equalTo(expected));
+    }
+
     private void addSetChangePolicy(boolean includeOnAdded, boolean includeOnRemoved) {
         addSetChangePolicy(DEFAULT_SENSOR, includeOnAdded, includeOnRemoved);
     }
 
     private void addSetChangePolicy(AttributeSensor<? extends Collection<?>> sensor, boolean includeOnAdded, boolean includeOnRemoved) {
         PolicySpec<InvokeEffectorOnCollectionSensorChange> policySpec = PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class)
-                .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, sensor);
+                .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, sensor)
+                .configure(InvokeEffectorOnCollectionSensorChange.IS_BUSY_SENSOR_NAME, IS_BUSY_SENSOR.getName());
         if (includeOnAdded) {
             policySpec.configure(InvokeEffectorOnCollectionSensorChange.ON_ADDED_EFFECTOR_NAME, onAddedEffector.getName());
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java
new file mode 100644
index 0000000..cfeaf83
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.AddEffector;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.entity.stock.BasicEntity;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.testng.annotations.Test;
+
+public class InvokeEffectorOnSensorChangeIntegrationTest extends BrooklynAppUnitTestSupport {
+
+    @Test(groups = "Integration")
+    public void testIsBusySensorAlwaysFalseAtEnd() throws InterruptedException {
+        /*
+         * Stress-test isBusy. Reliably failed with insufficient synchronisation
+         * in AbstractInvokeEffectorPolicy.
+         */
+        final AttributeSensor<String> sensor = Sensors.newStringSensor("my-sensor");
+        final AttributeSensor<Boolean> isBusy = Sensors.newBooleanSensor("is-busy");
+        Effector<Void> effector = Effectors.effector(Void.class, "effector")
+                .impl(new DoNothingEffector())
+                .build();
+        final BasicEntity entity = app.createAndManageChild(EntitySpec.create(BasicEntity.class)
+                .addInitializer(new AddEffector(effector))
+                .policy(PolicySpec.create(InvokeEffectorOnSensorChange.class)
+                        .configure(InvokeEffectorOnSensorChange.SENSOR, sensor)
+                        .configure(InvokeEffectorOnSensorChange.EFFECTOR, "effector")
+                        .configure(InvokeEffectorOnSensorChange.IS_BUSY_SENSOR_NAME, isBusy.getName())));
+        final AtomicInteger threadId = new AtomicInteger();
+        Thread[] threads = new Thread[10];
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(new Runnable() {
+                private int count = 0;
+                @Override
+                public void run() {
+                    int id = threadId.incrementAndGet();
+                    while (count++ < 1000) {
+                        entity.sensors().set(sensor, "thread-" + id + "-" + count);
+                    }
+                }
+            });
+            threads[i].start();
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        EntityAsserts.assertAttributeEqualsEventually(entity, isBusy, false);
+    }
+
+
+    private static class DoNothingEffector extends EffectorBody<Void> {
+        @Override
+        public Void call(ConfigBag config) {
+            return null;
+        }
+    }
+
+}
\ No newline at end of file