You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by dr...@apache.org on 2017/09/16 08:14:15 UTC

[1/3] brooklyn-server git commit: New periodic and scheduled effector policies

Repository: brooklyn-server
Updated Branches:
  refs/heads/master c9f7b0dc5 -> e85d26485


New periodic and scheduled effector policies


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/00deb7df
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/00deb7df
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/00deb7df

Branch: refs/heads/master
Commit: 00deb7df05b52cc5168e6f56f3cc66739f11009c
Parents: 001730b
Author: Andrew Donald Kennedy <an...@cloudsoftcorp.com>
Authored: Wed Sep 13 18:03:56 2017 +0100
Committer: Andrew Donald Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Sep 15 13:19:30 2017 +0100

----------------------------------------------------------------------
 .../action/AbstractScheduledEffectorPolicy.java | 220 +++++++++++++++++++
 .../policy/action/PeriodicEffectorPolicy.java   | 102 +++++++++
 .../policy/action/ScheduledEffectorPolicy.java  | 147 +++++++++++++
 policy/src/main/resources/catalog.bom           |  15 ++
 .../action/PeriodicEffectorPolicyTest.java      |  98 +++++++++
 .../action/ScheduledEffectorPolicyTest.java     | 116 ++++++++++
 6 files changed, 698 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
new file mode 100644
index 0000000..73438c6
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.action;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.entity.trait.Startable;
+import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.config.ResolvingConfigBag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.DurationPredicates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.reflect.TypeToken;
+
+@Beta
+public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy implements Runnable, SensorEventListener<Object> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractScheduledEffectorPolicy.class);
+
+    public static final String TIME_FORMAT = "HH:mm:ss";
+    public static final String NOW = "now";
+    public static final String IMMEDIATELY = "immediately";
+
+    private static final DateFormat FORMATTER = SimpleDateFormat.getTimeInstance();
+
+    public static final ConfigKey<String> EFFECTOR = ConfigKeys.builder(String.class)
+            .name("effector")
+            .description("The effector to be executed by this policy")
+            .constraint(Predicates.notNull())
+            .build();
+
+    public static final ConfigKey<Map<String, Object>> EFFECTOR_ARGUMENTS = ConfigKeys.builder(new TypeToken<Map<String, Object>>() { })
+            .name("args")
+            .description("The effector arguments and their values")
+            .constraint(Predicates.notNull())
+            .defaultValue(ImmutableMap.<String, Object>of())
+            .build();
+
+    public static final ConfigKey<String> TIME = ConfigKeys.builder(String.class)
+            .name("time")
+            .description("An optional time when this policy should be first executed, formatted as HH:mm:ss")
+            .build();
+
+    public static final ConfigKey<Duration> WAIT = ConfigKeys.builder(Duration.class)
+            .name("wait")
+            .description("An optional duration after which this policy should be first executed. The time config takes precedence if present")
+            .constraint(Predicates.or(Predicates.isNull(), DurationPredicates.positive()))
+            .build();
+
+    public static final ConfigKey<AttributeSensor<Boolean>> START_SENSOR = ConfigKeys.builder(new TypeToken<AttributeSensor<Boolean>>() { })
+            .name("start.sensor")
+            .description("The sensor which should trigger starting the periodic execution scheduler")
+            .defaultValue(Startable.SERVICE_UP)
+            .build();
+
+    public static final ConfigKey<Boolean> RUNNING = ConfigKeys.builder(Boolean.class)
+            .name("running")
+            .description("Set if the executor has started")
+            .defaultValue(Boolean.FALSE)
+            .reconfigurable(true)
+            .build();
+
+    protected final AtomicBoolean running = new AtomicBoolean(false);
+    protected final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    protected final Object mutex = new Object[0];
+
+    protected Effector<?> effector;
+
+    public AbstractScheduledEffectorPolicy() {
+        this(MutableMap.<String,Object>of());
+    }
+
+    public AbstractScheduledEffectorPolicy(Map<String,?> props) {
+        super(props);
+    }
+
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+
+        effector = getEffector();
+
+        AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
+        subscriptions().subscribe(entity, sensor, this);
+    }
+
+    @Override
+    public void rebind() {
+        if (config().get(RUNNING)) {
+            running.set(true);
+            start();
+        }
+    }
+
+    @Override
+    protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
+        if (key.isReconfigurable()) {
+            return;
+        } else {
+            throw new UnsupportedOperationException("Reconfiguring key " + key.getName() + " not supported on " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void destroy(){
+        super.destroy();
+        executor.shutdownNow();
+    }
+
+    public abstract void start();
+
+    protected Effector<?> getEffector() {
+        String effectorName = config().get(EFFECTOR);
+        Maybe<Effector<?>> effector = entity.getEntityType().getEffectorByName(effectorName);
+        if (effector.isAbsentOrNull()) {
+            throw new IllegalStateException("Cannot find effector " + effectorName);
+        }
+        return effector.get();
+    }
+
+    protected Duration getWaitUntil(String time) {
+        if (time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) {
+            return Duration.ZERO;
+        }
+        try {
+            Calendar now = Calendar.getInstance();
+            Calendar when = Calendar.getInstance();
+            boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion
+            Date parsed = formatted ? FORMATTER.parse(time) : new Date(Long.parseLong(time) * 1000);
+            when.setTime(parsed);
+            when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE));
+            if (when.before(now)) {
+                when.add(Calendar.DATE, 1);
+            }
+            return Duration.millis(Math.max(0, when.getTimeInMillis() - now.getTimeInMillis()));
+        } catch (ParseException | NumberFormatException e) {
+            LOG.warn("{}: Time should be formatted as {}: {}", new Object[] { this, TIME_FORMAT, e.getMessage() });
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    @Override
+    public void run() {
+        synchronized (mutex) {
+            try {
+                ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag());
+                Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS);
+                LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) });
+                Map<String, Object> resolved = (Map) Tasks.resolving(args, Object.class)
+                        .deep(true)
+                        .context(entity)
+                        .get();
+
+                LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved });
+                Object result = entity.invoke(effector, resolved).getUnchecked();
+                LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result });
+            } catch (Throwable t) {
+                LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() });
+                Exceptions.propagate(t);
+            }
+        }
+    }
+
+    @Override
+    public void onEvent(SensorEvent<Object> event) {
+        synchronized (mutex) {
+            LOG.debug("{}: Got event {}", this, event);
+            AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
+            if (event.getSensor().getName().equals(sensor.getName())) {
+                Boolean start = (Boolean) event.getValue();
+                if (start && running.compareAndSet(false, true)) {
+                    config().set(RUNNING, true);
+                    start();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
new file mode 100644
index 0000000..58c10f2
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.action;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.DurationPredicates;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Policy} that executes an {@link Effector} at specific intervals.
+ * <p>
+ * The following example shows a pair of policies that resize a cluster
+ * from one to ten entities during the day  and back to one at night,:
+ * <pre>{@code
+ * brooklyn.policies:
+ *   - type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ *     brooklyn.config:
+ *       effector: resize
+ *       args:
+ *         desiredSize: 10
+ *       period: 1 day
+ *       time: 08:00:00
+ *   - type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ *     brooklyn.config:
+ *       effector: resize
+ *       args:
+ *         desiredSize: 1
+ *       period: 1 day
+ *       time: 18:00:00
+ * }</pre>
+ */
+@Beta
+public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PeriodicEffectorPolicy.class);
+
+    public static final ConfigKey<Duration> PERIOD = ConfigKeys.builder(Duration.class)
+            .name("period")
+            .description("The duration between executions of this policy")
+            .constraint(DurationPredicates.positive())
+            .defaultValue(Duration.hours(1))
+            .build();
+
+    public PeriodicEffectorPolicy() {
+        this(MutableMap.<String,Object>of());
+    }
+
+    public PeriodicEffectorPolicy(Map<String,?> props) {
+        super(props);
+    }
+
+    @Override
+    public void setEntity(final EntityLocal entity) {
+        super.setEntity(entity);
+    }
+
+    @Override
+    public void start() {
+        Duration period = Preconditions.checkNotNull(config().get(PERIOD), "The period must be configured for this policy");
+        String time = config().get(TIME);
+        Duration wait = config().get(WAIT);
+        if (time != null) {
+            wait = getWaitUntil(time);
+        } else if (wait == null) {
+            wait = period;
+        }
+
+        LOG.debug("{}: Scheduling {} every {} in {}", new Object[] { PeriodicEffectorPolicy.this, effector.getName(),
+                Time.fromDurationToTimeStringRounded().apply(period), Time.fromDurationToTimeStringRounded().apply(wait) });
+        executor.scheduleAtFixedRate(PeriodicEffectorPolicy.this, wait.toMilliseconds(), period.toMilliseconds(), TimeUnit.MILLISECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
new file mode 100644
index 0000000..44e6aac
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.action;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A {@link Policy} the executes an {@link Effector} at a specific time in the future.
+ * <p>
+ * <pre>{@code
+ * brooklyn.policies:
+ *   - type: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy
+ *     brooklyn.config:
+ *       effector: update
+ *       time: 12:00:00
+ * }</pre>
+ */
+@Beta
+public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ScheduledEffectorPolicy.class);
+
+    public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { })
+            .name("scheduled")
+            .description("List of all scheduled execution start times")
+            .defaultValue(Lists.newCopyOnWriteArrayList())
+            .reconfigurable(true)
+            .build();
+
+    public static final AttributeSensor<Boolean> INVOKE_IMMEDIATELY = Sensors.newBooleanSensor("scheduler.invoke.now", "Invoke the configured effector immediately when this becomes true");
+    public static final AttributeSensor<Date> INVOKE_AT = Sensors.newSensor(Date.class, "scheduler.invoke.at", "Invoke the configured effector at this time");
+
+    public ScheduledEffectorPolicy() {
+        this(MutableMap.<String,Object>of());
+    }
+
+    public ScheduledEffectorPolicy(Map<String,?> props) {
+        super(props);
+    }
+
+    @Override
+    public void setEntity(final EntityLocal entity) {
+        super.setEntity(entity);
+
+        subscriptions().subscribe(entity, INVOKE_IMMEDIATELY, this);
+        subscriptions().subscribe(entity, INVOKE_AT, this);
+    }
+
+    @Override
+    public void rebind() {
+        super.rebind();
+        List<Long> scheduled = config().get(SCHEDULED);
+        for (Long when : scheduled) {
+            Duration wait = Duration.millis(when - System.currentTimeMillis());
+            if (wait.isPositive()) {
+                schedule(wait);
+            } else {
+                scheduled.remove(when);
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        String time = config().get(TIME);
+        Duration wait = config().get(WAIT);
+
+        if (time != null) {
+            LOG.debug("{}: Scheduling {} at {} (in {})",
+                    new Object[] { this, effector.getName(), time, Time.fromDurationToTimeStringRounded().apply(wait) });
+            wait = getWaitUntil(time);
+        }
+
+        if (wait != null) {
+            schedule(wait);
+        }
+    }
+
+    protected void schedule(Duration wait) {
+        List<Long> scheduled = config().get(SCHEDULED);
+        scheduled.add(System.currentTimeMillis() + wait.toMilliseconds());
+
+        LOG.debug("{}: Scheduling {} in {} ({} ms)",
+                new Object[] { this, effector.getName(), Time.fromDurationToTimeStringRounded().apply(wait), wait.toMilliseconds() });
+        executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void onEvent(SensorEvent<Object> event) {
+        synchronized (mutex) {
+            super.onEvent(event);
+
+            if (running.get()) {
+                if (event.getSensor().getName().equals(INVOKE_AT.getName())) {
+                    String time = (String) event.getValue();
+                    if (time != null) {
+                        schedule(getWaitUntil(time));
+                    }
+                }
+                if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
+                    Boolean invoke = (Boolean) event.getValue();
+                    if (invoke) {
+                        schedule(Duration.ZERO);
+                    }
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/main/resources/catalog.bom
----------------------------------------------------------------------
diff --git a/policy/src/main/resources/catalog.bom b/policy/src/main/resources/catalog.bom
index d92b836..9c9e323 100644
--- a/policy/src/main/resources/catalog.bom
+++ b/policy/src/main/resources/catalog.bom
@@ -49,6 +49,21 @@ brooklyn.catalog:
           Policy that is attached to a Resizable entity and dynamically adjusts its size in 
           response to either keep a metric within a given range, or in response to 
           POOL_COLD and POOL_HOT events
+    - id: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+      itemType: policy
+      item:
+        type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+        name: Periodic Effector Execution
+        description: |
+          Policy that executes an effector repeatedly at configurable intervals.
+    - id: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy
+      itemType: policy
+      item:
+        type: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy
+        name: Scheduled Effector Execution
+        description: |
+          Policy that executes an effector at a configurable time or after
+          a configurable delay.
 
 #  Removed from catalog because 'FollowTheSunPool' cannot currently be configured via catalog mechanisms.
 #  Also removing associated 'BalanceableWorkerPool' etc as they are only useful with 'FollowTheSunPool'

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
new file mode 100644
index 0000000..e6ae74d
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy.action;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+
+    @Test
+    public void testPeriodicEffectorFires() {
+        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
+                        .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
+                        .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+                        .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
+                        .configure(PeriodicEffectorPolicy.TIME, "immediately")
+                        .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+        Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
+        Asserts.assertNotNull(policy);
+
+        Asserts.assertTrue(entity.getCallHistory().isEmpty());
+        Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
+
+        entity.sensors().set(start, Boolean.TRUE);
+        Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
+        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+        int calls = entity.getCallHistory().size();
+        Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500));
+    }
+
+    @Test
+    public void testPeriodicEffectorFiresAfterDelay() {
+        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
+                        .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
+                        .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+                        .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
+                        .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS)
+                        .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+        Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
+        Asserts.assertNotNull(policy);
+
+        Asserts.assertTrue(entity.getCallHistory().isEmpty());
+        Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
+
+        entity.sensors().set(start, Boolean.TRUE);
+        Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
+        sleep(Duration.seconds(5));
+        Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
+        sleep(Duration.seconds(5));
+        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+        int calls = entity.getCallHistory().size();
+        Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500));
+    }
+
+    private void sleep(Duration duration) {
+        try {
+            Thread.sleep(duration.toMilliseconds());
+        } catch (InterruptedException ie) {
+            Exceptions.propagate(ie);  
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
new file mode 100644
index 0000000..daa186e
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy.action;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+
+    @Test
+    public void testScheduledEffectorFiresImmediately() {
+        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
+                        .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
+                        .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+                        .configure(ScheduledEffectorPolicy.TIME, "immediately")
+                        .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+        Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
+        Asserts.assertNotNull(policy);
+
+        Asserts.assertTrue(entity.getCallHistory().isEmpty());
+        Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
+
+        entity.sensors().set(start, Boolean.TRUE);
+        Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
+        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+    }
+
+    @Test
+    public void testScheduledEffectorFiresAfterDelay() {
+        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
+                        .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
+                        .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+                        .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS)
+                        .configure(ScheduledEffectorPolicy.START_SENSOR, start)));
+        Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
+        Asserts.assertNotNull(policy);
+
+        Asserts.assertTrue(entity.getCallHistory().isEmpty());
+        Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
+
+        entity.sensors().set(start, Boolean.TRUE);
+        Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
+        sleep(Duration.seconds(5));
+        Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
+        sleep(Duration.seconds(5));
+        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+    }
+
+    @Test
+    public void testScheduledEffectorFiresOnSensor() {
+        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
+                        .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
+                        .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+                        .configure(ScheduledEffectorPolicy.START_SENSOR, start)));
+        Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
+        Asserts.assertNotNull(policy);
+
+        Asserts.assertTrue(entity.getCallHistory().isEmpty());
+        Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
+
+        entity.sensors().set(start, Boolean.TRUE);
+        Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
+        sleep(Duration.seconds(5));
+        Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
+
+        entity.sensors().set(ScheduledEffectorPolicy.INVOKE_IMMEDIATELY, Boolean.TRUE);
+        Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+    }
+
+    private void sleep(Duration duration) {
+        try {
+            Thread.sleep(duration.toMilliseconds());
+        } catch (InterruptedException ie) {
+            Exceptions.propagate(ie);  
+        }
+    }
+}


