You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/03 16:52:04 UTC

[20/29] git commit: refactor recent changes so that Repeater tasks are in Tasks and EntityTasks reuse cleaned up DependentConfiguration, with DependentConfiguration now relying on an internal class and: * checking for unmanaged (and now this is the default) ...

refactor recent changes so that Repeater tasks are in Tasks and EntityTasks reuse cleaned up DependentConfiguration,
with DependentConfiguration now relying on an internal class and:
* checking for unmanaged (and now this is the default)
* supporting a timeout
* ensuring that all values put into the subscription queue are read (previously it could miss a value if subscriptions were updated twice in succession)


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/14f17bc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/14f17bc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/14f17bc9

Branch: refs/heads/master
Commit: 14f17bc94d76693eadafe7d33a34e4c5fad06ff9
Parents: 7f09a80
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Oct 28 00:42:36 2014 -0700
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 31 09:39:50 2014 -0500

----------------------------------------------------------------------
 .../java/brooklyn/entity/basic/EntityTasks.java |  72 +--
 .../event/basic/DependentConfiguration.java     | 468 +++++++++++++++----
 .../src/main/java/brooklyn/util/task/Tasks.java |  28 +-
 .../entity/basic/EntityPredicatesTest.java      |   4 +-
 .../test/java/brooklyn/util/task/TasksTest.java |  29 ++
 .../BrooklynClusterUpgradeEffectorBody.java     |  16 +-
 .../BrooklynNodeUpgradeEffectorBody.java        |   2 +-
 .../util/collections/CollectionFunctionals.java |   2 +-
 .../util/exceptions/NotManagedException.java    |  36 ++
 .../util/exceptions/TimeoutException.java       |  36 ++
 .../java/brooklyn/util/repeat/Repeater.java     |   4 +
 11 files changed, 547 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/entity/basic/EntityTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/EntityTasks.java b/core/src/main/java/brooklyn/entity/basic/EntityTasks.java
index 37114c5..99c5ca3 100644
--- a/core/src/main/java/brooklyn/entity/basic/EntityTasks.java
+++ b/core/src/main/java/brooklyn/entity/basic/EntityTasks.java
@@ -20,44 +20,62 @@ package brooklyn.entity.basic;
 
 import brooklyn.entity.Entity;
 import brooklyn.event.AttributeSensor;
-import brooklyn.management.TaskAdaptable;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.management.Task;
 import brooklyn.util.collections.CollectionFunctionals;
