You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sv...@apache.org on 2016/10/31 11:11:07 UTC

[1/3] brooklyn-server git commit: Fix Javadoc references

Repository: brooklyn-server
Updated Branches:
  refs/heads/master 23be8b2a3 -> f4ebc2822


Fix Javadoc references


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/92da714c
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/92da714c
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/92da714c

Branch: refs/heads/master
Commit: 92da714c31e368b56ecb00cff21b0f7939faeb95
Parents: 44d11e2
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Tue Oct 18 15:59:30 2016 +0100
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Mon Oct 24 12:42:07 2016 +0100

----------------------------------------------------------------------
 .../networking/JcloudsLocationSecurityGroupCustomizer.java       | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/92da714c/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java
index e15ad68..a5bff7d 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/networking/JcloudsLocationSecurityGroupCustomizer.java
@@ -168,13 +168,13 @@ public class JcloudsLocationSecurityGroupCustomizer extends BasicJcloudsLocation
         return this;
     }
 
-    /** @see #addPermissionsToLocation(JcloudsSshMachineLocation, java.lang.Iterable) */
+    /** @see #addPermissionsToLocation(JcloudsMachineLocation, java.lang.Iterable) */
     public JcloudsLocationSecurityGroupCustomizer addPermissionsToLocation(final JcloudsMachineLocation location, IpPermission... permissions) {
         addPermissionsToLocation(location, ImmutableList.copyOf(permissions));
         return this;
     }
 