[2/3] brooklyn-server git commit: Fix issues with rebind

Posted by dr...@apache.org.
Fix issues with rebind


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/0a71cf58
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/0a71cf58
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/0a71cf58

Branch: refs/heads/master
Commit: 0a71cf58b018cde15b2f71f8b97dc063b871ebd6
Parents: 00deb7d
Author: Andrew Donald Kennedy <an...@cloudsoftcorp.com>
Authored: Fri Sep 15 17:14:11 2017 +0100
Committer: Andrew Donald Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Sep 15 17:21:19 2017 +0100

----------------------------------------------------------------------
 .../action/AbstractScheduledEffectorPolicy.java | 114 ++++++++++++-------
 .../policy/action/PeriodicEffectorPolicy.java   |  49 +++++---
 .../policy/action/ScheduledEffectorPolicy.java  |  72 ++----------
 .../action/PeriodicEffectorPolicyTest.java      |  18 ++-
 .../action/ScheduledEffectorPolicyTest.java     |  26 ++---
 5 files changed, 139 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
index 73438c6..5978ec5 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
@@ -23,9 +23,11 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.brooklyn.api.effector.Effector;
@@ -38,11 +40,11 @@ import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.trait.Startable;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
-import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.config.ResolvingConfigBag;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.DurationPredicates;
@@ -53,6 +55,7 @@ import com.google.common.annotations.Beta;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
 
 @Beta