-import brooklyn.util.guava.Functionals;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.task.Tasks;
 import brooklyn.util.time.Duration;
 
 import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Predicates;
 
 /** Generally useful tasks related to entities */
 public class EntityTasks {
 
     /** creates an (unsubmitted) task which waits for the attribute to satisfy the given predicate,
-     * with an optional timeout */
-    public static <T> TaskAdaptable<Boolean> awaitingAttribute(Entity entity, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
-        return Tasks.awaitingBuilder(Repeater.create("waiting on "+sensor.getName())
-                .backoff(Duration.millis(10), 1.5, Duration.millis(200))
-                .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
-//                TODO abort if entity is unmanaged
-                .until(Functionals.callable(Functions.forPredicate(EntityPredicates.attributeSatisfies(sensor, condition)), entity)),
-                true)
-            .description("waiting on "+entity+" "+sensor.getName()+" "+condition+
-                (timeout!=null ? ", timeout "+timeout : "")).build();
+     * returning false if it times out or becomes unmanaged */
+    public static <T> Task<Boolean> testingAttributeEventually(Entity entity, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return DependentConfiguration.builder().attributeWhenReady(entity, sensor)
+            .readiness(condition)
+            .postProcess(Functions.constant(true))
+            .timeout(timeout)
+            .onTimeoutReturn(false)
+            .onUnmanagedReturn(false)
+            .build();
     }
 
-    /** as {@link #awaitingAttribute(Entity, AttributeSensor, Predicate, Duration)} for multiple entities */
-    public static <T> TaskAdaptable<Boolean> awaitingAttribute(Iterable<Entity> entities, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
-        return Tasks.awaitingBuilder(Repeater.create("waiting on "+sensor.getName())
-                .backoff(Duration.millis(10), 1.5, Duration.millis(200))
-                .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
-//                TODO abort if entity is unmanaged
-                .until(Functionals.callable(Functions.forPredicate(
-                    CollectionFunctionals.all(EntityPredicates.attributeSatisfies(sensor, condition))), entities)),
-                true)
-            .description("waiting on "+Iterables.size(entities)+", "+sensor.getName()+" "+condition+
-                (timeout!=null ? ", timeout "+timeout : "")+
-                ": "+entities).build();
+    /** creates an (unsubmitted) task which waits for the attribute to satisfy the given predicate,
+     * throwing if it times out or becomes unmanaged */
+    public static <T> Task<Boolean> requiringAttributeEventually(Entity entity, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return DependentConfiguration.builder().attributeWhenReady(entity, sensor)
+            .readiness(condition)
+            .postProcess(Functions.constant(true))
+            .timeout(timeout)
+            .onTimeoutThrow()
+            .onUnmanagedThrow()
+            .build();
+    }
+
+    /** as {@link #testingAttributeEventually(Entity, AttributeSensor, Predicate, Duration)} for multiple entities */
+    public static <T> Task<Boolean> testingAttributeEventually(Iterable<Entity> entities, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return DependentConfiguration.builder().attributeWhenReadyFromMultiple(entities, sensor, condition)
+            .postProcess(Functions.constant(true))
+            .timeout(timeout)
+            .onTimeoutReturn(false)
+            .onUnmanagedReturn(false)
+            .postProcessFromMultiple(CollectionFunctionals.all(Predicates.equalTo(true)))
+            .build();
     }
+    
+    /** as {@link #requiringAttributeEventually(Entity, AttributeSensor, Predicate, Duration)} for multiple entities */
+    public static <T> Task<Boolean> requiringAttributeEventually(Iterable<Entity> entities, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return DependentConfiguration.builder().attributeWhenReadyFromMultiple(entities, sensor, condition)
+            .postProcess(Functions.constant(true))
+            .timeout(timeout)
+            .onTimeoutThrow()
+            .onUnmanagedThrow()
+            .postProcessFromMultiple(CollectionFunctionals.all(Predicates.equalTo(true)))
+            .build();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java b/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java
index de45514..51af110 100644
--- a/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java
+++ b/core/src/main/java/brooklyn/event/basic/DependentConfiguration.java
@@ -19,18 +19,18 @@
 package brooklyn.event.basic;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import groovy.lang.Closure;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
@@ -41,6 +41,7 @@ import brooklyn.config.ConfigKey;
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
 import brooklyn.entity.basic.EntityLocal;
 import brooklyn.entity.basic.Lifecycle;
@@ -53,9 +54,15 @@ import brooklyn.management.Task;
 import brooklyn.management.TaskAdaptable;
 import brooklyn.management.TaskFactory;
 import brooklyn.util.GroovyJavaMethods;
+import brooklyn.util.collections.CollectionFunctionals;
+import brooklyn.util.collections.MutableList;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.exceptions.CompoundRuntimeException;
 import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.NotManagedException;
+import brooklyn.util.exceptions.TimeoutException;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.guava.Maybe;
 import brooklyn.util.task.BasicExecutionContext;
 import brooklyn.util.task.BasicTask;
 import brooklyn.util.task.DeferredSupplier;
@@ -63,6 +70,9 @@ import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.ParallelTask;
 import brooklyn.util.task.TaskInternal;
 import brooklyn.util.task.Tasks;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Function;
@@ -105,7 +115,7 @@ public class DependentConfiguration {
         return attributeWhenReady(source, sensor, readyPredicate);
     }
     
-    /** returns a {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies ready, then returns that value;
+    /** returns an unsubmitted {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies ready, then returns that value;
      * particular useful in Entity configuration where config will block until Tasks have a value
      */
     public static <T> Task<T> attributeWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready) {
@@ -141,11 +151,19 @@ public class DependentConfiguration {
         return attributePostProcessedWhenReady(source, sensor, ready, GroovyJavaMethods.<T,V>functionFromClosure(postProcess));
     }
     
+    @SuppressWarnings("unchecked")
     public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Function<? super T,V> postProcess) {
-        Builder<T, T> builder = builder().attributeWhenReady(source, sensor);
+        Builder<T,T> builder1 = DependentConfiguration.builder().attributeWhenReady(source, sensor);
+        // messy generics here to support null postProcess; would be nice to disallow that here
+        Builder<T,V> builder;
+        if (postProcess != null) {
+            builder = builder1.postProcess(postProcess);
+        } else {
+            builder = (Builder<T,V>)builder1;
+        }
         if (ready != null) builder.readiness(ready);
-        if (postProcess != null) builder.postProcess(postProcess);
-        return ((Builder)builder).build();
+        
+        return builder.build();
     }
 
     public static <T> T waitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready) {
@@ -158,49 +176,83 @@ public class DependentConfiguration {
     }
     
     // TODO would be nice to have an easy semantics for whenServiceUp (cf DynamicWebAppClusterImpl.whenServiceUp)
-    // and TODO would be nice to have it stop when source is unmanaged (with ability to define post-processing)
-    // probably using the builder for both of these...
+    
     public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
+        return new WaitInTaskForAttributeReady<T,T>(source, sensor, ready, abortConditions, blockingDetails).call();
+    }
+    
+    protected static class WaitInTaskForAttributeReady<T,V> implements Callable<V> {
+
+        /* This is a change since before Oct 2014. Previously it would continue to poll,
+         * (maybe finding a different error) if the target entity becomes unmanaged. 
+         * Now it actively checks unmanaged by default, and still throws although it might 
+         * now find a different problem. */
+        private final static boolean DEFAULT_IGNORE_UNMANAGED = false;
         
-        T value = source.getAttribute(sensor);
-        final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
-
-        // return immediately if either the ready predicate or the abort conditions hold
-        if (ready==null) ready = GroovyJavaMethods.truthPredicate();
-        if (ready.apply(value)) return value;
-        for (AttributeAndSensorCondition abortCondition : abortConditions) {
-            Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor);
-            if (abortCondition.predicate.apply(abortValue)) {
-                abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
-            }
+        final Entity source;
+        final AttributeSensor<T> sensor;
+        final Predicate<? super T> ready;
+        final List<AttributeAndSensorCondition<?>> abortSensorConditions;
+        final String blockingDetails;
+        final Function<? super T,? extends V> postProcess;
+        final Duration timeout;
+        final Maybe<V> onTimeout;
+        final boolean ignoreUnmanaged;
+        final Maybe<V> onUnmanaged;
+        // TODO onError Continue / Throw / Return(V)
+        
+        protected WaitInTaskForAttributeReady(Builder<T, V> builder) {
+            this.source = builder.source;
+            this.sensor = builder.sensor;
+            this.ready = builder.readiness;
+            this.abortSensorConditions = builder.abortSensorConditions;
+            this.blockingDetails = builder.blockingDetails;
+            this.postProcess = builder.postProcess;
+            this.timeout = builder.timeout;
+            this.onTimeout = builder.onTimeout;
+            this.ignoreUnmanaged = builder.ignoreUnmanaged;
+            this.onUnmanaged = builder.onUnmanaged;
         }
-        if (abortionExceptions.size() > 0) {
-            throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions);
+        
+        private WaitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready,
+                List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
+            this.source = source;
+            this.sensor = sensor;
+            this.ready = ready;
+            this.abortSensorConditions = abortConditions;
+            this.blockingDetails = blockingDetails;
+            
+            this.timeout = Duration.PRACTICALLY_FOREVER;
+            this.onTimeout = Maybe.absent();
+            this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED;
+            this.onUnmanaged = Maybe.absent();
+            this.postProcess = null;
         }
 
-        TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
-        if (current == null) throw new IllegalStateException("Should only be invoked in a running task");
-        Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
-        if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
-                current+" has no entity tag ("+current.getStatusDetail(false)+")");
-        final AtomicReference<T> data = new AtomicReference<T>();
-        final Semaphore semaphore = new Semaphore(0); // could use Exchanger
-        SubscriptionHandle subscription = null;
-        List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
-        try {
-            subscription = ((EntityInternal)entity).getSubscriptionContext().subscribe(source, sensor, new SensorEventListener<T>() {
-                @Override public void onEvent(SensorEvent<T> event) {
-                    data.set(event.getValue());
-                    semaphore.release();
-                }});
-            for (final AttributeAndSensorCondition abortCondition : abortConditions) {
-                abortSubscriptions.add(((EntityInternal)entity).getSubscriptionContext().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
-                    @Override public void onEvent(SensorEvent<Object> event) {
-                        if (abortCondition.predicate.apply(event.getValue())) {
-                            abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
-                            semaphore.release();
-                        }
-                    }}));
+        @SuppressWarnings("unchecked")
+        protected V postProcess(T value) {
+            if (this.postProcess!=null) return postProcess.apply(value);
+            // if no post-processing assume the types are correct
+            return (V) value;
+        }
+        
+        protected boolean ready(T value) {
+            if (ready!=null) return ready.apply(value);
+            return GroovyJavaMethods.truth(value);
+        }
+        
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public V call() {
+            T value = source.getAttribute(sensor);
+
+            // return immediately if either the ready predicate or the abort conditions hold
+            if (ready(value)) return postProcess(value);
+
+            final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
+            long start = System.currentTimeMillis();
+            
+            for (AttributeAndSensorCondition abortCondition : abortSensorConditions) {
                 Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor);
                 if (abortCondition.predicate.apply(abortValue)) {
                     abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
@@ -209,31 +261,101 @@ public class DependentConfiguration {
             if (abortionExceptions.size() > 0) {
                 throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions);
             }
+
+            TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
+            if (current == null) throw new IllegalStateException("Should only be invoked in a running task");
+            Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
+            if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+
+                current+" has no entity tag ("+current.getStatusDetail(false)+")");
+            
+            final LinkedList<T> publishedValues = new LinkedList<T>();
+            final Semaphore semaphore = new Semaphore(0); // could use Exchanger
+            SubscriptionHandle subscription = null;
+            List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
             
-            value = source.getAttribute(sensor);
-            while (!ready.apply(value)) {
-                String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
-                try {
-                    semaphore.acquire();
-                } finally {
-                    current.setBlockingDetails(prevBlockingDetails);
+            try {
+                subscription = ((EntityInternal)entity).getSubscriptionContext().subscribe(source, sensor, new SensorEventListener<T>() {
+                    @Override public void onEvent(SensorEvent<T> event) {
+                        synchronized (publishedValues) { publishedValues.add(event.getValue()); }
+                        semaphore.release();
+                    }});
+                for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) {
+                    abortSubscriptions.add(((EntityInternal)entity).getSubscriptionContext().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() {
+                        @Override public void onEvent(SensorEvent<Object> event) {
+                            if (abortCondition.predicate.apply(event.getValue())) {
+                                abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
+                                semaphore.release();
+                            }
+                        }}));
+                    Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor);
+                    if (abortCondition.predicate.apply(abortValue)) {
+                        abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor));
+                    }
                 }
