You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/03 14:23:53 UTC

[16/35] brooklyn-server git commit: improve task cancellation code

improve task cancellation code

consistently notify listeners and update maps and counts, and remove deprecated internal listeners class;
add logging and speed increase to related tests (and Asserts.eventually can wait on an object to speed things up!)


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/7c2c8e1a
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/7c2c8e1a
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/7c2c8e1a

Branch: refs/heads/master
Commit: 7c2c8e1aa9a7a2c0be9476c93bd5f8c46c77e1ae
Parents: 5e19dec
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Sep 15 18:30:11 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Sat Sep 16 02:42:05 2017 +0100

----------------------------------------------------------------------
 .../util/core/task/BasicExecutionManager.java   | 83 ++++++++++++++------
 .../brooklyn/util/core/task/BasicTask.java      |  6 +-
 .../core/task/ListenableForwardingFuture.java   | 75 ------------------
 .../core/task/DynamicSequentialTaskTest.java    | 17 ++--
 .../java/org/apache/brooklyn/test/Asserts.java  | 27 ++++++-
 .../apache/brooklyn/util/repeat/Repeater.java   | 22 +++++-
 6 files changed, 115 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7c2c8e1a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index bed08e7..4fcfadb 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -77,6 +78,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Callables;
 import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -110,10 +112,11 @@ public class BasicExecutionManager implements ExecutionManager {
     private final ExecutorService runner;
         
     private final ScheduledExecutorService delayedRunner;
-    
-    // TODO Could have a set of all knownTasks; but instead we're having a separate set per tag,
-    // so the same task could be listed multiple times if it has multiple tags...
 
+    // inefficient having so many records, and also doing searches through ...
+    // many things in here could be more efficient however (different types of lookup etc),
+    // do that when we need to.
+    
     //access to this field AND to members in this field is synchronized, 
     //to allow us to preserve order while guaranteeing thread-safe
     //(but more testing is needed before we are completely sure it is thread-safe!)
@@ -553,9 +556,11 @@ public class BasicExecutionManager implements ExecutionManager {
             Throwable error = null;
             try {
                 beforeStartAtomicTask(flags, task);
-                if (!task.isCancelled()) {
-                    result = ((TaskInternal<T>)task).getJob().call();
-                } else throw new CancellationException();
+                if (task.isCancelled()) {
+                    afterEndForCancelBeforeStart(flags, task, false);
+                    throw new CancellationException();
+                }
+                result = ((TaskInternal<T>)task).getJob().call();
             } catch(Throwable e) {
                 error = e;
             } finally {
@@ -570,21 +575,33 @@ public class BasicExecutionManager implements ExecutionManager {
         }
     }
 
-    @SuppressWarnings("deprecation")
-    // TODO do we even need a listenable future here?  possibly if someone wants to interrogate the future it might
-    // be interesting, so possibly it is useful that we implement ListenableFuture...
-    private final static class CancellingListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
+    final static class CancellingListenableForwardingFutureForTask<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
         private final Task<T> task;
         private BasicExecutionManager execMgmt;
-
+        private final ExecutionList listeners;
+        
         private CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, Future<T> delegate, ExecutionList list, Task<T> task) {
-            super(delegate, list);
+            super(delegate);
+            this.listeners = list;
             this.execMgmt = execMgmt;
             this.task = task;
         }
 
         @Override
-        public boolean cancel(TaskCancellationMode mode) {
+        public final boolean cancel(boolean mayInterrupt) {
+            return cancel(TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS);
+        }
+
+        @Override
+        public void addListener(Runnable listener, Executor executor) {
+            listeners.add(listener, executor);
+        }
+
+        public ExecutionList getListeners() {
+            return listeners;
+        }
+        
+        boolean cancel(TaskCancellationMode mode) {
             boolean result = false;
             if (log.isTraceEnabled()) {
                 log.trace("CLFFT cancelling "+task+" mode "+mode);
@@ -616,7 +633,6 @@ public class BasicExecutionManager implements ExecutionManager {
                         }
                     }
                 }
-                // TODO this is inefficient; might want to keep an index on submitted-by
                 for (Task<?> t: execMgmt.getAllTasks()) {
                     if (task.equals(t.getSubmittedByTask())) {
                         if (mode.isAllowedToInterruptAllSubmittedTasks() || BrooklynTaskTags.isTransient(t)) {
@@ -636,7 +652,7 @@ public class BasicExecutionManager implements ExecutionManager {
                 }
             }
   
-            // note: as of 2017-09 no longer run listeners when we say to cancel, they get run when the task really ends
+            execMgmt.afterEndForCancelBeforeStart(null, task, true);
             return result;
         }
     }
@@ -644,14 +660,15 @@ public class BasicExecutionManager implements ExecutionManager {
     // NB: intended to be run by task.runListeners, used to run any listeners the manager wants 
     private final class SubmissionListenerToCallManagerListeners<T> implements Runnable {
         private final Task<T> task;
+        private final CancellingListenableForwardingFutureForTask<T> listenerSource;
 
-        private SubmissionListenerToCallManagerListeners(Task<T> task) {
+        public SubmissionListenerToCallManagerListeners(Task<T> task, CancellingListenableForwardingFutureForTask<T> listenableFuture) {
             this.task = task;
+            listenerSource = listenableFuture;
         }
 
         @Override
         public void run() {
-            
             for (ExecutionListener listener : listeners) {
                 try {
                     listener.onTaskDone(task);
@@ -659,6 +676,8 @@ public class BasicExecutionManager implements ExecutionManager {
                     log.warn("Error running execution listener "+listener+" of task "+task+" done", e);
                 }
             }
+            // run any listeners the task owner has added to its future
+            listenerSource.getListeners().execute();
         }
     }
 
@@ -710,9 +729,9 @@ public class BasicExecutionManager implements ExecutionManager {
         // this future allows a caller to add custom listeners
         // (it does not notify the listeners; that's our job);
         // except on cancel we want to listen
-        ListenableFuture<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task);
+        CancellingListenableForwardingFutureForTask<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task);
         // and we want to make sure *our* (manager) listeners are given suitable callback 
-        ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallManagerListeners<T>(task), runner);
+        ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallManagerListeners<T>(task, listenableFuture), runner);
         // NB: can the above mean multiple callbacks to TaskInternal#runListeners?
         
         // finally expose the future to callers
@@ -875,18 +894,37 @@ public class BasicExecutionManager implements ExecutionManager {
     protected void afterEndInSameThreadTask(Map<?,?> flags, Task<?> task, Throwable error) {
         internalAfterEnd(flags, task, true, true, error);
     }
+    protected void afterEndForCancelBeforeStart(Map<?,?> flags, Task<?> task, boolean calledFromCanceller) {
+        if (calledFromCanceller) {
+            if (task.isBegun()) {
+                // do nothing from canceller thread if task has begun;
+                // we don't want to set end time or listeners prematurely.
+                return ;
+            } else {
+                // normally task won't be submitted by executor, so do some of the end operations.
+                // there is a chance task has begun but not set start time,
+                // in which case _both_ canceller thread and task thread will run this,
+                // but they will happen very close in time so end time is set sensibly by either,
+                // and dedupe will be done based on presence in "incompleteTaskIds"
+                // to ensure listeners and callback only invoked once
+            }
+        }
+        internalAfterEnd(flags, task, !calledFromCanceller, true, null);
+    }
+    
     /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)},
      * and, for atomic tasks and scheduled-task submission iterations where 
      * always called once if {@link #internalBeforeStart(Map, Task)} is invoked and in the same thread as that method */
     protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations, Throwable error) {
+        boolean taskWasSubmittedAndNotYetEnded = true;
         try {
             if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task);
             if (startedInThisThread) {
                 activeTaskCount.decrementAndGet();
             }
             if (isEndingAllIterations) {
-                incompleteTaskIds.remove(task.getId());
-                if (flags!=null) {
+                taskWasSubmittedAndNotYetEnded = incompleteTaskIds.remove(task.getId());
+                if (flags!=null && taskWasSubmittedAndNotYetEnded) {
                     invokeCallback(flags.get("newTaskEndCallback"), task);
                 }
                 ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
@@ -934,7 +972,8 @@ public class BasicExecutionManager implements ExecutionManager {
                 }
             } finally {
                 synchronized (task) { task.notifyAll(); }
-                if (isEndingAllIterations) {
+                if (isEndingAllIterations && taskWasSubmittedAndNotYetEnded) {
+                    // prevent from running twice on cancellation after start
                     ((TaskInternal<?>)task).runListeners();
                 }
             }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7c2c8e1a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
index 3c9b163..d952f9e 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
@@ -45,6 +45,7 @@ import org.apache.brooklyn.api.mgmt.HasTaskChildren;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.objs.Identifiable;
 import org.apache.brooklyn.util.JavaGroovyEquivalents;
+import org.apache.brooklyn.util.core.task.BasicExecutionManager.CancellingListenableForwardingFutureForTask;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.guava.Maybe.MaybeSupplier;
@@ -327,11 +328,10 @@ public class BasicTask<T> implements TaskInternal<T> {
         return true;
     }
     
-    @SuppressWarnings("deprecation")
     protected boolean doCancel(TaskCancellationMode mode) {
         if (internalFuture!=null) { 
-            if (internalFuture instanceof ListenableForwardingFuture) {
-                return ((ListenableForwardingFuture<?>)internalFuture).cancel(mode);
+            if (internalFuture instanceof CancellingListenableForwardingFutureForTask) {
+                return ((CancellingListenableForwardingFutureForTask<?>)internalFuture).cancel(mode);
             } else {
                 return internalFuture.cancel(mode.isAllowedToInterruptTask());
             }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7c2c8e1a/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
deleted file mode 100644
index ac42d2f..0000000
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.util.core.task;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-
-import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the responsibility to:
- * <li> invoke the listeners on job completion (success or error)
- * <li> invoke the listeners on cancel
- * 
- * @deprecated since 0.9.0 likely to leave the public API */
-@Deprecated  // TODO just one subclass, it can hold the behaviour we need from this, 
-// and the methods here are surprising as they expect the caller to notify the list
-public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
-
-    private static final Logger log = LoggerFactory.getLogger(ListenableForwardingFuture.class);
-    
-    // TODO these are never accessed or used
-    final ExecutionList listeners;
-    
-    protected ListenableForwardingFuture(Future<T> delegate) {
-        super(delegate);
-        this.listeners = new ExecutionList();
-    }
-
-    protected ListenableForwardingFuture(Future<T> delegate, ExecutionList list) {
-        super(delegate);
-        this.listeners = list;
-    }
-
-    private static boolean warned = false;
-    
-    @Override
-    public void addListener(Runnable listener, Executor executor) {
-        if (!warned) {
-            log.warn("Use of deprecated ListenableForwardingFuture.addListener at "+this+" (future calls will not be logged)", new Throwable("stack trace"));
-            warned = true;
-        }
-        
-        listeners.add(listener, executor);
-    }
-    
-    public abstract boolean cancel(TaskCancellationMode mode);
-    
-    @Override
-    public final boolean cancel(boolean mayInterrupt) {
-        return cancel(TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7c2c8e1a/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
index e218cc7..f0bcdf7 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
@@ -50,7 +50,6 @@ import org.testng.annotations.Test;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -87,6 +86,7 @@ public class DynamicSequentialTaskTest {
     @AfterMethod(alwaysRun=true)
     public void tearDown() throws Exception {
         if (em != null) {
+            log.debug("tearing down");
             // need to await termination, otherwise interrupted-but-still-running threads 
             // may update the cancellations/messages and interfere with subsequent tests
             Assert.assertTrue(em.shutdownNow(Duration.FIVE_SECONDS));
@@ -250,7 +250,7 @@ public class DynamicSequentialTaskTest {
     
     @Test
     public void testCancellationModeAndSubmitted() throws Exception {
-        // seems actually to be the logging which causes this to take ~50ms ?
+        // used to spend time waiting for end time to be set; now listens for notify which makes it go faster
         
         doTestCancellationModeAndSubmitted(true, TaskCancellationMode.DO_NOT_INTERRUPT, false, false);
         
@@ -304,24 +304,21 @@ public class DynamicSequentialTaskTest {
             throw new IllegalStateException("Invalid cancellationMode: "+cancellationMode);
         }
 
-        // the cancelled task always reports cancelled and done
+        // the cancelled task always reports cancelled and done right away
         Assert.assertEquals(t.isDone(), true);
         Assert.assertEquals(t.isCancelled(), true);
-        // end time might not be set for another fraction of a second
         if (expectedTaskInterrupted) { 
-            Asserts.eventually(new Supplier<Number>() {
-                @Override public Number get() { return t.getEndTimeUtc(); }}, 
-                MathPredicates.<Number>greaterThanOrEqual(0));
+            // end time might not be set for another fraction of a second, when task truly ended, but should be set soon if task was interrupted
+            Asserts.eventually( () -> t.getEndTimeUtc(), MathPredicates.greaterThanOrEqual(0), null, null, null, t );
         } else {
+            // if interrupts not allowed, time should not be set until long delay expires
             Assert.assertTrue(t.getEndTimeUtc() < 0, "Wrong end time: "+t.getEndTimeUtc());
         }
         
         if (expectedSubtaskCancelled) {
             Asserts.eventually(Suppliers.ofInstance(t1), TaskPredicates.isDone());
             Assert.assertTrue(t1.isCancelled());
-            Asserts.eventually(new Supplier<Number>() {
-                @Override public Number get() { return t1.getEndTimeUtc(); }}, 
-                MathPredicates.<Number>greaterThanOrEqual(0));
+            Asserts.eventually( () -> t1.getEndTimeUtc(), MathPredicates.greaterThanOrEqual(0), null, null, null, t );
         } else {
             Time.sleep(TINY_TIME);
             Assert.assertFalse(t1.isCancelled());

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7c2c8e1a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index 0a105d9..dc672b5 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -805,7 +805,11 @@ public class Asserts {
     public static <T> void eventually(Supplier<? extends T> supplier, Predicate<T> predicate, Duration timeout) {
         eventually(supplier, predicate, timeout, null, null);
     }
-    
+
+    /** As {@link #eventually(Supplier, Predicate, Duration, Duration, String, Object)} when no object is going to be notified. */
+    public static <T> void eventually(Supplier<? extends T> supplier, Predicate<T> predicate, Duration timeout, Duration period, String errMsg) {
+        eventually(supplier, predicate, timeout, period, errMsg, null);
+    }
     /** Asserts that eventually the supplier gives a value accepted by the predicate. 
      * Tests periodically and succeeds as soon as the supplier gives an allowed value.
      * Other arguments can be null.
@@ -814,9 +818,12 @@ public class Asserts {
      * @param predicate the {@link Predicate} to apply to each value given by the supplier
      * @param timeout how long to wait, default {@link #DEFAULT_LONG_TIMEOUT}
      * @param period how often to check, default quite often so you won't notice but letting the CPU do work
-     * @param errMsg an error message to display if not satisfied, in addition to the last-tested supplied value and the predicate
+     * @param errMsg optional error message to display if not satisfied, in addition to the last-tested supplied value and the predicate
+     * @param notifyObject optional object that will be notified of change and should pre-empt the period to redo the check
      */
-    public static <T> void eventually(Supplier<? extends T> supplier, Predicate<T> predicate, Duration timeout, Duration period, String errMsg) {
+    public static <T> void eventually(Supplier<? extends T> supplier, Predicate<T> predicate, Duration timeout, Duration period, String errMsg, Object notifyObject) {
+        // TODO use Repeater (there are too many args here)
+        
         if (timeout==null) timeout = DEFAULT_LONG_TIMEOUT;
         if (period==null) period = DEFAULT_SHORT_PERIOD;
         CountdownTimer timeleft = timeout.countdownTimer();
@@ -824,7 +831,19 @@ public class Asserts {
         T supplied;
         int count = 0;
         do {
-            if (count++ > 0) Duration.sleep(period);
+            if (count++ > 0) {
+                if (notifyObject!=null) {
+                    synchronized (notifyObject) {
+                        try {
+                            notifyObject.wait(period.toMilliseconds());
+                        } catch (InterruptedException e) {
+                            throw Exceptions.propagate(e);
+                        }
+                    }
+                } else {
+                    Duration.sleep(period);
+                }
+            }
             supplied = supplier.get();
             if (predicate.apply(supplied)) return;
         } while (timeleft.isNotExpired());

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7c2c8e1a/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java b/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java
index a04f58c..4c59c8d 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java
@@ -42,6 +42,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.Callables;
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -246,6 +247,10 @@ public class Repeater implements Callable<Boolean> {
         return backoff(Duration.millis(10), 1.2, finalDelay);
     }
 
+    // TODO support waitingOn to allow notify to interrupt the waits;
+    // however TBC whether such a wake increases iteration count and backoff timer;
+    // probably not as there could be any number of spurious wakes to increment that unexpectedly
+    
     /**
      * Set code fragment that tests if the loop has completed.
      *
@@ -269,6 +274,21 @@ public class Repeater implements Callable<Boolean> {
         });
     }
 
+    public <T> Repeater until(final Supplier<T> supplier, final Predicate<T> exitCondition) {
+        Preconditions.checkNotNull(supplier, "supplier must not be null");
+        Preconditions.checkNotNull(exitCondition, "exitCondition must not be null");
+        return until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                return exitCondition.apply(supplier.get());
+            }
+            @Override
+            public String toString() {
+                return supplier.get()+" "+exitCondition.toString();
+            }
+        });
+    }
+
     /**
      * If the exit condition check throws an exception, it will be recorded and the last exception will be thrown on failure.
      *
@@ -350,7 +370,7 @@ public class Repeater implements Callable<Boolean> {
         ReferenceWithError<Boolean> result = runKeepingError();
         result.checkNoError();
         if (!result.get()) {
-            throw new IllegalStateException(description+" unsatisfied after "+Duration.of(timer));
+            throw new IllegalStateException(description+" unsatisfied after "+Duration.of(timer)+": "+exitCondition);
         }
     }