You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2017/03/02 17:01:57 UTC

[13/19] brooklyn-server git commit: ensure TaskFactory items evaluated immediately don't leak long-running tasks

ensure TaskFactory items evaluated immediately don't leak long-running tasks


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/cd3d4864
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/cd3d4864
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/cd3d4864

Branch: refs/heads/master
Commit: cd3d4864aa2a59a18f28997313ca07bc9185fd62
Parents: 0aa29ef
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Sat Feb 18 16:31:26 2017 +0000
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Sat Feb 18 16:31:26 2017 +0000

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ExecutionContext.java     |  4 +-
 .../util/core/task/BasicExecutionContext.java   | 16 +++-
 .../task/InterruptingImmediateSupplier.java     | 26 ++++--
 .../brooklyn/util/core/task/ValueResolver.java  | 88 ++++++++++++++------
 .../util/core/task/ValueResolverTest.java       | 51 ++++++++++++
 5 files changed, 151 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index f8a963a..344907a 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -75,10 +75,10 @@ public interface ExecutionContext extends Executor {
      * tricks to make it look like it is in a sub-thread, and will attempt to be non-blocking but
      * if needed they may block.
      * <p>
-     * Supports {@link Callable} and {@link Runnable} targets to be evaluated with "immediate" semantics.
+     * Supports {@link Callable} and {@link Runnable} and some {@link Task} targets to be evaluated with "immediate" semantics.
      */
     // TODO reference ImmediateSupplier when that class is moved to utils project
     @Beta
-    <T> Maybe<T> getImmediately(Object callableOrSupplier);
+    <T> Maybe<T> getImmediately(Object callableOrSupplierOrTask);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 6c69509..0799607 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -41,6 +41,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,7 +104,20 @@ public class BasicExecutionContext extends AbstractExecutionContext {
     @SuppressWarnings("unchecked")
     @Override
     public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
-        BasicTask<?> fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
+        BasicTask<?> fakeTaskForContext;
+        if (callableOrSupplier instanceof BasicTask) {
+            fakeTaskForContext = (BasicTask<?>)callableOrSupplier;
+            if (fakeTaskForContext.isQueuedOrSubmitted()) {
+                if (fakeTaskForContext.isDone()) {
+                    return Maybe.of((T)fakeTaskForContext.getUnchecked());
+                } else {
+                    throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
+                }
+            }
+            callableOrSupplier = fakeTaskForContext.getJob();
+        } else {
+            fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
+        }
         fakeTaskForContext.tags.addAll(tags);
         fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
         fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
index a92a641..84b1bb4 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Semaphore;
 
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.ReferenceWithError;
 import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.guava.Maybe;
 
@@ -72,12 +73,16 @@ public class InterruptingImmediateSupplier<T> implements ImmediateSupplier<T>, D
         return nestedSupplier.get();
     }
 
-    @SuppressWarnings("unchecked")
     public static <T> InterruptingImmediateSupplier<T> of(final Object o) {
+        return InterruptingImmediateSupplier.<T>ofSafe(o).get();
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> ReferenceWithError<InterruptingImmediateSupplier<T>> ofSafe(final Object o) {
         if (o instanceof Supplier) {
-            return new InterruptingImmediateSupplier<T>((Supplier<T>)o);
+            return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier<T>((Supplier<T>)o));
         } else if (o instanceof Callable) {
-            return new InterruptingImmediateSupplier<T>(new Supplier<T>() {
+            return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier<T>(new Supplier<T>() {
                 @Override
                 public T get() {
                     try {
@@ -86,18 +91,25 @@ public class InterruptingImmediateSupplier<T> implements ImmediateSupplier<T>, D
                         throw Exceptions.propagate(e);
                     }
                 }
-            });
+            }));
         } else if (o instanceof Runnable) {
-            return new InterruptingImmediateSupplier<T>(new Supplier<T>() {
+            return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier<T>(new Supplier<T>() {
                 @Override
                 public T get() {
                     ((Runnable)o).run();
                     return null;
                 }
-            });
+            }));
         } else {
-            throw new UnsupportedOperationException("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")");
+            return ReferenceWithError.newInstanceThrowingError(null, new InterruptingImmediateSupplierNotSupportedForObject(o)); 
         }
     }
 