-                
                 if (abortionExceptions.size() > 0) {
                     throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions);
                 }
-                value = data.get();
-            }
-            if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source);
-            return value;
-        } catch (InterruptedException e) {
-            throw Exceptions.propagate(e);
-        } finally {
-            if (subscription != null) {
-                ((EntityInternal)entity).getSubscriptionContext().unsubscribe(subscription);
-            }
-            for (SubscriptionHandle handle : abortSubscriptions) {
-                ((EntityInternal)entity).getSubscriptionContext().unsubscribe(handle);
+
+                CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : null;
+                Duration maxPeriod = Duration.millis(200);
+                Duration nextPeriod = Duration.millis(10);
+                while (true) {
+                    // check the source on initial run (could be done outside the loop) 
+                    // and also (optionally) on each iteration in case it is more recent 
+                    value = source.getAttribute(sensor);
+                    if (ready(value)) break;
+
+                    if (timer!=null) {
+                        if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
+                            nextPeriod = timer.getDurationRemaining();
+                        }
+                        if (timer.isExpired()) {
+                            if (onTimeout.isPresent()) return onTimeout.get();
+                            throw new TimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
+                        }
+                    }
+
+                    String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
+                    try {
+                        if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) {
+                            // immediately release so we are available for the next check
+                            semaphore.release();
+                            // if other permits have been made available (e.g. multiple notifications) drain them all as no point running multiple times
+                            semaphore.drainPermits();
+                        }
+                    } finally {
+                        current.setBlockingDetails(prevBlockingDetails);
+                    }
+
+                    // check any subscribed values which have come in first
+                    while (!publishedValues.isEmpty()) {
+                        synchronized (publishedValues) { value = publishedValues.pop(); }
+                        if (ready(value)) break;
+                    }
+
+                    // if unmanaged then ignore the other abort conditions
+                    if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) {
+                        if (onTimeout.isPresent()) return onTimeout.get();
+                        throw new NotManagedException(entity);                        
+                    }
+                    
+                    if (abortionExceptions.size() > 0) {
+                        throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions);
+                    }
+
+                    nextPeriod = nextPeriod.times(2).maximum(maxPeriod);
+                }
+                if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source);
+                return postProcess(value);
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            } finally {
+                if (subscription != null) {
+                    ((EntityInternal)entity).getSubscriptionContext().unsubscribe(subscription);
+                }
+                for (SubscriptionHandle handle : abortSubscriptions) {
+                    ((EntityInternal)entity).getSubscriptionContext().unsubscribe(handle);
+                }
             }
         }
     }