@@ -103,20 +106,35 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
             .reconfigurable(true)
             .build();
 
-    protected final AtomicBoolean running = new AtomicBoolean(false);
-    protected final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    protected final Object mutex = new Object[0];
+    public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { })
+            .name("scheduled")
+            .description("List of all scheduled execution start times")
+            .defaultValue(Lists.newCopyOnWriteArrayList())
+            .reconfigurable(true)
+            .build();
 
-    protected Effector<?> effector;
+    protected AtomicBoolean running;
+    protected ScheduledExecutorService executor;
+    protected Effector effector;
 
     public AbstractScheduledEffectorPolicy() {
-        this(MutableMap.<String,Object>of());
+        LOG.debug("Created new scheduled effector policy");
+    }
+
+    @Override
+    public void init() {
+        setup();
     }
 
-    public AbstractScheduledEffectorPolicy(Map<String,?> props) {
-        super(props);
+    public void setup() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+        executor = Executors.newSingleThreadScheduledExecutor();
+        running = new AtomicBoolean(false);
     }
 
+    @Override
     public void setEntity(EntityLocal entity) {
         super.setEntity(entity);
 
@@ -128,9 +146,22 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
 
     @Override
     public void rebind() {
+        setup();
+
         if (config().get(RUNNING)) {
             running.set(true);
-            start();
+        }
+
+        if (running.get()) {
+            List<Long> scheduled = config().get(SCHEDULED);
+            for (Long when : scheduled) {
+                Duration wait = Duration.millis(when - System.currentTimeMillis());
+                if (wait.isPositive()) {
+                    schedule(wait);
+                } else {
+                    scheduled.remove(when);
+                }
+            }
         }
     }
 
@@ -145,15 +176,15 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
 
     @Override
     public void destroy(){
-        super.destroy();
         executor.shutdownNow();
+        super.destroy();
     }
 
     public abstract void start();
 
     protected Effector<?> getEffector() {
         String effectorName = config().get(EFFECTOR);
-        Maybe<Effector<?>> effector = entity.getEntityType().getEffectorByName(effectorName);
+        Maybe<Effector<?>> effector = getEntity().getEntityType().getEffectorByName(effectorName);
         if (effector.isAbsentOrNull()) {
             throw new IllegalStateException("Cannot find effector " + effectorName);
         }
@@ -181,39 +212,46 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
         }
     }
 