-    /** @see #addPermissionsToLocation(JcloudsSshMachineLocation, java.lang.Iterable) */
+    /** @see #addPermissionsToLocation(JcloudsMachineLocation, java.lang.Iterable) */
     public JcloudsLocationSecurityGroupCustomizer addPermissionsToLocation(final JcloudsMachineLocation location, SecurityGroupDefinition securityGroupDefinition) {
         addPermissionsToLocation(location, securityGroupDefinition.getPermissions());
         return this;


[2/3] brooklyn-server git commit: Add AbstractInvokeEffectorPolicy

Posted by sv...@apache.org.
Add AbstractInvokeEffectorPolicy

Supports tracking the number of effector tasks that are ongoing and
publishing that as an "is busy" sensor on the entity the policy is
attached to.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/a413cedf
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/a413cedf
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/a413cedf

Branch: refs/heads/master
Commit: a413cedf5cffec87a343ee3041b37db538d6a03a
Parents: 92da714
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Fri Oct 28 11:52:55 2016 +0100
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Fri Oct 28 11:52:55 2016 +0100

----------------------------------------------------------------------
 .../policy/AbstractInvokeEffectorPolicy.java    | 145 +++++++++++++++++++
 .../InvokeEffectorOnCollectionSensorChange.java |  14 +-
 .../policy/InvokeEffectorOnSensorChange.java    |  54 +++++--
 .../AbstractInvokeEffectorPolicyTest.java       | 105 ++++++++++++++
 ...okeEffectorOnCollectionSensorChangeTest.java |  25 +++-
 ...keEffectorOnSensorChangeIntegrationTest.java |  87 +++++++++++
 6 files changed, 415 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java b/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java
new file mode 100644
index 0000000..d0b5e67
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
+
+    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
+            "isBusySensor",
+            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
+                    "If unset running tasks will not be tracked.");
+
+    private final AtomicInteger taskCounter = new AtomicInteger();
+
+    /**
+     * Indicates that onEvent was notified of a value that is not the latest sensor value.
+     */
+    private boolean moreUpdatesComing;
+    /**
+     * The timestamp of the event that informed moreUpdatesComing. Subsequent notifications
+     * of earlier events will not cause updates of moreUpdatesComing.
+     */
+    private long mostRecentUpdate = 0;
+    /**
+     * Guards {@link #moreUpdatesComing} and {@link #mostRecentUpdate}.
+     */
+    private final Object[] moreUpdatesLock = new Object[0];
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        if (isBusySensorEnabled()) {
+            // Republishes when the entity rebinds.
+            publishIsBusy();
+        }
+    }
+
+    /**
+     * Invoke effector with parameters on the entity that the policy is attached to.
+     */
+    protected <T> Task<T> invoke(Effector<T> effector, Map<String, ?> parameters) {
+        if (isBusySensorEnabled()) {
+            getTaskCounter().incrementAndGet();
+            publishIsBusy();
+        }
+        Task<T> task = entity.invoke(effector, parameters);
+        if (isBusySensorEnabled()) {
+            task.addListener(new EffectorListener(), MoreExecutors.sameThreadExecutor());
+        }
+        return task;
+    }
+
+    protected boolean isBusy() {
+        synchronized (moreUpdatesLock) {
+            return getTaskCounter().get() != 0 || moreUpdatesComing;
+        }
+    }
+
+    protected boolean isBusySensorEnabled() {
+        return Strings.isNonBlank(getIsBusySensorName());
+    }
+
+    protected Maybe<Effector<?>> getEffectorNamed(String name) {
+        return entity.getEntityType().getEffectorByName(name);
+    }
+
+    @Nonnull
+    protected String getIsBusySensorName() {
+        return getConfig(IS_BUSY_SENSOR_NAME);
+    }
+
+    /**
+     * Indicates that when the policy was notified of {@code eventValue}, occurring at time
+     * {@code eventTimestamp}, the value it read from the sensor was {@code current}. This
+     * determines the value of {@link #moreUpdatesComing}.
+     */
+    protected <T> void setMoreUpdatesComing(long eventTimestamp, T eventValue, T current) {
+        if (eventTimestamp >= mostRecentUpdate) {
+            synchronized (moreUpdatesLock) {
+                if (eventTimestamp >= mostRecentUpdate) {
+                    moreUpdatesComing = !Objects.equal(eventValue, current);
+                    mostRecentUpdate = eventTimestamp;
+                }
+            }
+        }
+    }
+
+    private AtomicInteger getTaskCounter() {
+        return taskCounter;
+    }
+
+    private void publishIsBusy() {
+        final boolean busy = isBusy();
+        LOG.trace("{} taskCount={}, isBusy={}", new Object[]{this, getTaskCounter().get(), busy});
+        AttributeSensor<Boolean> sensor = Sensors.newBooleanSensor(getIsBusySensorName());
+        entity.sensors().set(sensor, busy);
+    }
+
+    private class EffectorListener implements Runnable {
+        @Override
+        public void run() {
+            getTaskCounter().decrementAndGet();
+            publishIsBusy();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
index fad9be6..352d40d 100644
--- a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
+++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java
@@ -37,7 +37,6 @@ import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.guava.Maybe;
@@ -72,7 +71,7 @@ import com.google.common.reflect.TypeToken;
  * there are no guarantees that the `onAdded' task will have finished before the
  * corresponding `onRemoved' task is invoked.
  */
-public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy implements SensorEventListener<Collection<?>> {
+public class InvokeEffectorOnCollectionSensorChange extends AbstractInvokeEffectorPolicy implements SensorEventListener<Collection<?>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnCollectionSensorChange.class);
 
@@ -132,6 +131,15 @@ public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy imple
         final Set<Object> newValue = event.getValue() != null
                 ? new LinkedHashSet<>(event.getValue())
                 : ImmutableSet.of();
+        if (isBusySensorEnabled()) {
+            // There are more events coming that this policy hasn't been notified of if the
+            // value received in the event does not match the current value of the sensor.
+            final Collection<?> sensorVal = entity.sensors().get(getTriggerSensor());
+            final Set<Object> sensorValSet = sensorVal != null
+                ? new LinkedHashSet<>(sensorVal)
+                : ImmutableSet.of();
+            setMoreUpdatesComing(event.getTimestamp(), newValue, sensorValSet);
+        }
         final Set<Object> added = new LinkedHashSet<>(), removed = new LinkedHashSet<>();
         // It's only necessary to hold updateLock just to calculate the difference but
         // it is useful to guarantee that all the effectors are queued before the next
@@ -174,7 +182,7 @@ public class InvokeEffectorOnCollectionSensorChange extends AbstractPolicy imple
             }
 
             LOG.debug("{} invoking {} on {} with parameters {}", new Object[]{this, effector, entity, parameters});
-            entity.invoke(effector.get(), parameters);
+            invoke(effector.get(), parameters);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
index ebb6da8..662fd1d 100644
--- a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
+++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
@@ -18,19 +18,23 @@
  */
 package org.apache.brooklyn.policy;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.text.StringPredicates;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
 
 /**
  * Invokes the given effector when the policy changes.
@@ -40,28 +44,56 @@ import com.google.common.base.Preconditions;
  * * support conditions
  * * allow to be triggered by sensors on members
  */
-public class InvokeEffectorOnSensorChange extends AbstractPolicy implements SensorEventListener<Object> {
+public class InvokeEffectorOnSensorChange extends AbstractInvokeEffectorPolicy implements SensorEventListener<Object> {
     
     private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnSensorChange.class);
 
-    public static final ConfigKey<Object> SENSOR = ConfigKeys.newConfigKey(Object.class, 
-            "sensor", "Sensor to be monitored, as string or sensor type");
+    public static final ConfigKey<Object> SENSOR = ConfigKeys.builder(Object.class)
+            .name("sensor")
+            .description("Sensor to be monitored, as string or sensor type")
+            .constraint(Predicates.notNull())
+            .build();
 
-    public static final ConfigKey<String> EFFECTOR = ConfigKeys.newStringConfigKey(
-            "effector", "Name of effector to invoke");
+    public static final ConfigKey<String> EFFECTOR = ConfigKeys.builder(String.class)
+            .name("effector")
+            .description("Name of effector to invoke")
+            .constraint(StringPredicates.isNonBlank())
+            .build();
+
+    private AttributeSensor<Object> sensor;
 
     @Override
     public void setEntity(EntityLocal entity) {
         super.setEntity(entity);
         Preconditions.checkNotNull(getConfig(EFFECTOR), EFFECTOR);
-        Object sensor = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR);
-        if (sensor instanceof String) sensor = Sensors.newSensor(Object.class, (String)sensor);
-        subscriptions().subscribe(entity, (Sensor<?>)sensor, this);
+        sensor = getSensor();
+        subscriptions().subscribe(entity, sensor, this);
+        LOG.debug("{} subscribed to {} events on {}", new Object[]{this, sensor, entity});
     }
 
     @Override
     public void onEvent(SensorEvent<Object> event) {
-        entity.invoke(entity.getEntityType().getEffectorByName(getConfig(EFFECTOR)).get(), MutableMap.<String, Object>of());
+        final Effector<?> eff = getEffectorNamed(getConfig(EFFECTOR)).get();
+        if (isBusySensorEnabled()) {
+            final Object currentSensorValue = entity.sensors().get(sensor);
+            setMoreUpdatesComing(event.getTimestamp(), event.getValue(), currentSensorValue);
+        }
+        invoke(eff, MutableMap.<String, Object>of());
+    }
+
+    private AttributeSensor<Object> getSensor() {
+        final Object configVal = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR);
+        final AttributeSensor<Object> sensor;
+        if (configVal == null) {
+            throw new NullPointerException("Value for " + SENSOR.getName() + " is null");
+        } else if (configVal instanceof String) {
+            sensor = Sensors.newSensor(Object.class, (String) configVal);
+        } else if (configVal instanceof AttributeSensor) {
+            sensor = (AttributeSensor<Object>) configVal;
+        } else {
+            sensor = TypeCoercions.tryCoerce(configVal, new TypeToken<AttributeSensor<Object>>() {}).get();
+        }
+        return sensor;
     }
     
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java b/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java
new file mode 100644
index 0000000..15f3b48
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicyTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+
+public class AbstractInvokeEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+
+    @Test
+    public void testCountReflectsNumberOfExecutingEffectors() {
+        final CountDownLatch effectorLatch = new CountDownLatch(1);
+        final AttributeSensor<Boolean> policyIsBusy = Sensors.newBooleanSensor(
+                "policyIsBusy");
+        final Effector<Void> blockingEffector = Effectors.effector(Void.class, "abstract-invoke-effector-policy-test")
+                .impl(new BlockingEffector(effectorLatch))
+                .build();
+        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        final TestAbstractInvokeEffectorPolicy policy = entity.policies().add(
+                PolicySpec.create(TestAbstractInvokeEffectorPolicy.class)
+                        .configure(AbstractInvokeEffectorPolicy.IS_BUSY_SENSOR_NAME, policyIsBusy.getName()));
+        final Task<?> effectorTask = policy.invoke(blockingEffector, ImmutableMap.<String, Object>of());
+
+        // expect isbusy on entity, effector incomplete.
+        Supplier<Boolean> effectorTaskDoneSupplier = new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+                return effectorTask.isDone();
+            }
+        };
+        Asserts.continually(effectorTaskDoneSupplier, Predicates.equalTo(false));
+        EntityAsserts.assertAttributeEqualsEventually(entity, policyIsBusy, true);
+
+        effectorLatch.countDown();
+
+        Asserts.eventually(effectorTaskDoneSupplier, Predicates.equalTo(true));
+        EntityAsserts.assertAttributeEqualsEventually(entity, policyIsBusy, false);
+    }
+
+    public static class TestAbstractInvokeEffectorPolicy extends AbstractInvokeEffectorPolicy {
+        public TestAbstractInvokeEffectorPolicy() {
+        }
+
+        @Override
+        public void setEntity(EntityLocal entity) {
+            super.setEntity(entity);
+        }
+    }
+
+    private static class BlockingEffector extends EffectorBody<Void> {
+        final CountDownLatch latch;
+
+        private BlockingEffector(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public Void call(ConfigBag config) {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
index ab72f71..49d8c33 100644
--- a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
+++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChangeTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -34,6 +35,8 @@ import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.policy.PolicySpec;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.effector.AddEffector;
 import org.apache.brooklyn.core.effector.EffectorBody;
 import org.apache.brooklyn.core.effector.Effectors;
@@ -48,6 +51,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -58,6 +62,9 @@ public class InvokeEffectorOnCollectionSensorChangeTest extends BrooklynAppUnitT
     private static final AttributeSensor<Collection<Integer>> DEFAULT_SENSOR = Sensors.newSensor(new TypeToken<Collection<Integer>>() {},
             "invokeeffectoronsetchangetest.sensor");
 
+    private static final AttributeSensor<Boolean> IS_BUSY_SENSOR = Sensors.newBooleanSensor(
+            "invokeeffectoronsetchangetest.isBusy");
+
     LinkedBlockingQueue<ConfigBag> onAddedParameters;
     LinkedBlockingQueue<ConfigBag> onRemovedParameters;
     Effector<Void> onAddedEffector;
@@ -202,13 +209,29 @@ public class InvokeEffectorOnCollectionSensorChangeTest extends BrooklynAppUnitT
                 .configure(InvokeEffectorOnCollectionSensorChange.ON_REMOVED_EFFECTOR_NAME, onRemovedEffector.getName()));
     }
 
+    @Test
+    public void testPublishesIsBusySensor() {
+        final List<Boolean> isBusyValues = new CopyOnWriteArrayList<>();
+        testEntity.subscriptions().subscribe(testEntity, IS_BUSY_SENSOR, new SensorEventListener<Boolean>() {
+            @Override
+            public void onEvent(SensorEvent<Boolean> event) {
+                isBusyValues.add(event.getValue());
+            }
+        });
+        addSetChangePolicy(true, false);
+        testEntity.sensors().set(DEFAULT_SENSOR, ImmutableSet.of(1));
+        List<Boolean> expected = ImmutableList.of(false, true, false);
+        Asserts.eventually(Suppliers.ofInstance(isBusyValues), Predicates.equalTo(expected));
+    }
+
     private void addSetChangePolicy(boolean includeOnAdded, boolean includeOnRemoved) {
         addSetChangePolicy(DEFAULT_SENSOR, includeOnAdded, includeOnRemoved);
     }
 
     private void addSetChangePolicy(AttributeSensor<? extends Collection<?>> sensor, boolean includeOnAdded, boolean includeOnRemoved) {
         PolicySpec<InvokeEffectorOnCollectionSensorChange> policySpec = PolicySpec.create(InvokeEffectorOnCollectionSensorChange.class)
-                .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, sensor);
+                .configure(InvokeEffectorOnCollectionSensorChange.TRIGGER_SENSOR, sensor)
+                .configure(InvokeEffectorOnCollectionSensorChange.IS_BUSY_SENSOR_NAME, IS_BUSY_SENSOR.getName());
         if (includeOnAdded) {
             policySpec.configure(InvokeEffectorOnCollectionSensorChange.ON_ADDED_EFFECTOR_NAME, onAddedEffector.getName());
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a413cedf/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java
new file mode 100644
index 0000000..cfeaf83
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChangeIntegrationTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.AddEffector;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.entity.stock.BasicEntity;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.testng.annotations.Test;
+
+public class InvokeEffectorOnSensorChangeIntegrationTest extends BrooklynAppUnitTestSupport {
+
+    @Test(groups = "Integration")
+    public void testIsBusySensorAlwaysFalseAtEnd() throws InterruptedException {
+        /*
+         * Stress-test isBusy. Reliably failed with insufficient synchronisation
+         * in AbstractInvokeEffectorPolicy.
+         */
+        final AttributeSensor<String> sensor = Sensors.newStringSensor("my-sensor");
+        final AttributeSensor<Boolean> isBusy = Sensors.newBooleanSensor("is-busy");
+        Effector<Void> effector = Effectors.effector(Void.class, "effector")
+                .impl(new DoNothingEffector())
+                .build();
+        final BasicEntity entity = app.createAndManageChild(EntitySpec.create(BasicEntity.class)
+                .addInitializer(new AddEffector(effector))
+                .policy(PolicySpec.create(InvokeEffectorOnSensorChange.class)
+                        .configure(InvokeEffectorOnSensorChange.SENSOR, sensor)
+                        .configure(InvokeEffectorOnSensorChange.EFFECTOR, "effector")
+                        .configure(InvokeEffectorOnSensorChange.IS_BUSY_SENSOR_NAME, isBusy.getName())));
+        final AtomicInteger threadId = new AtomicInteger();
+        Thread[] threads = new Thread[10];
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(new Runnable() {
+                private int count = 0;
+                @Override
+                public void run() {
+                    int id = threadId.incrementAndGet();
+                    while (count++ < 1000) {
+                        entity.sensors().set(sensor, "thread-" + id + "-" + count);
+                    }
+                }
+            });
+            threads[i].start();
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        EntityAsserts.assertAttributeEqualsEventually(entity, isBusy, false);
+    }
+
+
+    private static class DoNothingEffector extends EffectorBody<Void> {
+        @Override
+        public Void call(ConfigBag config) {
+            return null;
+        }
+    }
+
+}
\ No newline at end of file


[3/3] brooklyn-server git commit: Closes #386

Posted by sv...@apache.org.
Closes #386

Add AbstractInvokeEffectorPolicy

Supports tracking the number of effector tasks that are ongoing and publishing that as an "is busy" sensor on the entity the policy is attached to. Tracking is enabled by providing a value for `isBusySensor` when configuring the policy.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/f4ebc282
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/f4ebc282
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/f4ebc282

Branch: refs/heads/master
Commit: f4ebc2822f5e359ac48357830ff690fa6f603812
Parents: 23be8b2 a413ced
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Mon Oct 31 13:10:38 2016 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Mon Oct 31 13:10:38 2016 +0200

----------------------------------------------------------------------
 .../policy/AbstractInvokeEffectorPolicy.java    | 145 +++++++++++++++++++
 .../InvokeEffectorOnCollectionSensorChange.java |  14 +-
 .../policy/InvokeEffectorOnSensorChange.java    |  54 +++++--
 .../AbstractInvokeEffectorPolicyTest.java       | 105 ++++++++++++++
 ...okeEffectorOnCollectionSensorChangeTest.java |  25 +++-
 ...keEffectorOnSensorChangeIntegrationTest.java |  87 +++++++++++
 .../JcloudsLocationSecurityGroupCustomizer.java |   4 +-
 7 files changed, 417 insertions(+), 17 deletions(-)
----------------------------------------------------------------------