@@ -268,6 +390,7 @@ public class DependentConfiguration {
     }
     
     /** @see #transform(Task, Function) */
+    @SuppressWarnings({ "rawtypes" })
     public static <U,T> Task<T> transform(final Map flags, final TaskAdaptable<U> task, final Function<U,T> transformer) {
         return new BasicTask<T>(flags, new Callable<T>() {
             public T call() throws Exception {
@@ -286,19 +409,23 @@ public class DependentConfiguration {
     }
 
     /** @see #transformMultiple(Function, TaskAdaptable...) */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public static <U,T> Task<T> transformMultiple(Closure transformer, TaskAdaptable<U> ...tasks) {
         return transformMultiple(GroovyJavaMethods.functionFromClosure(transformer), tasks);
     }
 
     /** @see #transformMultiple(Function, TaskAdaptable...) */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public static <U,T> Task<T> transformMultiple(Map flags, Closure transformer, TaskAdaptable<U> ...tasks) {
         return transformMultiple(flags, GroovyJavaMethods.functionFromClosure(transformer), tasks);
     }
     
     /** @see #transformMultiple(Function, TaskAdaptable...) */
+    @SuppressWarnings({ "rawtypes" })
     public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, TaskAdaptable<U> ...tasks) {
         return transformMultiple(flags, transformer, Arrays.asList(tasks));
     }
+    @SuppressWarnings({ "rawtypes" })
     public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, Collection<? extends TaskAdaptable<U>> tasks) {
         if (tasks.size()==1) {
             return transform(flags, Iterables.getOnlyElement(tasks), new Function<U,T>() {
@@ -411,18 +538,26 @@ public class DependentConfiguration {
     public static class ProtoBuilder {
         /**
          * Will wait for the attribute on the given entity.
-         * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE} then it will abort. 
+         * If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE} then it will abort. 
          */
         public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) {
-            return new Builder<T2,T2>().attributeWhenReady(source, sensor);
+            return new Builder<T2,T2>(source, sensor).abortIfOnFire();
         }
 
-        /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, 
+        /**
+         * Will wait for the attribute on the given entity, not aborting when it goes {@link Lifecycle#ON_FIRE}.
+         */
+        public <T2> Builder<T2,T2> attributeWhenReadyAllowingOnFire(Entity source, AttributeSensor<T2> sensor) {
+            return new Builder<T2,T2>(source, sensor);
+        }
+
+        /** Constructs a builder for task for parallel execution returning a list of values of the given sensor list on the given entity, 
          * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ 
         @Beta
         public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor) {
             return attributeWhenReadyFromMultiple(sources, sensor, GroovyJavaMethods.truthPredicate());
         }
+        /** As {@link #attributeWhenReadyFromMultiple(Iterable, AttributeSensor)} with an explicit readiness test. */
         @Beta
         public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) {
             return new MultiBuilder<T, T, List<T>>(sources, sensor, readiness);
@@ -432,24 +567,33 @@ public class DependentConfiguration {
     /**
      * Builder for producing variants of attributeWhenReady.
      */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Beta
     public static class Builder<T,V> {
         protected Entity source;
         protected AttributeSensor<T> sensor;
         protected Predicate<? super T> readiness;
         protected Function<? super T, ? extends V> postProcess;
-        protected List<AttributeAndSensorCondition<?>> abortConditions = Lists.newArrayList();
+        protected List<AttributeAndSensorCondition<?>> abortSensorConditions = Lists.newArrayList();
         protected String blockingDetails;
+        protected Duration timeout;
+        protected Maybe<V> onTimeout;
+        protected  boolean ignoreUnmanaged = WaitInTaskForAttributeReady.DEFAULT_IGNORE_UNMANAGED;
+        protected Maybe<V> onUnmanaged;
+
+        protected Builder(Entity source, AttributeSensor<T> sensor) {
+            this.source = source;
+            this.sensor = sensor;
+        }
         
         /**
          * Will wait for the attribute on the given entity.
-         * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort. 
+         * If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort.
+         * @deprecated since 0.7.0 use {@link DependentConfiguration#builder()} then {@link ProtoBuilder#attributeWhenReady(Entity, AttributeSensor)} then {@link #abortIfOnFire()} 
          */
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) {
             this.source = checkNotNull(source, "source");
             this.sensor = (AttributeSensor) checkNotNull(sensor, "sensor");
-            abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE));
+            abortIfOnFire();
             return (Builder<T2, T2>) this;
         }
         public Builder<T,V> readiness(Closure<Boolean> val) {
@@ -460,10 +604,12 @@ public class DependentConfiguration {
             this.readiness = checkNotNull(val, "ready");
             return this;
         }
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         public <V2> Builder<T,V2> postProcess(Closure<V2> val) {
             this.postProcess = (Function) GroovyJavaMethods.<T,V2>functionFromClosure(checkNotNull(val, "postProcess"));
             return (Builder<T,V2>) this;
         }
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         public <V2> Builder<T,V2> postProcess(final Function<? super T, V2>  val) {
             this.postProcess = (Function) checkNotNull(val, "postProcess");
             return (Builder<T,V2>) this;
@@ -472,28 +618,71 @@ public class DependentConfiguration {
             return abortIf(source, sensor, GroovyJavaMethods.truthPredicate());
         }
         public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) {
-            abortConditions.add(new AttributeAndSensorCondition<T2>(source, sensor, predicate));
+            abortSensorConditions.add(new AttributeAndSensorCondition<T2>(source, sensor, predicate));
+            return this;
+        }
+        public Builder<T,V> abortIfOnFire() {
+            abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE));
             return this;
         }
         public Builder<T,V> blockingDetails(String val) {
             blockingDetails = val;
             return this;
         }
+        /** specifies an optional timeout; by default it waits forever, or until unmanaged or other abort condition */
+        public Builder<T,V> timeout(Duration val) {
+            timeout = val;
+            return this;
+        }
+        public Builder<T,V> onTimeoutReturn(V val) {
+            onTimeout = Maybe.of(val);
+            return this;
+        }
+        public Builder<T,V> onTimeoutThrow() {
+            onTimeout = Maybe.<V>absent();
+            return this;
+        }
+        public Builder<T,V> onUnmanagedReturn(V val) {
+            onUnmanaged = Maybe.of(val);
+            return this;
+        }
+        public Builder<T,V> onUnmanagedThrow() {
+            onUnmanaged = Maybe.<V>absent();
+            return this;
+        }
+        /** @since 0.7.0 included in case old behaviour of not checking whether the entity is managed is required
+         * (I can't see why it is; polling will likely give errors, once it is unmanaged this will never complete,
+         * and before management the current code will continue, so long as there are no other errors) */ @Deprecated
+        public Builder<T,V> onUnmanagedContinue() {
+            ignoreUnmanaged = true;
+            return this;
+        }
+        /** take advantage of the fact that this builder can build multiple times, allowing subclasses 
+         * to change the source along the way */
+        protected Builder<T,V> source(Entity source) {
+            this.source = source;
+            return this;
+        }
+        /** as {@link #source(Entity)} */
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        protected Builder<T,V> sensor(AttributeSensor<? extends T> sensor) {
+            this.sensor = (AttributeSensor) sensor;
+            return this;
+        }
         public Task<V> build() {
             validate();
-            return new BasicTask<V>(
-                    MutableMap.of("tag", "attributeWhenReady", "displayName", "retrieving sensor "+sensor.getName()+" from "+source.getDisplayName()), 
-                    new Callable<V>() {
-                        @Override public V call() {
-                            T result = waitInTaskForAttributeReady(source, sensor, readiness, abortConditions, blockingDetails);
-                            return postProcess.apply(result);
-                        }
-                    });
+            
+            return Tasks.<V>builder().dynamic(false)
+                .name("waiting on "+sensor.getName())
+                .description("Waiting on sensor "+sensor.getName()+" from "+source)
+                .tag("attributeWhenReady")
+                .body(new WaitInTaskForAttributeReady<T,V>(this))
+                .build();
         }
+        
         public V runNow() {
             validate();
-            T result = waitInTaskForAttributeReady(source, sensor, readiness, abortConditions, blockingDetails);
-            return postProcess.apply(result);
+            return new WaitInTaskForAttributeReady<T,V>(this).call();
         }
         private void validate() {
             checkNotNull(source, "Entity source");
@@ -509,7 +698,12 @@ public class DependentConfiguration {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Beta
     public static class MultiBuilder<T, V, V2> {
-        protected List<AttributeAndSensorCondition<?>> multiSource = Lists.newArrayList();
+        protected final String name;
+        protected final String descriptionBase;
+        protected final Builder<T,V> builder;
+        // if desired, the use of this multiSource could allow different conditions;
+        // but it is probably an easier API for the caller just to build the parallel task
+        protected final List<AttributeAndSensorCondition<?>> multiSource = Lists.newArrayList();
         protected Function<? super List<V>, ? extends V2> postProcessFromMultiple;
         
         /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, 
@@ -520,34 +714,106 @@ public class DependentConfiguration {
         }
         @Beta
         protected MultiBuilder(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) {
+            builder = new Builder<T,V>(null, sensor);
+            builder.readiness(readiness);
+            
             for (Entity s : checkNotNull(sources, "sources")) {
-                AttributeAndSensorCondition<T> condition = new AttributeAndSensorCondition<T>(s, sensor, readiness);
-                multiSource.add(condition);
+                multiSource.add(new AttributeAndSensorCondition<T>(s, sensor, readiness));
             }
+            this.name = "waiting on "+sensor.getName();
+            this.descriptionBase = "waiting on "+sensor.getName()+" "+readiness
+                +" from "+Iterables.size(sources)+" entit"+Strings.ies(sources);
         }
+        
+        /** Apply post-processing to the entire list of results */
         public <V2b> MultiBuilder<T, V, V2b> postProcessFromMultiple(final Function<? super List<V>, V2b> val) {
-            this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcess");
+            this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcessFromMulitple");
             return (MultiBuilder<T,V, V2b>) this;
         }
+        /** Apply post-processing to the entire list of results.
+         * See {@link CollectionFunctionals#all(Predicate)} and {@link CollectionFunctionals#quorum(brooklyn.util.collections.QuorumCheck, Predicate)},
+         * which return predicates useful as arguments here. */
+        public MultiBuilder<T, V, Boolean> postProcessFromMultiple(final Predicate<? super List<V>> val) {
+            return postProcessFromMultiple(Functions.forPredicate(val));
+        }
+        
+        public <V1> MultiBuilder<T, V1, V2> postProcess(Closure<V1> val) {
+            builder.postProcess(val);
+            return (MultiBuilder<T, V1, V2>) this;
+        }
+        public <V1> MultiBuilder<T, V1, V2> postProcess(final Function<? super T, V1>  val) {
+            builder.postProcess(val);
+            return (MultiBuilder<T, V1, V2>) this;
+        }
+        public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor) {
+            builder.abortIf(source, sensor);
+            return this;
+        }
+        public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) {
+            builder.abortIf(source, sensor, predicate);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> abortIfOnFire() {
+            builder.abortIfOnFire();
+            return this;
+        }
+        public MultiBuilder<T, V, V2> blockingDetails(String val) {
+            builder.blockingDetails(val);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> timeout(Duration val) {
+            builder.timeout(val);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> onTimeoutReturn(V val) {
+            builder.onTimeoutReturn(val);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> onTimeoutThrow() {
+            builder.onTimeoutThrow();
+            return this;
+        }
+        public MultiBuilder<T, V, V2> onUnmanagedReturn(V val) {
+            builder.onUnmanagedReturn(val);
+            return this;
+        }
+        public MultiBuilder<T, V, V2> onUnmanagedThrow() {
+            builder.onUnmanagedThrow();
+            return this;
+        }
+        
         public Task<V2> build() {
-            checkState(multiSource.size() > 0, "Entity sources must be set: multiSource=%s", multiSource);
+            List<Task<V>> tasks = MutableList.of();
+            for (AttributeAndSensorCondition<?> source: multiSource) {
+                builder.source(source.source);
+                builder.sensor((AttributeSensor)source.sensor);
+                builder.readiness((Predicate)source.predicate);
+                tasks.add(builder.build());
+            }
+            final Task<List<V>> parallelTask = Tasks.<List<V>>builder().parallel(true).addAll(tasks)
+                .name(name)
+                .description(descriptionBase+
+                    (builder.timeout!=null ? ", timeout "+builder.timeout : ""))
+                .build();
             
-            // TODO Do we really want to try to support the list-of-entities?
-            final Task<List<V>> task = (Task<List<V>>) new ParallelTask<V>(Iterables.transform(multiSource, new Function<AttributeAndSensorCondition<?>, Task<T>>() {
-                @Override public Task<T> apply(AttributeAndSensorCondition<?> it) {
-                    return (Task) builder().attributeWhenReady(it.source, it.sensor).readiness((Predicate)it.predicate).build();
-                }
-            }));
             if (postProcessFromMultiple == null) {
-                return (Task<V2>) task;
+                // V2 should be the right type in normal operations
+                return (Task<V2>) parallelTask;
             } else {
-                return new BasicTask(new Callable<V2>() {
-                    @Override public V2 call() throws Exception {
-                        List<V> prePostProgress = DynamicTasks.queueIfPossible(task).orSubmitAndBlock().getTask().get();
-                        return postProcessFromMultiple.apply(prePostProgress);
-                    }
-                });
+                return Tasks.<V2>builder().name(name).description(descriptionBase)
+                    .tag("attributeWhenReady")
+                    .body(new Callable<V2>() {
+                        @Override public V2 call() throws Exception {
+                            List<V> prePostProgress = DynamicTasks.queue(parallelTask).get();
+                            return DynamicTasks.queue(
+                                Tasks.<V2>builder().name("post-processing").description("Applying "+postProcessFromMultiple)
+                                    .body(Functionals.<List<V>,V2>callable((Function)postProcessFromMultiple, prePostProgress))
+                                    .build()).get();
+                        }
+                    })
+                    .build();
             }
         }
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/main/java/brooklyn/util/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java
index 0edd07b..fd33985 100644
--- a/core/src/main/java/brooklyn/util/task/Tasks.java
+++ b/core/src/main/java/brooklyn/util/task/Tasks.java
@@ -445,16 +445,28 @@ public class Tasks {
             return false;
         }
     }
-    
-    /** creates an (unsubmitted) task which waits for the given repeater, optionally failing if it does not complete with success */
-    public static TaskAdaptable<Boolean> awaiting(Repeater repeater, boolean requireTrue) {
-        return awaitingBuilder(repeater, requireTrue).build();
+
+    /** @return a {@link TaskBuilder} which tests whether the repeater terminates with success in its configured timeframe,
+     * returning true or false depending on whether the repeater succeeds */
+    public static TaskBuilder<Boolean> testing(Repeater repeater) {
+        return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, false))
+            .name("waiting for condition")
+            .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription());
     }
 
-    /** creates a partially instantiated builder which waits for the given repeater, optionally failing if it does not complete with success,
-     * for further task customization and then {@link TaskBuilder#build()} */
-    public static TaskBuilder<Boolean> awaitingBuilder(Repeater repeater, boolean requireTrue) {
-        return Tasks.<Boolean>builder().name(repeater.getDescription()).body(new WaitForRepeaterCallable(repeater, requireTrue));
+    /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe,
+     * throwing if it does not */
+    public static TaskBuilder<?> requiring(Repeater repeater) {
+        return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, true))
+            .name("waiting for condition")
+            .description("Requiring " + getTimeoutString(repeater) + ": "+repeater);
+    }
+    
+    private static String getTimeoutString(Repeater repeater) {
+        Duration timeout = repeater.getTimeLimit();
+        if (timeout==null || Duration.PRACTICALLY_FOREVER.equals(timeout))
+            return "eventually";
+        return "in "+timeout;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java b/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java
index b4c144f..9394b85 100644
--- a/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java
+++ b/core/src/test/java/brooklyn/entity/basic/EntityPredicatesTest.java
@@ -110,7 +110,7 @@ public class EntityPredicatesTest extends BrooklynAppUnitTestSupport {
     @Test
     public void testWithLocation() throws Exception {
         entity.addLocations(ImmutableList.of(loc));
-        assertTrue(EntityPredicates.locationsInclude(loc).apply(entity));
-        assertFalse(EntityPredicates.locationsInclude(loc).apply(app));
+        assertTrue(EntityPredicates.locationsIncludes(loc).apply(entity));
+        assertFalse(EntityPredicates.locationsIncludes(loc).apply(app));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/core/src/test/java/brooklyn/util/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/TasksTest.java b/core/src/test/java/brooklyn/util/task/TasksTest.java
index 9ee9970..68aa2ea 100644
--- a/core/src/test/java/brooklyn/util/task/TasksTest.java
+++ b/core/src/test/java/brooklyn/util/task/TasksTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertEquals;
 import java.util.Map;
 import java.util.Set;
 
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -34,12 +35,14 @@ import brooklyn.management.Task;
 import brooklyn.test.entity.TestApplication;
 import brooklyn.test.entity.TestEntity;
 import brooklyn.util.guava.Functionals;
+import brooklyn.util.repeat.Repeater;
 import brooklyn.util.time.Duration;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Callables;
 
 
 public class TasksTest extends BrooklynAppUnitTestSupport {
@@ -127,4 +130,30 @@ public class TasksTest extends BrooklynAppUnitTestSupport {
         assertResolvesValue(v, Object.class, "foo");
     }
 
+    @Test
+    public void testRepeater() throws Exception {
+        Task<?> t;
+        
+        t = Tasks.requiring(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        t.get(Duration.ONE_SECOND);
+        
+        t = Tasks.testing(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        Assert.assertEquals(t.get(Duration.ONE_SECOND), true);
+        
+        t = Tasks.requiring(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        try {
+            t.get(Duration.ONE_SECOND);
+            Assert.fail("Should have failed");
+        } catch (Exception e) {
+            // expected
+        }
+
+        t = Tasks.testing(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        Assert.assertEquals(t.get(Duration.ONE_SECOND), false);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
index b815ee4..48553b3 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
@@ -22,14 +22,12 @@ import java.io.File;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import brooklyn.config.ConfigKey;
 import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
 import brooklyn.entity.Group;
@@ -91,9 +89,8 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> imple
             
             specCfg.putAll(ConfigBag.newInstance(parameters.get(EXTRA_CONFIG)).getAllConfigAsConfigKeyMap());
             
-            Map<ConfigKey<?>, Object> cfgLive = memberSpec.getConfigLive();
-            cfgLive.clear();
-            cfgLive.putAll(specCfg.getAllConfigAsConfigKeyMap());
+            memberSpec.clearConfig();
+            memberSpec.configure(specCfg.getAllConfigAsConfigKeyMap());
             // not necessary, but good practice
             entity().setConfig(BrooklynCluster.MEMBER_SPEC, memberSpec);
             log.debug("Upgrading "+entity()+", new "+BrooklynCluster.MEMBER_SPEC+": "+memberSpec+" / "+specCfg);
@@ -101,9 +98,8 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> imple
             upgrade(parameters);
         } catch (Exception e) {
             log.debug("Upgrading "+entity()+" failed, will rethrow after restoring "+BrooklynCluster.MEMBER_SPEC+" to: "+origSpecCfg);
-            Map<ConfigKey<?>, Object> cfgLive = memberSpec.getConfigLive();
-            cfgLive.clear();
-            cfgLive.putAll(origSpecCfg.getAllConfigAsConfigKeyMap());
+            memberSpec.clearConfig();
+            memberSpec.configure(origSpecCfg.getAllConfigAsConfigKeyMap());
             // not necessary, but good practice
             entity().setConfig(BrooklynCluster.MEMBER_SPEC, memberSpec);
             
@@ -187,7 +183,7 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> imple
 
         //2. Wait for them to be RUNNING (or at least STARTING to have completed)
         // (should already be the case, because above is synchronous and, we think, it will fail if start does not succeed)
-        DynamicTasks.queue(EntityTasks.awaitingAttribute(newNodes, Attributes.SERVICE_STATE_ACTUAL, 
+        DynamicTasks.queue(EntityTasks.requiringAttributeEventually(newNodes, Attributes.SERVICE_STATE_ACTUAL, 
                 Predicates.not(Predicates.equalTo(Lifecycle.STARTING)), Duration.minutes(30)));
 
         //3. Set HOT_STANDBY in case it is not enabled on the command line ...
@@ -197,7 +193,7 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> imple
                 newNodes)).asTask().getUnchecked();
         //... and wait until all of the nodes change state
         // TODO fail quicker if state changes to FAILED
-        DynamicTasks.queue(EntityTasks.awaitingAttribute(newNodes, BrooklynNode.MANAGEMENT_NODE_STATE, 
+        DynamicTasks.queue(EntityTasks.requiringAttributeEventually(newNodes, BrooklynNode.MANAGEMENT_NODE_STATE, 
                 Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.FIVE_MINUTES));
 
         //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well)

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
index 299bd5b..00ab7bc 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
@@ -128,7 +128,7 @@ public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void> {
         DynamicTasks.queue(Effectors.invocation(dryRunChild, BrooklynNode.START, ConfigBag.EMPTY));
 
         // 2 confirm hot standby status
-        DynamicTasks.queue(EntityTasks.awaitingAttribute(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE, 
+        DynamicTasks.queue(EntityTasks.requiringAttributeEventually(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE, 
             Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.FIVE_MINUTES));
 
         // 3 stop new version

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
index a42091e..9ac7202 100644
--- a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
+++ b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
@@ -181,7 +181,7 @@ public class CollectionFunctionals {
     // ---------
     
     public static <T,TT extends Iterable<T>> Predicate<TT> all(Predicate<T> attributeSatisfies) {
-        return new QuorumSatisfies<T, TT>(QuorumChecks.all(), attributeSatisfies);
+        return quorum(QuorumChecks.all(), attributeSatisfies);
     }
 
     public static <T,TT extends Iterable<T>> Predicate<TT> quorum(QuorumCheck quorumCheck, Predicate<T> attributeSatisfies) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java b/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java
new file mode 100644
index 0000000..9550288
--- /dev/null
+++ b/utils/common/src/main/java/brooklyn/util/exceptions/NotManagedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.util.exceptions;
+
+public class NotManagedException extends IllegalStateException {
+
+    private static final long serialVersionUID = -3359163414517503809L;
+
+    public NotManagedException(Object object) {
+        super(object+" is not managed");
+    }
+    
+    public NotManagedException(String message) {
+        super(message);
+    }
+    
+    public NotManagedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java b/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java
new file mode 100644
index 0000000..c31512f
--- /dev/null
+++ b/utils/common/src/main/java/brooklyn/util/exceptions/TimeoutException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.util.exceptions;
+
+public class TimeoutException extends IllegalStateException {
+
+    private static final long serialVersionUID = -3359163414517503809L;
+
+    public TimeoutException() {
+        super("timeout");
+    }
+    
+    public TimeoutException(String message) {
+        super(message);
+    }
+    
+    public TimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/14f17bc9/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java b/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java
index 1e3e188..bd76ea8 100644
--- a/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java
+++ b/utils/common/src/main/java/brooklyn/util/repeat/Repeater.java
@@ -386,5 +386,9 @@ public class Repeater {
     public String getDescription() {
         return description;
     }
+
+    public Duration getTimeLimit() {
+        return timeLimit;
+    }
     
 }