+    protected void schedule(Duration wait) {
+        List<Long> scheduled = config().get(SCHEDULED);
+        scheduled.add(System.currentTimeMillis() + wait.toMilliseconds());
+
+        executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
+    }
+
     @Override
-    public void run() {
-        synchronized (mutex) {
-            try {
-                ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag());
-                Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS);
-                LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) });
-                Map<String, Object> resolved = (Map) Tasks.resolving(args, Object.class)
-                        .deep(true)
-                        .context(entity)
-                        .get();
-
-                LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved });
-                Object result = entity.invoke(effector, resolved).getUnchecked();
-                LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result });
-            } catch (Throwable t) {
-                LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() });
-                Exceptions.propagate(t);
-            }
+    public synchronized void run() {
+        if (effector == null) return;
+        try {
+            ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag());
+            Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS);
+            LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) });
+            Map<String, Object> resolved = (Map) Tasks.resolving(args, Object.class)
+                    .deep(true)
+                    .context(entity)
+                    .get();
+
+            LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved });
+            Object result = entity.invoke(effector, resolved).getUnchecked();
+            LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result });
+        } catch (RuntimeInterruptedException rie) {
+            Thread.interrupted();
+            // TODO sometimes this seems to hang the executor?
+        } catch (Throwable t) {
+            LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() });
+            Exceptions.propagate(t);
         }
     }
 
     @Override
     public void onEvent(SensorEvent<Object> event) {
-        synchronized (mutex) {
-            LOG.debug("{}: Got event {}", this, event);
-            AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
-            if (event.getSensor().getName().equals(sensor.getName())) {
-                Boolean start = (Boolean) event.getValue();
-                if (start && running.compareAndSet(false, true)) {
-                    config().set(RUNNING, true);
-                    start();
-                }
+        LOG.debug("{}: Got event {}", this, event);
+        AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
+        if (event.getSensor().getName().equals(sensor.getName())) {
+            Boolean start = (Boolean) event.getValue();
+            if (start && running.compareAndSet(false, true)) {
+                config().set(RUNNING, true);
+                start();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
index 58c10f2..f2c0936 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
@@ -18,18 +18,17 @@
  */
 package org.apache.brooklyn.policy.action;
 
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
 
 import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.policy.Policy;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.DurationPredicates;
-import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,16 +71,7 @@ public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy {
             .build();
 
     public PeriodicEffectorPolicy() {
-        this(MutableMap.<String,Object>of());
-    }
-
-    public PeriodicEffectorPolicy(Map<String,?> props) {
-        super(props);
-    }
-
-    @Override
-    public void setEntity(final EntityLocal entity) {
-        super.setEntity(entity);
+        super();
     }
 
     @Override
@@ -95,8 +85,33 @@ public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy {
             wait = period;
         }
 
-        LOG.debug("{}: Scheduling {} every {} in {}", new Object[] { PeriodicEffectorPolicy.this, effector.getName(),
-                Time.fromDurationToTimeStringRounded().apply(period), Time.fromDurationToTimeStringRounded().apply(wait) });
-        executor.scheduleAtFixedRate(PeriodicEffectorPolicy.this, wait.toMilliseconds(), period.toMilliseconds(), TimeUnit.MILLISECONDS);
+        schedule(wait);
+    }
+
+    @Override
+    public void rebind() {
+        super.rebind();
+
+        // Check if we missed an entire period
+        List<Long> scheduled = config().get(SCHEDULED);
+        if (running.get() && scheduled.isEmpty()) {
+            start();
+        }
+    }
+
+    @Override
+    public synchronized void run() {
+        try {
+            super.run();
+        } finally {
+            Duration period = config().get(PERIOD);
+            String time = config().get(TIME);
+            if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) {
+                schedule(period);
+            } else {
+                Duration wait = getWaitUntil(time);
+                schedule(wait.upperBound(period));
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
index 44e6aac..5a3bed7 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
@@ -19,27 +19,18 @@
 package org.apache.brooklyn.policy.action;
 
 import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.policy.Policy;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.Beta;
-import com.google.common.collect.Lists;
-import com.google.common.reflect.TypeToken;
 
 /**
  * A {@link Policy} the executes an {@link Effector} at a specific time in the future.
@@ -57,22 +48,11 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
 
     private static final Logger LOG = LoggerFactory.getLogger(ScheduledEffectorPolicy.class);
 
-    public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { })
-            .name("scheduled")
-            .description("List of all scheduled execution start times")
-            .defaultValue(Lists.newCopyOnWriteArrayList())
-            .reconfigurable(true)
-            .build();
-
     public static final AttributeSensor<Boolean> INVOKE_IMMEDIATELY = Sensors.newBooleanSensor("scheduler.invoke.now", "Invoke the configured effector immediately when this becomes true");
     public static final AttributeSensor<Date> INVOKE_AT = Sensors.newSensor(Date.class, "scheduler.invoke.at", "Invoke the configured effector at this time");
 
     public ScheduledEffectorPolicy() {
-        this(MutableMap.<String,Object>of());
-    }
-
-    public ScheduledEffectorPolicy(Map<String,?> props) {
-        super(props);
+        super();
     }
 
     @Override
@@ -84,27 +64,11 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
     }
 
     @Override
-    public void rebind() {
-        super.rebind();
-        List<Long> scheduled = config().get(SCHEDULED);
-        for (Long when : scheduled) {
-            Duration wait = Duration.millis(when - System.currentTimeMillis());
-            if (wait.isPositive()) {
-                schedule(wait);
-            } else {
-                scheduled.remove(when);
-            }
-        }
-    }
-
-    @Override
     public void start() {
         String time = config().get(TIME);
         Duration wait = config().get(WAIT);
 
         if (time != null) {
-            LOG.debug("{}: Scheduling {} at {} (in {})",
-                    new Object[] { this, effector.getName(), time, Time.fromDurationToTimeStringRounded().apply(wait) });
             wait = getWaitUntil(time);
         }
 
@@ -113,35 +77,23 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
         }
     }
 
-    protected void schedule(Duration wait) {
-        List<Long> scheduled = config().get(SCHEDULED);
-        scheduled.add(System.currentTimeMillis() + wait.toMilliseconds());
-
-        LOG.debug("{}: Scheduling {} in {} ({} ms)",
-                new Object[] { this, effector.getName(), Time.fromDurationToTimeStringRounded().apply(wait), wait.toMilliseconds() });
-        executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
-    }
-
     @Override
     public void onEvent(SensorEvent<Object> event) {
-        synchronized (mutex) {
-            super.onEvent(event);
+        super.onEvent(event);
 
-            if (running.get()) {
-                if (event.getSensor().getName().equals(INVOKE_AT.getName())) {
-                    String time = (String) event.getValue();
-                    if (time != null) {
-                        schedule(getWaitUntil(time));
-                    }
+        if (running.get()) {
+            if (event.getSensor().getName().equals(INVOKE_AT.getName())) {
+                String time = (String) event.getValue();
+                if (time != null) {
+                    schedule(getWaitUntil(time));
                 }
-                if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
-                    Boolean invoke = (Boolean) event.getValue();
-                    if (invoke) {
-                        schedule(Duration.ZERO);
-                    }
+            }
+            if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
+                Boolean invoke = (Boolean) event.getValue();
+                if (invoke) {
+                    schedule(Duration.ZERO);
                 }
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
index e6ae74d..0268d60 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
@@ -37,24 +37,24 @@ import com.google.common.collect.Iterables;
 
 public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport {
 
+    private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+
     @Test
     public void testPeriodicEffectorFires() {
-        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
-        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
                 .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
                         .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
                         .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
                         .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
                         .configure(PeriodicEffectorPolicy.TIME, "immediately")
-                        .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+                        .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
         Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
         Asserts.assertNotNull(policy);
 
         Asserts.assertTrue(entity.getCallHistory().isEmpty());
         Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
 
-        entity.sensors().set(start, Boolean.TRUE);
+        entity.sensors().set(START, Boolean.TRUE);
         Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
         Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
         int calls = entity.getCallHistory().size();
@@ -63,22 +63,20 @@ public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport {
 
     @Test
     public void testPeriodicEffectorFiresAfterDelay() {
-        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
-        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
                 .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
                         .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
                         .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
                         .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
                         .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS)
-                        .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+                        .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
         Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
         Asserts.assertNotNull(policy);
 
         Asserts.assertTrue(entity.getCallHistory().isEmpty());
         Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
 
-        entity.sensors().set(start, Boolean.TRUE);
+        entity.sensors().set(START, Boolean.TRUE);
         Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
         sleep(Duration.seconds(5));
         Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
index daa186e..5271de3 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
@@ -37,44 +37,42 @@ import com.google.common.collect.Iterables;
 
 public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport {
 
+    private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+
     @Test
     public void testScheduledEffectorFiresImmediately() {
-        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
-        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
                 .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
                         .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
                         .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
                         .configure(ScheduledEffectorPolicy.TIME, "immediately")
-                        .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+                        .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
         Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
         Asserts.assertNotNull(policy);
 
         Asserts.assertTrue(entity.getCallHistory().isEmpty());
         Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
 
-        entity.sensors().set(start, Boolean.TRUE);
+        entity.sensors().set(START, Boolean.TRUE);
         Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
         Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
     }
 
     @Test
     public void testScheduledEffectorFiresAfterDelay() {
-        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
-        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
                 .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
                         .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
                         .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
                         .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS)
-                        .configure(ScheduledEffectorPolicy.START_SENSOR, start)));
+                        .configure(ScheduledEffectorPolicy.START_SENSOR, START)));
         Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
         Asserts.assertNotNull(policy);
 
         Asserts.assertTrue(entity.getCallHistory().isEmpty());
         Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
 
-        entity.sensors().set(start, Boolean.TRUE);
+        entity.sensors().set(START, Boolean.TRUE);
         Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
         sleep(Duration.seconds(5));
         Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
@@ -84,20 +82,18 @@ public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport {
 
     @Test
     public void testScheduledEffectorFiresOnSensor() {
-        final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
-        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
                 .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
                         .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
                         .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
-                        .configure(ScheduledEffectorPolicy.START_SENSOR, start)));
+                        .configure(ScheduledEffectorPolicy.START_SENSOR, START)));
         Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
         Asserts.assertNotNull(policy);
 
         Asserts.assertTrue(entity.getCallHistory().isEmpty());
         Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
 
-        entity.sensors().set(start, Boolean.TRUE);
+        entity.sensors().set(START, Boolean.TRUE);
         Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
         sleep(Duration.seconds(5));
         Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));


[3/3] brooklyn-server git commit: This closes #822

Posted by dr...@apache.org.
This closes #822


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/e85d2648
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/e85d2648
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/e85d2648

Branch: refs/heads/master
Commit: e85d26485f8e62d3ba97d57bf16b6f3434b616cf
Parents: c9f7b0d 0a71cf5
Author: Duncan Godwin <dr...@googlemail.com>
Authored: Sat Sep 16 09:14:04 2017 +0100
Committer: Duncan Godwin <dr...@googlemail.com>
Committed: Sat Sep 16 09:14:04 2017 +0100

----------------------------------------------------------------------
 .../action/AbstractScheduledEffectorPolicy.java | 258 +++++++++++++++++++
 .../policy/action/PeriodicEffectorPolicy.java   | 117 +++++++++
 .../policy/action/ScheduledEffectorPolicy.java  |  99 +++++++
 policy/src/main/resources/catalog.bom           |  15 ++
 .../action/PeriodicEffectorPolicyTest.java      |  96 +++++++
 .../action/ScheduledEffectorPolicyTest.java     | 112 ++++++++
 6 files changed, 697 insertions(+)
----------------------------------------------------------------------