You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by dr...@apache.org on 2017/09/16 08:14:15 UTC
[1/3] brooklyn-server git commit: New periodic and scheduled effector
policies
Repository: brooklyn-server
Updated Branches:
refs/heads/master c9f7b0dc5 -> e85d26485
New periodic and scheduled effector policies
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/00deb7df
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/00deb7df
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/00deb7df
Branch: refs/heads/master
Commit: 00deb7df05b52cc5168e6f56f3cc66739f11009c
Parents: 001730b
Author: Andrew Donald Kennedy <an...@cloudsoftcorp.com>
Authored: Wed Sep 13 18:03:56 2017 +0100
Committer: Andrew Donald Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Sep 15 13:19:30 2017 +0100
----------------------------------------------------------------------
.../action/AbstractScheduledEffectorPolicy.java | 220 +++++++++++++++++++
.../policy/action/PeriodicEffectorPolicy.java | 102 +++++++++
.../policy/action/ScheduledEffectorPolicy.java | 147 +++++++++++++
policy/src/main/resources/catalog.bom | 15 ++
.../action/PeriodicEffectorPolicyTest.java | 98 +++++++++
.../action/ScheduledEffectorPolicyTest.java | 116 ++++++++++
6 files changed, 698 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
new file mode 100644
index 0000000..73438c6
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.action;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.entity.trait.Startable;
+import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.config.ResolvingConfigBag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.DurationPredicates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.reflect.TypeToken;
+
+@Beta
+public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy implements Runnable, SensorEventListener<Object> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractScheduledEffectorPolicy.class);
+
+ public static final String TIME_FORMAT = "HH:mm:ss";
+ public static final String NOW = "now";
+ public static final String IMMEDIATELY = "immediately";
+
+ private static final DateFormat FORMATTER = SimpleDateFormat.getTimeInstance();
+
+ public static final ConfigKey<String> EFFECTOR = ConfigKeys.builder(String.class)
+ .name("effector")
+ .description("The effector to be executed by this policy")
+ .constraint(Predicates.notNull())
+ .build();
+
+ public static final ConfigKey<Map<String, Object>> EFFECTOR_ARGUMENTS = ConfigKeys.builder(new TypeToken<Map<String, Object>>() { })
+ .name("args")
+ .description("The effector arguments and their values")
+ .constraint(Predicates.notNull())
+ .defaultValue(ImmutableMap.<String, Object>of())
+ .build();
+
+ public static final ConfigKey<String> TIME = ConfigKeys.builder(String.class)
+ .name("time")
+ .description("An optional time when this policy should be first executed, formatted as HH:mm:ss")
+ .build();
+
+ public static final ConfigKey<Duration> WAIT = ConfigKeys.builder(Duration.class)
+ .name("wait")
+ .description("An optional duration after which this policy should be first executed. The time config takes precedence if present")
+ .constraint(Predicates.or(Predicates.isNull(), DurationPredicates.positive()))
+ .build();
+
+ public static final ConfigKey<AttributeSensor<Boolean>> START_SENSOR = ConfigKeys.builder(new TypeToken<AttributeSensor<Boolean>>() { })
+ .name("start.sensor")
+ .description("The sensor which should trigger starting the periodic execution scheduler")
+ .defaultValue(Startable.SERVICE_UP)
+ .build();
+
+ public static final ConfigKey<Boolean> RUNNING = ConfigKeys.builder(Boolean.class)
+ .name("running")
+ .description("Set if the executor has started")
+ .defaultValue(Boolean.FALSE)
+ .reconfigurable(true)
+ .build();
+
+ protected final AtomicBoolean running = new AtomicBoolean(false);
+ protected final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ protected final Object mutex = new Object[0];
+
+ protected Effector<?> effector;
+
+ public AbstractScheduledEffectorPolicy() {
+ this(MutableMap.<String,Object>of());
+ }
+
+ public AbstractScheduledEffectorPolicy(Map<String,?> props) {
+ super(props);
+ }
+
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+
+ effector = getEffector();
+
+ AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
+ subscriptions().subscribe(entity, sensor, this);
+ }
+
+ @Override
+ public void rebind() {
+ if (config().get(RUNNING)) {
+ running.set(true);
+ start();
+ }
+ }
+
+ @Override
+ protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
+ if (key.isReconfigurable()) {
+ return;
+ } else {
+ throw new UnsupportedOperationException("Reconfiguring key " + key.getName() + " not supported on " + getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public void destroy(){
+ super.destroy();
+ executor.shutdownNow();
+ }
+
+ public abstract void start();
+
+ protected Effector<?> getEffector() {
+ String effectorName = config().get(EFFECTOR);
+ Maybe<Effector<?>> effector = entity.getEntityType().getEffectorByName(effectorName);
+ if (effector.isAbsentOrNull()) {
+ throw new IllegalStateException("Cannot find effector " + effectorName);
+ }
+ return effector.get();
+ }
+
+ protected Duration getWaitUntil(String time) {
+ if (time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) {
+ return Duration.ZERO;
+ }
+ try {
+ Calendar now = Calendar.getInstance();
+ Calendar when = Calendar.getInstance();
+ boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion
+ Date parsed = formatted ? FORMATTER.parse(time) : new Date(Long.parseLong(time) * 1000);
+ when.setTime(parsed);
+ when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE));
+ if (when.before(now)) {
+ when.add(Calendar.DATE, 1);
+ }
+ return Duration.millis(Math.max(0, when.getTimeInMillis() - now.getTimeInMillis()));
+ } catch (ParseException | NumberFormatException e) {
+ LOG.warn("{}: Time should be formatted as {}: {}", new Object[] { this, TIME_FORMAT, e.getMessage() });
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ @Override
+ public void run() {
+ synchronized (mutex) {
+ try {
+ ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag());
+ Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS);
+ LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) });
+ Map<String, Object> resolved = (Map) Tasks.resolving(args, Object.class)
+ .deep(true)
+ .context(entity)
+ .get();
+
+ LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved });
+ Object result = entity.invoke(effector, resolved).getUnchecked();
+ LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result });
+ } catch (Throwable t) {
+ LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() });
+ Exceptions.propagate(t);
+ }
+ }
+ }
+
+ @Override
+ public void onEvent(SensorEvent<Object> event) {
+ synchronized (mutex) {
+ LOG.debug("{}: Got event {}", this, event);
+ AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
+ if (event.getSensor().getName().equals(sensor.getName())) {
+ Boolean start = (Boolean) event.getValue();
+ if (start && running.compareAndSet(false, true)) {
+ config().set(RUNNING, true);
+ start();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
new file mode 100644
index 0000000..58c10f2
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.action;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.DurationPredicates;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Policy} that executes an {@link Effector} at specific intervals.
+ * <p>
+ * The following example shows a pair of policies that resize a cluster
+ * from one to ten entities during the day and back to one at night,:
+ * <pre>{@code
+ * brooklyn.policies:
+ * - type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ * brooklyn.config:
+ * effector: resize
+ * args:
+ * desiredSize: 10
+ * period: 1 day
+ * time: 08:00:00
+ * - type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ * brooklyn.config:
+ * effector: resize
+ * args:
+ * desiredSize: 1
+ * period: 1 day
+ * time: 18:00:00
+ * }</pre>
+ */
+@Beta
+public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PeriodicEffectorPolicy.class);
+
+ public static final ConfigKey<Duration> PERIOD = ConfigKeys.builder(Duration.class)
+ .name("period")
+ .description("The duration between executions of this policy")
+ .constraint(DurationPredicates.positive())
+ .defaultValue(Duration.hours(1))
+ .build();
+
+ public PeriodicEffectorPolicy() {
+ this(MutableMap.<String,Object>of());
+ }
+
+ public PeriodicEffectorPolicy(Map<String,?> props) {
+ super(props);
+ }
+
+ @Override
+ public void setEntity(final EntityLocal entity) {
+ super.setEntity(entity);
+ }
+
+ @Override
+ public void start() {
+ Duration period = Preconditions.checkNotNull(config().get(PERIOD), "The period must be configured for this policy");
+ String time = config().get(TIME);
+ Duration wait = config().get(WAIT);
+ if (time != null) {
+ wait = getWaitUntil(time);
+ } else if (wait == null) {
+ wait = period;
+ }
+
+ LOG.debug("{}: Scheduling {} every {} in {}", new Object[] { PeriodicEffectorPolicy.this, effector.getName(),
+ Time.fromDurationToTimeStringRounded().apply(period), Time.fromDurationToTimeStringRounded().apply(wait) });
+ executor.scheduleAtFixedRate(PeriodicEffectorPolicy.this, wait.toMilliseconds(), period.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
new file mode 100644
index 0000000..44e6aac
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.action;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A {@link Policy} the executes an {@link Effector} at a specific time in the future.
+ * <p>
+ * <pre>{@code
+ * brooklyn.policies:
+ * - type: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy
+ * brooklyn.config:
+ * effector: update
+ * time: 12:00:00
+ * }</pre>
+ */
+@Beta
+public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ScheduledEffectorPolicy.class);
+
+ public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { })
+ .name("scheduled")
+ .description("List of all scheduled execution start times")
+ .defaultValue(Lists.newCopyOnWriteArrayList())
+ .reconfigurable(true)
+ .build();
+
+ public static final AttributeSensor<Boolean> INVOKE_IMMEDIATELY = Sensors.newBooleanSensor("scheduler.invoke.now", "Invoke the configured effector immediately when this becomes true");
+ public static final AttributeSensor<Date> INVOKE_AT = Sensors.newSensor(Date.class, "scheduler.invoke.at", "Invoke the configured effector at this time");
+
+ public ScheduledEffectorPolicy() {
+ this(MutableMap.<String,Object>of());
+ }
+
+ public ScheduledEffectorPolicy(Map<String,?> props) {
+ super(props);
+ }
+
+ @Override
+ public void setEntity(final EntityLocal entity) {
+ super.setEntity(entity);
+
+ subscriptions().subscribe(entity, INVOKE_IMMEDIATELY, this);
+ subscriptions().subscribe(entity, INVOKE_AT, this);
+ }
+
+ @Override
+ public void rebind() {
+ super.rebind();
+ List<Long> scheduled = config().get(SCHEDULED);
+ for (Long when : scheduled) {
+ Duration wait = Duration.millis(when - System.currentTimeMillis());
+ if (wait.isPositive()) {
+ schedule(wait);
+ } else {
+ scheduled.remove(when);
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ String time = config().get(TIME);
+ Duration wait = config().get(WAIT);
+
+ if (time != null) {
+ LOG.debug("{}: Scheduling {} at {} (in {})",
+ new Object[] { this, effector.getName(), time, Time.fromDurationToTimeStringRounded().apply(wait) });
+ wait = getWaitUntil(time);
+ }
+
+ if (wait != null) {
+ schedule(wait);
+ }
+ }
+
+ protected void schedule(Duration wait) {
+ List<Long> scheduled = config().get(SCHEDULED);
+ scheduled.add(System.currentTimeMillis() + wait.toMilliseconds());
+
+ LOG.debug("{}: Scheduling {} in {} ({} ms)",
+ new Object[] { this, effector.getName(), Time.fromDurationToTimeStringRounded().apply(wait), wait.toMilliseconds() });
+ executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void onEvent(SensorEvent<Object> event) {
+ synchronized (mutex) {
+ super.onEvent(event);
+
+ if (running.get()) {
+ if (event.getSensor().getName().equals(INVOKE_AT.getName())) {
+ String time = (String) event.getValue();
+ if (time != null) {
+ schedule(getWaitUntil(time));
+ }
+ }
+ if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
+ Boolean invoke = (Boolean) event.getValue();
+ if (invoke) {
+ schedule(Duration.ZERO);
+ }
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/main/resources/catalog.bom
----------------------------------------------------------------------
diff --git a/policy/src/main/resources/catalog.bom b/policy/src/main/resources/catalog.bom
index d92b836..9c9e323 100644
--- a/policy/src/main/resources/catalog.bom
+++ b/policy/src/main/resources/catalog.bom
@@ -49,6 +49,21 @@ brooklyn.catalog:
Policy that is attached to a Resizable entity and dynamically adjusts its size in
response to either keep a metric within a given range, or in response to
POOL_COLD and POOL_HOT events
+ - id: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ itemType: policy
+ item:
+ type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ name: Periodic Effector Execution
+ description: |
+ Policy that executes an effector repeatedly at configurable intervals.
+ - id: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy
+ itemType: policy
+ item:
+ type: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy
+ name: Scheduled Effector Execution
+ description: |
+ Policy that executes an effector at a configurable time or after
+ a configurable delay.
# Removed from catalog because 'FollowTheSunPool' cannot currently be configured via catalog mechanisms.
# Also removing associated 'BalanceableWorkerPool' etc as they are only useful with 'FollowTheSunPool'
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
new file mode 100644
index 0000000..e6ae74d
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy.action;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+
+ @Test
+ public void testPeriodicEffectorFires() {
+ final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+ final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
+ .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
+ .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+ .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
+ .configure(PeriodicEffectorPolicy.TIME, "immediately")
+ .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+ Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
+ Asserts.assertNotNull(policy);
+
+ Asserts.assertTrue(entity.getCallHistory().isEmpty());
+ Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
+
+ entity.sensors().set(start, Boolean.TRUE);
+ Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
+ Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+ int calls = entity.getCallHistory().size();
+ Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500));
+ }
+
+ @Test
+ public void testPeriodicEffectorFiresAfterDelay() {
+ final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+ final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(PolicySpec.create(PeriodicEffectorPolicy.class)
+ .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
+ .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+ .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
+ .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS)
+ .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+ Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
+ Asserts.assertNotNull(policy);
+
+ Asserts.assertTrue(entity.getCallHistory().isEmpty());
+ Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
+
+ entity.sensors().set(start, Boolean.TRUE);
+ Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
+ sleep(Duration.seconds(5));
+ Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
+ sleep(Duration.seconds(5));
+ Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+ int calls = entity.getCallHistory().size();
+ Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500));
+ }
+
+ private void sleep(Duration duration) {
+ try {
+ Thread.sleep(duration.toMilliseconds());
+ } catch (InterruptedException ie) {
+ Exceptions.propagate(ie);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00deb7df/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
new file mode 100644
index 0000000..daa186e
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.policy.action;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+
+ @Test
+ public void testScheduledEffectorFiresImmediately() {
+ final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+ final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
+ .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
+ .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+ .configure(ScheduledEffectorPolicy.TIME, "immediately")
+ .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+ Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
+ Asserts.assertNotNull(policy);
+
+ Asserts.assertTrue(entity.getCallHistory().isEmpty());
+ Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
+
+ entity.sensors().set(start, Boolean.TRUE);
+ Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
+ Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+ }
+
+ @Test
+ public void testScheduledEffectorFiresAfterDelay() {
+ final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+ final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
+ .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
+ .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+ .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS)
+ .configure(ScheduledEffectorPolicy.START_SENSOR, start)));
+ Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
+ Asserts.assertNotNull(policy);
+
+ Asserts.assertTrue(entity.getCallHistory().isEmpty());
+ Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
+
+ entity.sensors().set(start, Boolean.TRUE);
+ Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
+ sleep(Duration.seconds(5));
+ Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
+ sleep(Duration.seconds(5));
+ Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+ }
+
+ @Test
+ public void testScheduledEffectorFiresOnSensor() {
+ final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
+
+ final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .policy(PolicySpec.create(ScheduledEffectorPolicy.class)
+ .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
+ .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
+ .configure(ScheduledEffectorPolicy.START_SENSOR, start)));
+ Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
+ Asserts.assertNotNull(policy);
+
+ Asserts.assertTrue(entity.getCallHistory().isEmpty());
+ Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
+
+ entity.sensors().set(start, Boolean.TRUE);
+ Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
+ sleep(Duration.seconds(5));
+ Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
+
+ entity.sensors().set(ScheduledEffectorPolicy.INVOKE_IMMEDIATELY, Boolean.TRUE);
+ Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
+ }
+
+ private void sleep(Duration duration) {
+ try {
+ Thread.sleep(duration.toMilliseconds());
+ } catch (InterruptedException ie) {
+ Exceptions.propagate(ie);
+ }
+ }
+}
[2/3] brooklyn-server git commit: Fix issues with rebind
Posted by dr...@apache.org.
Fix issues with rebind
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/0a71cf58
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/0a71cf58
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/0a71cf58
Branch: refs/heads/master
Commit: 0a71cf58b018cde15b2f71f8b97dc063b871ebd6
Parents: 00deb7d
Author: Andrew Donald Kennedy <an...@cloudsoftcorp.com>
Authored: Fri Sep 15 17:14:11 2017 +0100
Committer: Andrew Donald Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Sep 15 17:21:19 2017 +0100
----------------------------------------------------------------------
.../action/AbstractScheduledEffectorPolicy.java | 114 ++++++++++++-------
.../policy/action/PeriodicEffectorPolicy.java | 49 +++++---
.../policy/action/ScheduledEffectorPolicy.java | 72 ++----------
.../action/PeriodicEffectorPolicyTest.java | 18 ++-
.../action/ScheduledEffectorPolicyTest.java | 26 ++---
5 files changed, 139 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
index 73438c6..5978ec5 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java
@@ -23,9 +23,11 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.brooklyn.api.effector.Effector;
@@ -38,11 +40,11 @@ import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.EntityInitializers;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.policy.AbstractPolicy;
-import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.config.ResolvingConfigBag;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.DurationPredicates;
@@ -53,6 +55,7 @@ import com.google.common.annotations.Beta;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
@Beta
@@ -103,20 +106,35 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
.reconfigurable(true)
.build();
- protected final AtomicBoolean running = new AtomicBoolean(false);
- protected final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- protected final Object mutex = new Object[0];
+ public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { })
+ .name("scheduled")
+ .description("List of all scheduled execution start times")
+ .defaultValue(Lists.newCopyOnWriteArrayList())
+ .reconfigurable(true)
+ .build();
- protected Effector<?> effector;
+ protected AtomicBoolean running;
+ protected ScheduledExecutorService executor;
+ protected Effector effector;
public AbstractScheduledEffectorPolicy() {
- this(MutableMap.<String,Object>of());
+ LOG.debug("Created new scheduled effector policy");
+ }
+
+ @Override
+ public void init() {
+ setup();
}
- public AbstractScheduledEffectorPolicy(Map<String,?> props) {
- super(props);
+ public void setup() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ executor = Executors.newSingleThreadScheduledExecutor();
+ running = new AtomicBoolean(false);
}
+ @Override
public void setEntity(EntityLocal entity) {
super.setEntity(entity);
@@ -128,9 +146,22 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
@Override
public void rebind() {
+ setup();
+
if (config().get(RUNNING)) {
running.set(true);
- start();
+ }
+
+ if (running.get()) {
+ List<Long> scheduled = config().get(SCHEDULED);
+ for (Long when : scheduled) {
+ Duration wait = Duration.millis(when - System.currentTimeMillis());
+ if (wait.isPositive()) {
+ schedule(wait);
+ } else {
+ scheduled.remove(when);
+ }
+ }
}
}
@@ -145,15 +176,15 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
@Override
public void destroy(){
- super.destroy();
executor.shutdownNow();
+ super.destroy();
}
public abstract void start();
protected Effector<?> getEffector() {
String effectorName = config().get(EFFECTOR);
- Maybe<Effector<?>> effector = entity.getEntityType().getEffectorByName(effectorName);
+ Maybe<Effector<?>> effector = getEntity().getEntityType().getEffectorByName(effectorName);
if (effector.isAbsentOrNull()) {
throw new IllegalStateException("Cannot find effector " + effectorName);
}
@@ -181,39 +212,46 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
}
}
+ protected void schedule(Duration wait) {
+ List<Long> scheduled = config().get(SCHEDULED);
+ scheduled.add(System.currentTimeMillis() + wait.toMilliseconds());
+
+ executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
@Override
- public void run() {
- synchronized (mutex) {
- try {
- ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag());
- Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS);
- LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) });
- Map<String, Object> resolved = (Map) Tasks.resolving(args, Object.class)
- .deep(true)
- .context(entity)
- .get();
-
- LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved });
- Object result = entity.invoke(effector, resolved).getUnchecked();
- LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result });
- } catch (Throwable t) {
- LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() });
- Exceptions.propagate(t);
- }
+ public synchronized void run() {
+ if (effector == null) return;
+ try {
+ ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag());
+ Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS);
+ LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) });
+ Map<String, Object> resolved = (Map) Tasks.resolving(args, Object.class)
+ .deep(true)
+ .context(entity)
+ .get();
+
+ LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved });
+ Object result = entity.invoke(effector, resolved).getUnchecked();
+ LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result });
+ } catch (RuntimeInterruptedException rie) {
+ Thread.interrupted();
+ // TODO sometimes this seems to hang the executor?
+ } catch (Throwable t) {
+ LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() });
+ Exceptions.propagate(t);
}
}
@Override
public void onEvent(SensorEvent<Object> event) {
- synchronized (mutex) {
- LOG.debug("{}: Got event {}", this, event);
- AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
- if (event.getSensor().getName().equals(sensor.getName())) {
- Boolean start = (Boolean) event.getValue();
- if (start && running.compareAndSet(false, true)) {
- config().set(RUNNING, true);
- start();
- }
+ LOG.debug("{}: Got event {}", this, event);
+ AttributeSensor<Boolean> sensor = config().get(START_SENSOR);
+ if (event.getSensor().getName().equals(sensor.getName())) {
+ Boolean start = (Boolean) event.getValue();
+ if (start && running.compareAndSet(false, true)) {
+ config().set(RUNNING, true);
+ start();
}
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
index 58c10f2..f2c0936 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java
@@ -18,18 +18,17 @@
*/
package org.apache.brooklyn.policy.action;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
import org.apache.brooklyn.api.effector.Effector;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.DurationPredicates;
-import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,16 +71,7 @@ public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy {
.build();
public PeriodicEffectorPolicy() {
- this(MutableMap.<String,Object>of());
- }
-
- public PeriodicEffectorPolicy(Map<String,?> props) {
- super(props);
- }
-
- @Override
- public void setEntity(final EntityLocal entity) {
- super.setEntity(entity);
+ super();
}
@Override
@@ -95,8 +85,33 @@ public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy {
wait = period;
}
- LOG.debug("{}: Scheduling {} every {} in {}", new Object[] { PeriodicEffectorPolicy.this, effector.getName(),
- Time.fromDurationToTimeStringRounded().apply(period), Time.fromDurationToTimeStringRounded().apply(wait) });
- executor.scheduleAtFixedRate(PeriodicEffectorPolicy.this, wait.toMilliseconds(), period.toMilliseconds(), TimeUnit.MILLISECONDS);
+ schedule(wait);
+ }
+
+ @Override
+ public void rebind() {
+ super.rebind();
+
+ // Check if we missed an entire period
+ List<Long> scheduled = config().get(SCHEDULED);
+ if (running.get() && scheduled.isEmpty()) {
+ start();
+ }
+ }
+
+ @Override
+ public synchronized void run() {
+ try {
+ super.run();
+ } finally {
+ Duration period = config().get(PERIOD);
+ String time = config().get(TIME);
+ if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) {
+ schedule(period);
+ } else {
+ Duration wait = getWaitUntil(time);
+ schedule(wait.upperBound(period));
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
index 44e6aac..5a3bed7 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java
@@ -19,27 +19,18 @@
package org.apache.brooklyn.policy.action;
import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.brooklyn.api.effector.Effector;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
-import com.google.common.collect.Lists;
-import com.google.common.reflect.TypeToken;
/**
* A {@link Policy} the executes an {@link Effector} at a specific time in the future.
@@ -57,22 +48,11 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
private static final Logger LOG = LoggerFactory.getLogger(ScheduledEffectorPolicy.class);
- public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { })
- .name("scheduled")
- .description("List of all scheduled execution start times")
- .defaultValue(Lists.newCopyOnWriteArrayList())
- .reconfigurable(true)
- .build();
-
public static final AttributeSensor<Boolean> INVOKE_IMMEDIATELY = Sensors.newBooleanSensor("scheduler.invoke.now", "Invoke the configured effector immediately when this becomes true");
public static final AttributeSensor<Date> INVOKE_AT = Sensors.newSensor(Date.class, "scheduler.invoke.at", "Invoke the configured effector at this time");
public ScheduledEffectorPolicy() {
- this(MutableMap.<String,Object>of());
- }
-
- public ScheduledEffectorPolicy(Map<String,?> props) {
- super(props);
+ super();
}
@Override
@@ -84,27 +64,11 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
}
@Override
- public void rebind() {
- super.rebind();
- List<Long> scheduled = config().get(SCHEDULED);
- for (Long when : scheduled) {
- Duration wait = Duration.millis(when - System.currentTimeMillis());
- if (wait.isPositive()) {
- schedule(wait);
- } else {
- scheduled.remove(when);
- }
- }
- }
-
- @Override
public void start() {
String time = config().get(TIME);
Duration wait = config().get(WAIT);
if (time != null) {
- LOG.debug("{}: Scheduling {} at {} (in {})",
- new Object[] { this, effector.getName(), time, Time.fromDurationToTimeStringRounded().apply(wait) });
wait = getWaitUntil(time);
}
@@ -113,35 +77,23 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
}
}
- protected void schedule(Duration wait) {
- List<Long> scheduled = config().get(SCHEDULED);
- scheduled.add(System.currentTimeMillis() + wait.toMilliseconds());
-
- LOG.debug("{}: Scheduling {} in {} ({} ms)",
- new Object[] { this, effector.getName(), Time.fromDurationToTimeStringRounded().apply(wait), wait.toMilliseconds() });
- executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
-
@Override
public void onEvent(SensorEvent<Object> event) {
- synchronized (mutex) {
- super.onEvent(event);
+ super.onEvent(event);
- if (running.get()) {
- if (event.getSensor().getName().equals(INVOKE_AT.getName())) {
- String time = (String) event.getValue();
- if (time != null) {
- schedule(getWaitUntil(time));
- }
+ if (running.get()) {
+ if (event.getSensor().getName().equals(INVOKE_AT.getName())) {
+ String time = (String) event.getValue();
+ if (time != null) {
+ schedule(getWaitUntil(time));
}
- if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
- Boolean invoke = (Boolean) event.getValue();
- if (invoke) {
- schedule(Duration.ZERO);
- }
+ }
+ if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
+ Boolean invoke = (Boolean) event.getValue();
+ if (invoke) {
+ schedule(Duration.ZERO);
}
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
index e6ae74d..0268d60 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java
@@ -37,24 +37,24 @@ import com.google.common.collect.Iterables;
public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+ private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+
@Test
public void testPeriodicEffectorFires() {
- final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
- final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
.policy(PolicySpec.create(PeriodicEffectorPolicy.class)
.configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
.configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
.configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
.configure(PeriodicEffectorPolicy.TIME, "immediately")
- .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+ .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
Asserts.assertNotNull(policy);
Asserts.assertTrue(entity.getCallHistory().isEmpty());
Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
- entity.sensors().set(start, Boolean.TRUE);
+ entity.sensors().set(START, Boolean.TRUE);
Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
int calls = entity.getCallHistory().size();
@@ -63,22 +63,20 @@ public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport {
@Test
public void testPeriodicEffectorFiresAfterDelay() {
- final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
- final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
.policy(PolicySpec.create(PeriodicEffectorPolicy.class)
.configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector")
.configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
.configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND)
.configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS)
- .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+ .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull();
Asserts.assertNotNull(policy);
Asserts.assertTrue(entity.getCallHistory().isEmpty());
Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING));
- entity.sensors().set(start, Boolean.TRUE);
+ entity.sensors().set(START, Boolean.TRUE);
Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b);
sleep(Duration.seconds(5));
Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a71cf58/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
index daa186e..5271de3 100644
--- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
+++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java
@@ -37,44 +37,42 @@ import com.google.common.collect.Iterables;
public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport {
+ private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start");
+
@Test
public void testScheduledEffectorFiresImmediately() {
- final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
- final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
.policy(PolicySpec.create(ScheduledEffectorPolicy.class)
.configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
.configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
.configure(ScheduledEffectorPolicy.TIME, "immediately")
- .configure(PeriodicEffectorPolicy.START_SENSOR, start)));
+ .configure(PeriodicEffectorPolicy.START_SENSOR, START)));
Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
Asserts.assertNotNull(policy);
Asserts.assertTrue(entity.getCallHistory().isEmpty());
Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
- entity.sensors().set(start, Boolean.TRUE);
+ entity.sensors().set(START, Boolean.TRUE);
Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector"));
}
@Test
public void testScheduledEffectorFiresAfterDelay() {
- final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
- final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
.policy(PolicySpec.create(ScheduledEffectorPolicy.class)
.configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
.configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
.configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS)
- .configure(ScheduledEffectorPolicy.START_SENSOR, start)));
+ .configure(ScheduledEffectorPolicy.START_SENSOR, START)));
Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
Asserts.assertNotNull(policy);
Asserts.assertTrue(entity.getCallHistory().isEmpty());
Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
- entity.sensors().set(start, Boolean.TRUE);
+ entity.sensors().set(START, Boolean.TRUE);
Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
sleep(Duration.seconds(5));
Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
@@ -84,20 +82,18 @@ public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport {
@Test
public void testScheduledEffectorFiresOnSensor() {
- final AttributeSensor<Boolean> start = Sensors.newBooleanSensor("start");
-
- final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
.policy(PolicySpec.create(ScheduledEffectorPolicy.class)
.configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector")
.configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of())
- .configure(ScheduledEffectorPolicy.START_SENSOR, start)));
+ .configure(ScheduledEffectorPolicy.START_SENSOR, START)));
Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull();
Asserts.assertNotNull(policy);
Asserts.assertTrue(entity.getCallHistory().isEmpty());
Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING));
- entity.sensors().set(start, Boolean.TRUE);
+ entity.sensors().set(START, Boolean.TRUE);
Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b);
sleep(Duration.seconds(5));
Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));
[3/3] brooklyn-server git commit: This closes #822
Posted by dr...@apache.org.
This closes #822
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/e85d2648
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/e85d2648
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/e85d2648
Branch: refs/heads/master
Commit: e85d26485f8e62d3ba97d57bf16b6f3434b616cf
Parents: c9f7b0d 0a71cf5
Author: Duncan Godwin <dr...@googlemail.com>
Authored: Sat Sep 16 09:14:04 2017 +0100
Committer: Duncan Godwin <dr...@googlemail.com>
Committed: Sat Sep 16 09:14:04 2017 +0100
----------------------------------------------------------------------
.../action/AbstractScheduledEffectorPolicy.java | 258 +++++++++++++++++++
.../policy/action/PeriodicEffectorPolicy.java | 117 +++++++++
.../policy/action/ScheduledEffectorPolicy.java | 99 +++++++
policy/src/main/resources/catalog.bom | 15 ++
.../action/PeriodicEffectorPolicyTest.java | 96 +++++++
.../action/ScheduledEffectorPolicyTest.java | 112 ++++++++
6 files changed, 697 insertions(+)
----------------------------------------------------------------------