You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/06 08:06:42 UTC

[18/23] brooklyn-server git commit: switch queue-or-submit-blocking-then-get invocations to new simpler DynamicTasks.get. but note some things fail getImmediately if they are running in a queueing context eg EffectorSayHiTest, until fixed in the next PR.

switch queue-or-submit-blocking-then-get invocations to new simpler DynamicTasks.get.
but note some things fail getImmediately if they are running in a queueing context eg EffectorSayHiTest,
until fixed in the next PR.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/98888708
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/98888708
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/98888708

Branch: refs/heads/master
Commit: 98888708957d0cea0f6bf219c5ea30b243094138
Parents: 0c2e1f6
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Oct 2 15:43:02 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:18 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ExecutionContext.java     | 16 +++---
 .../core/entity/trait/StartableMethods.java     |  6 +--
 .../core/location/BasicMachineDetails.java      |  5 +-
 .../location/access/BrooklynAccessUtils.java    |  2 +-
 .../core/objs/proxy/EntityProxyImpl.java        |  2 +-
 .../entity/group/DynamicClusterImpl.java        |  7 +--
 .../brooklyn/util/core/task/DynamicTasks.java   | 53 +++++++++++++++-----
 .../apache/brooklyn/core/feed/PollerTest.java   |  2 +-
 .../core/mgmt/rebind/RebindManagerTest.java     | 13 +----
 .../entity/group/DynamicClusterTest.java        |  8 +--
 .../SameServerDriverLifecycleEffectorTasks.java |  5 +-
 11 files changed, 63 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 3ef3470..2d8fe23 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -101,16 +101,18 @@ public interface ExecutionContext extends Executor {
     <T> Maybe<T> getImmediately(Task<T> callableOrSupplierOrTask);
 
     /**
-     * Efficient shortcut for {@link #submit(TaskAdaptable)} followed by an immediate {@link Task#get()}.
+     * Efficient implementation of common case when {@link #submit(TaskAdaptable)} is followed by an immediate {@link Task#get()}.
      * <p>
-     * Implementations will typically attempt to execute in the current thread, with appropriate
-     * configuration to make it look like it is in a sub-thread, 
-     * ie registering this as a task and allowing
-     * context methods on tasks to see the given sub-task.
+     * This is efficient in that it may typically attempt to execute in the current thread, 
+     * with appropriate configuration to make it look like it is in a sub-thread, 
+     * ie registering this as a task and allowing context methods on tasks to see the given sub-task.
+     * However it will normally be non-blocking which reduces overhead and 
+     * is permissible within a {@link #getImmediately(Object)} task.
      * <p>
-     * If the argument has already been submitted it simply blocks on it.
+     * If the argument has already been submitted it simply blocks on it 
+     * (i.e. no additional execution, and in that case would fail within a {@link #getImmediately(Object)}).
      * 
-     * @param task
+     * @param task the task whose result is being sought
      * @return result of the task execution
      */
     @Beta

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
index 1883166..30b2348 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
@@ -49,20 +49,20 @@ public class StartableMethods {
     /** Common implementation for start in parent nodes; just invokes start on all children of the entity */
     public static void start(Entity e, Collection<? extends Location> locations) {
         log.debug("Starting entity "+e+" at "+locations);
-        DynamicTasks.queueIfPossible(startingChildren(e, locations)).orSubmitAsync(e).getTask().getUnchecked();
+        DynamicTasks.get(startingChildren(e, locations), e);
     }
     
     /** Common implementation for stop in parent nodes; just invokes stop on all children of the entity */
     public static void stop(Entity e) {
         log.debug("Stopping entity "+e);
-        DynamicTasks.queueIfPossible(stoppingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
+        DynamicTasks.get(stoppingChildren(e), e);
         if (log.isDebugEnabled()) log.debug("Stopped entity "+e);
     }
 
     /** Common implementation for restart in parent nodes; just invokes restart on all children of the entity */
     public static void restart(Entity e) {
         log.debug("Restarting entity "+e);
-        DynamicTasks.queueIfPossible(restartingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
+        DynamicTasks.get(restartingChildren(e), e);
         if (log.isDebugEnabled()) log.debug("Restarted entity "+e);
     }
     

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
index f8a7cce..7906616 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
@@ -96,10 +96,7 @@ public class BasicMachineDetails implements MachineDetails {
      */
     @Beta
     public static BasicMachineDetails forSshMachineLocationLive(SshMachineLocation location) {
-        return TaskTags.markInessential(DynamicTasks.queueIfPossible(taskForSshMachineLocation(location))
-                .orSubmitAsync()
-                .asTask())
-                .getUnchecked();
+        return DynamicTasks.get(TaskTags.markInessential(taskForSshMachineLocation(location)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
index 40f71e2..35bc2f8 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
@@ -119,7 +119,7 @@ public class BrooklynAccessUtils {
     public static String getResolvedAddress(Entity entity, SshMachineLocation origin, String hostnameTarget) {
         ProcessTaskWrapper<Integer> task = SshTasks.newSshExecTaskFactory(origin, "ping -c 1 -t 1 "+hostnameTarget)
             .summary("checking resolution of "+hostnameTarget).allowingNonZeroExitCode().newTask();
-        DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).asTask().blockUntilEnded();
+        DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).getTask().blockUntilEnded();
         if (task.asTask().isError()) {
             log.warn("ping could not be run, at "+entity+" / "+origin+": "+Tasks.getError(task.asTask()));
             return "";

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
index daaf18b..4b6e704 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
@@ -209,7 +209,7 @@ public class EntityProxyImpl implements java.lang.reflect.InvocationHandler {
                     TaskAdaptable<?> task = ((EffectorWithBody)eff).getBody().newTask(delegate, eff, ConfigBag.newInstance(parameters));
                     // as per LocalManagementContext.runAtEntity(Entity entity, TaskAdaptable<T> task) 
                     TaskTags.markInessential(task);
-                    result = DynamicTasks.queueIfPossible(task.asTask()).orSubmitAsync(delegate).andWaitForSuccess();
+                    result = DynamicTasks.get(task.asTask(), delegate);
                 } else {
                     result = m.invoke(delegate, nonNullArgs);
                 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 23b48f0..b9a9041 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -827,10 +827,8 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
         for (Entity member : removedStartables) {
             tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()));
         }
-        Task<?> invoke = Tasks.parallel(tasks.build());
-        DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
         try {
-            invoke.get();
+            DynamicTasks.get( Tasks.parallel(tasks.build()) );
             return removedEntities;
         } catch (Exception e) {
             throw Exceptions.propagate(e);
@@ -1075,8 +1073,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
         try {
             if (member instanceof Startable) {
                 Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
-                DynamicTasks.queueIfPossible(task).orSubmitAsync();
-                task.getUnchecked();
+                DynamicTasks.get(task);
             }
         } finally {
             Entities.unmanage(member);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 5426baf..20b80ab 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -41,6 +41,16 @@ import com.google.common.collect.Iterables;
 
 /** 
  * Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks.
+ * <p>
+ * Queueing is supported by some task contexts (eg {@link DynamicSequentialTask}) to let that task
+ * build up a complex sequence of tasks and logic. This utility class gives conveniences to allow:
+ * <p>
+ * <li> "queue-if-possible-else-submit-async", so that it is backgrounded, using queueing semantics if available;
+ * <li> "queue-if-possible-else-submit-blocking", so that it is in the queue if there is one, else it will complete synchronously;
+ * <li> "queue-if-possible-else-submit-and-in-both-cases-block", so that it is returned immediately, but waits in its queue if there is one.
+ * <p>
+ * Over time the last mode has been the most prevalent and {@link #get(TaskAdaptable)} is introduced here
+ * as a convenience.  If a timeout is desired then the first should be used.
  * 
  * @since 0.6.0
  */
@@ -126,36 +136,50 @@ public class DynamicTasks {
                 return false;
             }
         }
-        /** causes the task to be submitted (asynchronously) if it hasn't already been,
-         * requiring an entity execution context (will try to find a default if not set) */
+        /** Causes the task to be submitted (asynchronously) if it hasn't already been,
+         * such as if a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context.
+         * <p>
+         * An {@link #executionContext(ExecutionContext)} should typically have been set
+         * (or use {@link #orSubmitAsync(Entity)}).
+         */
         public TaskQueueingResult<T> orSubmitAsync() {
             orSubmitInternal(false);
             return this;
         }
-        /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
+        /** Convenience for setting {@link #executionContext(Entity)} then {@link #orSubmitAsync()}. */
         public TaskQueueingResult<T> orSubmitAsync(Entity entity) {
             executionContext(entity);
             return orSubmitAsync();
         }
-        /** causes the task to be submitted *synchronously* if it hasn't already been submitted;
-         * useful in contexts such as libraries where callers may be either on a legacy call path 
-         * (which assumes all commands complete immediately);
-         * requiring an entity execution context (will try to find a default if not set) */
+        /** Alternative to {@link #orSubmitAsync()} but where, if the submission is needed
+         * (usually because a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context)
+         * it will wait until execution completes (and in fact will execute the task in this thread,
+         * as per {@link ExecutionContext#get(TaskAdaptable)}).
+         * <p>
+         * If the task is already queued, this method does nothing, not even blocks,
+         * to permit cases where a caller is building up a set of tasks to be executed sequentially:
+         * with a queueing context the caller can line them all up, but without that the caller needs this task
+         * finished before submitting subsequent tasks. 
+         * <p>
+         * If blocking is desired in all cases and this call should fail on task failure, invoke {@link #andWaitForSuccess()} on the result,
+         * or consider using {@link DynamicTasks#get(TaskAdaptable)} instead of this method,
+         * or {@link DynamicTasks#get(TaskAdaptable, Entity)} if an execution context a la {@link #orSubmitAndBlock(Entity)} is needed. */
         public TaskQueueingResult<T> orSubmitAndBlock() {
             orSubmitInternal(true);
             return this;
         }
-        /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
+        /** Variant of {@link #orSubmitAndBlock()} doing what {@link #orSubmitAsync(Entity)} does for {@link #orSubmitAsync()}. */
         public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) {
             executionContext(entity);
             return orSubmitAndBlock();
         }
-        /** blocks for the task to be completed
+        /** Blocks for the task to be completed, throwing if there are any errors
+         * and otherwise returning the value.
          * <p>
-         * needed in any context where subsequent commands assume the task has completed.
+         * In addition to cases where a result is wanted, this is needed in any context where subsequent commands assume the task has completed.
          * not needed in a context where the task is simply being built up and queued.
          * <p>
-         * throws if there are any errors
+         * 
          */
         public T andWaitForSuccess() {
             return task.getUnchecked();
@@ -170,7 +194,7 @@ public class DynamicTasks {
     /**
      * Tries to add the task to the current addition context if there is one, otherwise does nothing.
      * <p/>
-     * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned
+     * Call {@link TaskQueueingResult#orSubmitAsync()} on the returned
      * {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a
      * {@link BasicExecutionContext}.
      */
@@ -332,6 +356,11 @@ public class DynamicTasks {
     public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) {
         return queueIfPossible(task).orSubmitAsync(entity).asTask();
     }
+    
+    /** queues the task if possible and waits for the result, otherwise executes synchronously as per {@link ExecutionContext#get(TaskAdaptable)} */
+    public static <T> T get(TaskAdaptable<T> task, Entity e) {
+        return queueIfPossible(task).orSubmitAndBlock(e).andWaitForSuccess();
+    }
 
     /** Breaks the parent-child relation between Tasks.current() and the task passed,
      *  making the new task a top-level one at the target entity.

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
index 2251153..f270e54 100644
--- a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
@@ -133,7 +133,7 @@ public class PollerTest extends BrooklynAppUnitTestSupport {
                         }
                     })
                     .build();
-            return DynamicTasks.queueIfPossible(t).orSubmitAsync().asTask().getUnchecked();
+            return DynamicTasks.get(t);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
index 45589b1..25eb00f 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
@@ -20,11 +20,8 @@ package org.apache.brooklyn.core.mgmt.rebind;
 
 import static org.testng.Assert.assertEquals;
 
-import java.util.concurrent.Callable;
-
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.test.entity.TestEntityImpl;
 import org.apache.brooklyn.util.core.task.BasicTask;
@@ -48,15 +45,7 @@ public class RebindManagerTest extends RebindTestFixtureWithApp {
         @Override
         public void rebind() {
             super.rebind();
-            Task<String> task = new BasicTask<String>(new Callable<String>() {
-                @Override public String call() {
-                    return "abc";
-                }});
-            String val = DynamicTasks.queueIfPossible(task)
-                    .orSubmitAsync()
-                    .asTask()
-                    .getUnchecked();
-            sensors().set(TestEntity.NAME, val);
+            sensors().set(TestEntity.NAME, DynamicTasks.get(new BasicTask<String>(() -> "abc")));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 80f1be6..0827664 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -1325,12 +1325,8 @@ public class DynamicClusterTest extends AbstractDynamicClusterOrFabricTest {
                 .body(new Callable<Boolean>() {
                     @Override
                     public Boolean call() throws Exception {
-                        Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
-                        DynamicTasks.queueIfPossible(first).orSubmitAsync();
-                        final Entity source = first.get();
-                        final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
-                        DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
-                        return booleanTask.get();
+                        final Entity source = DynamicTasks.get( DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST) );
+                        return DynamicTasks.get( DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP) );
                     }
                 })
                 .build();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
index faed3db..52ce82f 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
@@ -111,10 +111,7 @@ public class SameServerDriverLifecycleEffectorTasks extends MachineLifecycleEffe
 
     @Override
     protected String startProcessesAtMachine(Supplier<MachineLocation> machineS) {
-        DynamicTasks.queueIfPossible(StartableMethods.startingChildren(entity(), machineS.get()))
-                .orSubmitAsync(entity())
-                .getTask()
-                .getUnchecked();
+        DynamicTasks.get(StartableMethods.startingChildren(entity(), machineS.get()), entity());
         DynamicTasks.waitForLast();
         return "children started";
     }