+    public static class InterruptingImmediateSupplierNotSupportedForObject extends UnsupportedOperationException {
+        private static final long serialVersionUID = 307517409005386500L;
+
+        public InterruptingImmediateSupplierNotSupportedForObject(Object o) {
+            super("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 6644a9a..f8cb91b 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -361,37 +361,59 @@ public class ValueResolver<T> implements DeferredSupplier<T>, Iterable<Maybe<Obj
             return Maybe.of((T) v);
         
         try {
-            if (isEvaluatingImmediately() && v instanceof ImmediateSupplier) {
-                final ImmediateSupplier<Object> supplier = (ImmediateSupplier<Object>) v;
-                try {
-                    Maybe<Object> result = exec.getImmediately(supplier);
-                    
-                    // Recurse: need to ensure returned value is cast, etc
-                    return (result.isPresent())
-                            ? recursive
-                                ? new ValueResolver(result.get(), type, this).getMaybe()
-                                : result
-                            : result;
-                } catch (ImmediateSupplier.ImmediateUnsupportedException e) {
-                    log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e);
+            boolean allowImmediateExecution = false;
+            boolean bailOutAfterImmediateExecution = false;
+            
+            if (v instanceof ImmediateSupplier) {
+                allowImmediateExecution = true;
+                
+            } else {
+                if ((v instanceof TaskFactory<?>) && !(v instanceof DeferredSupplier)) {
+                    v = ((TaskFactory<?>)v).newTask();
+                    allowImmediateExecution = true;
+                    bailOutAfterImmediateExecution = true;
+                    BrooklynTaskTags.setTransient(((TaskAdaptable<?>)v).asTask());
+                    if (isEvaluatingImmediately()) {
+                        // not needed if executing immediately
+                        BrooklynTaskTags.addTagDynamically( ((TaskAdaptable<?>)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG );
+                    }
+                }
+                
+                //if it's a task or a future, we wait for the task to complete
+                if (v instanceof TaskAdaptable<?>) {
+                    v = ((TaskAdaptable<?>) v).asTask();
                 }
             }
             
-            // TODO if evaluating immediately should use a new ExecutionContext.submitImmediate(...)
-            // and sets a timeout but which wraps a task but does not spawn a new thread
-            
-            if ((v instanceof TaskFactory<?>) && !(v instanceof DeferredSupplier)) {
-                v = ((TaskFactory<?>)v).newTask();
-                BrooklynTaskTags.setTransient(((TaskAdaptable<?>)v).asTask());
-                if (isEvaluatingImmediately()) {
-                    BrooklynTaskTags.addTagDynamically( ((TaskAdaptable<?>)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG );
+            if (allowImmediateExecution && isEvaluatingImmediately()) {
+                // TODO could allow for everything, when evaluating immediately -- but if the target isn't safe to run again
+                // then we have to fail if immediate didn't work; to avoid breaking semantics we only do that for a few cases;
+                // might be nice to get to the point where we can break those semantics however, 
+                // ie weakening what getImmediate supports and making it be non-blocking, so that bailOut=true is the default.
+                // if: v instanceof TaskFactory -- it is safe, it's a new API (but it is currently the only one supported);
+                //     more than safe, we have to do it -- or add code here to cancel tasks -- because it spawns new tasks
+                //     (other objects passed through here don't get cancelled, because other things might try again later;
+                //     ie a task or future passed in here might naturally be long-running so cancelling is wrong,
+                //     but with a task factory generated task it would leak if we submitted and didn't cancel!)
+                // if: v instanceof ImmediateSupplier -- it probably is safe to change to bailOut = true  ?
+                // if: v instanceof Task or other things -- it currently isn't safe, there are places where
+                //     we expect to getImmediate on things which don't support it nicely,
+                //     and we rely on the blocking-short-wait behaviour, e.g. QuorumChecks in ConfigYamlTest
+                try {
+                    Maybe<T> result = execImmediate(exec, v);
+                    if (result!=null) return result;
+                    if (bailOutAfterImmediateExecution) {
+                        throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v);
+                    }
+                } catch (InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject o) {
+                    // ignore, continue below
+                    log.debug("Unable to resolve-immediately for "+description+" ("+v+", wrong type "+v.getClass()+"); falling back to executing with timeout");
                 }
             }
             
-            //if it's a task or a future, we wait for the task to complete
-            if (v instanceof TaskAdaptable<?>) {
+            if (v instanceof Task) {
                 //if it's a task, we make sure it is submitted
-                Task<?> task = ((TaskAdaptable<?>) v).asTask();
+                Task<?> task = (Task<?>) v;
                 if (!task.isSubmitted()) {
                     if (exec==null) {
                         return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available");
@@ -537,6 +559,24 @@ public class ValueResolver<T> implements DeferredSupplier<T>, Iterable<Maybe<Obj
         }
     }
 
+    protected Maybe<T> execImmediate(ExecutionContext exec, Object immediateSupplierOrImmediateTask) {
+        Maybe<T> result;
+        try {
+            result = exec.getImmediately(immediateSupplierOrImmediateTask);
+        } catch (ImmediateSupplier.ImmediateUnsupportedException e) {
+            return null;
+        }
+        // let InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject 
+        // be thrown; a caller who cares will catch it to know it can continue
+        
+        // Recurse: need to ensure returned value is cast, etc
+        return (result.isPresent())
+            ? recursive
+                ? new ValueResolver<T>(result.get(), type, this).getMaybe()
+                    : result
+                    : result;
+    }
+
     protected String getDescription() {
         return description!=null ? description : ""+value;
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index 358f39d..550d475 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -24,8 +24,11 @@ import static org.testng.Assert.fail;
 
 import java.util.Arrays;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.test.Asserts;
@@ -36,6 +39,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.util.concurrent.Callables;
+
 /**
  * see also {@link TasksTest} for more tests
  */
@@ -219,6 +224,52 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
             assertEquals(result.getClass(), FailingImmediateAndDeferredSupplier.class);
     }
 
+    public void testTaskFactoryGet() {
+        TaskFactory<TaskAdaptable<String>> taskFactory = new TaskFactory<TaskAdaptable<String>>() {
+            @Override public TaskAdaptable<String> newTask() {
+                return new BasicTask<>(Callables.returning("myval"));
+            }
+        };
+        String result = Tasks.resolving(taskFactory).as(String.class).context(app).get();
+        assertEquals(result, "myval");
+    }
+    
+    public void testTaskFactoryGetImmediately() {
+        TaskFactory<TaskAdaptable<String>> taskFactory = new TaskFactory<TaskAdaptable<String>>() {
+            @Override public TaskAdaptable<String> newTask() {
+                return new BasicTask<>(Callables.returning("myval"));
+            }
+        };
+        String result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).get();
+        assertEquals(result, "myval");
+    }
+    
+    public void testTaskFactoryGetImmediatelyDoesNotBlock() {
+        final AtomicBoolean executing = new AtomicBoolean();
+        TaskFactory<TaskAdaptable<String>> taskFactory = new TaskFactory<TaskAdaptable<String>>() {
+            @Override public TaskAdaptable<String> newTask() {
+                return new BasicTask<>(new Callable<String>() {
+                    public String call() {
+                        executing.set(true);
+                        try {
+                            Time.sleep(Duration.ONE_MINUTE);
+                            return "myval";
+                        } finally {
+                            executing.set(false);
+                        }
+                    }});
+            }
+        };
+        Maybe<String> result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).getMaybe();
+        Asserts.assertTrue(result.isAbsent(), "result="+result);
+        // the check below times out after 30s by default, well before the one-minute task above would finish on its own
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                Asserts.assertFalse(executing.get());
+            }
+        });
+    }
+    
     private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier<CallInfo>, DeferredSupplier<CallInfo> {
         private final boolean failImmediately;