You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/06 08:06:42 UTC
[18/23] brooklyn-server git commit: switch
queue-or-submit-blocking-then-get invocations to new simpler
DynamicTasks.get. but note some things fail getImmediately if they are
running a queueing context eg EffectorSayHiTest, until fixed in the next PR.
switch queue-or-submit-blocking-then-get invocations to new simpler DynamicTasks.get.
but note some things fail getImmediately if they are running a queueing context eg EffectorSayHiTest,
until fixed in the next PR.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/98888708
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/98888708
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/98888708
Branch: refs/heads/master
Commit: 98888708957d0cea0f6bf219c5ea30b243094138
Parents: 0c2e1f6
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Oct 2 15:43:02 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:18 2017 +0100
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ExecutionContext.java | 16 +++---
.../core/entity/trait/StartableMethods.java | 6 +--
.../core/location/BasicMachineDetails.java | 5 +-
.../location/access/BrooklynAccessUtils.java | 2 +-
.../core/objs/proxy/EntityProxyImpl.java | 2 +-
.../entity/group/DynamicClusterImpl.java | 7 +--
.../brooklyn/util/core/task/DynamicTasks.java | 53 +++++++++++++++-----
.../apache/brooklyn/core/feed/PollerTest.java | 2 +-
.../core/mgmt/rebind/RebindManagerTest.java | 13 +----
.../entity/group/DynamicClusterTest.java | 8 +--
.../SameServerDriverLifecycleEffectorTasks.java | 5 +-
11 files changed, 63 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 3ef3470..2d8fe23 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -101,16 +101,18 @@ public interface ExecutionContext extends Executor {
<T> Maybe<T> getImmediately(Task<T> callableOrSupplierOrTask);
/**
- * Efficient shortcut for {@link #submit(TaskAdaptable)} followed by an immediate {@link Task#get()}.
+ * Efficient implementation of common case when {@link #submit(TaskAdaptable)} is followed by an immediate {@link Task#get()}.
* <p>
- * Implementations will typically attempt to execute in the current thread, with appropriate
- * configuration to make it look like it is in a sub-thread,
- * ie registering this as a task and allowing
- * context methods on tasks to see the given sub-task.
+ * This is efficient in that it may typically attempt to execute in the current thread,
+ * with appropriate configuration to make it look like it is in a sub-thread,
+ * ie registering this as a task and allowing context methods on tasks to see the given sub-task.
+ * However it will normally be non-blocking which reduces overhead and
+ * is permissible within a {@link #getImmediately(Object)} task
* <p>
- * If the argument has already been submitted it simply blocks on it.
+ * If the argument has already been submitted it simply blocks on it
+ * (i.e. no additional execution, and in that case would fail within a {@link #getImmediately(Object)}).
*
- * @param task
+ * @param task the task whose result is being sought
* @return result of the task execution
*/
@Beta
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
index 1883166..30b2348 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
@@ -49,20 +49,20 @@ public class StartableMethods {
/** Common implementation for start in parent nodes; just invokes start on all children of the entity */
public static void start(Entity e, Collection<? extends Location> locations) {
log.debug("Starting entity "+e+" at "+locations);
- DynamicTasks.queueIfPossible(startingChildren(e, locations)).orSubmitAsync(e).getTask().getUnchecked();
+ DynamicTasks.get(startingChildren(e, locations), e);
}
/** Common implementation for stop in parent nodes; just invokes stop on all children of the entity */
public static void stop(Entity e) {
log.debug("Stopping entity "+e);
- DynamicTasks.queueIfPossible(stoppingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
+ DynamicTasks.get(stoppingChildren(e), e);
if (log.isDebugEnabled()) log.debug("Stopped entity "+e);
}
/** Common implementation for restart in parent nodes; just invokes restart on all children of the entity */
public static void restart(Entity e) {
log.debug("Restarting entity "+e);
- DynamicTasks.queueIfPossible(restartingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
+ DynamicTasks.get(restartingChildren(e), e);
if (log.isDebugEnabled()) log.debug("Restarted entity "+e);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
index f8a7cce..7906616 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
@@ -96,10 +96,7 @@ public class BasicMachineDetails implements MachineDetails {
*/
@Beta
public static BasicMachineDetails forSshMachineLocationLive(SshMachineLocation location) {
- return TaskTags.markInessential(DynamicTasks.queueIfPossible(taskForSshMachineLocation(location))
- .orSubmitAsync()
- .asTask())
- .getUnchecked();
+ return DynamicTasks.get(TaskTags.markInessential(taskForSshMachineLocation(location)));
}
/**
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
index 40f71e2..35bc2f8 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
@@ -119,7 +119,7 @@ public class BrooklynAccessUtils {
public static String getResolvedAddress(Entity entity, SshMachineLocation origin, String hostnameTarget) {
ProcessTaskWrapper<Integer> task = SshTasks.newSshExecTaskFactory(origin, "ping -c 1 -t 1 "+hostnameTarget)
.summary("checking resolution of "+hostnameTarget).allowingNonZeroExitCode().newTask();
- DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).asTask().blockUntilEnded();
+ DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).getTask().blockUntilEnded();
if (task.asTask().isError()) {
log.warn("ping could not be run, at "+entity+" / "+origin+": "+Tasks.getError(task.asTask()));
return "";
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
index daaf18b..4b6e704 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
@@ -209,7 +209,7 @@ public class EntityProxyImpl implements java.lang.reflect.InvocationHandler {
TaskAdaptable<?> task = ((EffectorWithBody)eff).getBody().newTask(delegate, eff, ConfigBag.newInstance(parameters));
// as per LocalManagementContext.runAtEntity(Entity entity, TaskAdaptable<T> task)
TaskTags.markInessential(task);
- result = DynamicTasks.queueIfPossible(task.asTask()).orSubmitAsync(delegate).andWaitForSuccess();
+ result = DynamicTasks.get(task.asTask(), delegate);
} else {
result = m.invoke(delegate, nonNullArgs);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 23b48f0..b9a9041 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -827,10 +827,8 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
for (Entity member : removedStartables) {
tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()));
}
- Task<?> invoke = Tasks.parallel(tasks.build());
- DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
try {
- invoke.get();
+ DynamicTasks.get( Tasks.parallel(tasks.build()) );
return removedEntities;
} catch (Exception e) {
throw Exceptions.propagate(e);
@@ -1075,8 +1073,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
try {
if (member instanceof Startable) {
Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
- DynamicTasks.queueIfPossible(task).orSubmitAsync();
- task.getUnchecked();
+ DynamicTasks.get(task);
}
} finally {
Entities.unmanage(member);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 5426baf..20b80ab 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -41,6 +41,16 @@ import com.google.common.collect.Iterables;
/**
* Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks.
+ * <p>
+ * Queueing is supported by some task contexts (eg {@link DynamicSequentialTask}) to let that task
+ * build up a complex sequence of tasks and logic. This utility class gives conveniences to allow:
+ * <p>
+ * <li> "queue-if-possible-else-submit-async", so that it is backgrounded, using queueing semantics if available;
+ * <li> "queue-if-possible-else-submit-blocking", so that it is in the queue if there is one, else it will complete synchronously;
+ * <li> "queue-if-possible-else-submit-and-in-both-cases-block", so that it is returned immediately, but waits in its queue if there is one.
+ * <p>
+ * Over time the last mode has been the most prevalent and {@link #get(TaskAdaptable)} is introduced here
+ * as a convenience. If a timeout is desired then the first should be used.
*
* @since 0.6.0
*/
@@ -126,36 +136,50 @@ public class DynamicTasks {
return false;
}
}
- /** causes the task to be submitted (asynchronously) if it hasn't already been,
- * requiring an entity execution context (will try to find a default if not set) */
+ /** Causes the task to be submitted (asynchronously) if it hasn't already been,
+ * such as if a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context.
+ * <p>
+ * An {@link #executionContext(ExecutionContext)} should typically have been set
+ * (or use {@link #orSubmitAsync(Entity)}).
+ */
public TaskQueueingResult<T> orSubmitAsync() {
orSubmitInternal(false);
return this;
}
- /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
+ /** Convenience for setting {@link #executionContext(Entity)} then {@link #orSubmitAsync()}. */
public TaskQueueingResult<T> orSubmitAsync(Entity entity) {
executionContext(entity);
return orSubmitAsync();
}
- /** causes the task to be submitted *synchronously* if it hasn't already been submitted;
- * useful in contexts such as libraries where callers may be either on a legacy call path
- * (which assumes all commands complete immediately);
- * requiring an entity execution context (will try to find a default if not set) */
+ /** Alternative to {@link #orSubmitAsync()} but where, if the submission is needed
+ * (usually because a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context)
+ * it will wait until execution completes (and in fact will execute the task in this thread,
+ * as per {@link ExecutionContext#get(TaskAdaptable)}.
+ * <p>
+ * If the task is already queued, this method does nothing, not even blocks,
+ * to permit cases where a caller is building up a set of tasks to be executed sequentially:
+ * with a queueing context the caller can line them all up, but without that the caller needs this task
+ * finished before submitting subsequent tasks.
+ * <p>
+ * If blocking is desired in all cases and this call should fail on task failure, invoke {@link #andWaitForSuccess()} on the result,
+ * or consider using {@link DynamicTasks#get(TaskAdaptable)} instead of this method,
+ * or {@link DynamicTasks#get(TaskAdaptable, Entity)} if an execuiton context a la {@link #orSubmitAndBlock(Entity)} is needed. */
public TaskQueueingResult<T> orSubmitAndBlock() {
orSubmitInternal(true);
return this;
}
- /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
+ /** Variant of {@link #orSubmitAndBlock()} doing what {@link #orSubmitAsync(Entity)} does for {@link #orSubmitAsync()}. */
public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) {
executionContext(entity);
return orSubmitAndBlock();
}
- /** blocks for the task to be completed
+ /** Blocks for the task to be completed, throwing if there are any errors
+ * and otherwise returning the value.
* <p>
- * needed in any context where subsequent commands assume the task has completed.
+ * In addition to cases where a result is wanted, this is needed in any context where subsequent commands assume the task has completed.
* not needed in a context where the task is simply being built up and queued.
* <p>
- * throws if there are any errors
+ *
*/
public T andWaitForSuccess() {
return task.getUnchecked();
@@ -170,7 +194,7 @@ public class DynamicTasks {
/**
* Tries to add the task to the current addition context if there is one, otherwise does nothing.
* <p/>
- * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned
+ * Call {@link TaskQueueingResult#orSubmitAsync()} on the returned
* {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a
* {@link BasicExecutionContext}.
*/
@@ -332,6 +356,11 @@ public class DynamicTasks {
public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) {
return queueIfPossible(task).orSubmitAsync(entity).asTask();
}
+
+ /** queues the task if possible and waits for the result, otherwise executes synchronously as per {@link ExecutionContext#get(TaskAdaptable)} */
+ public static <T> T get(TaskAdaptable<T> task, Entity e) {
+ return queueIfPossible(task).orSubmitAndBlock(e).andWaitForSuccess();
+ }
/** Breaks the parent-child relation between Tasks.current() and the task passed,
* making the new task a top-level one at the target entity.
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
index 2251153..f270e54 100644
--- a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
@@ -133,7 +133,7 @@ public class PollerTest extends BrooklynAppUnitTestSupport {
}
})
.build();
- return DynamicTasks.queueIfPossible(t).orSubmitAsync().asTask().getUnchecked();
+ return DynamicTasks.get(t);
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
index 45589b1..25eb00f 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
@@ -20,11 +20,8 @@ package org.apache.brooklyn.core.mgmt.rebind;
import static org.testng.Assert.assertEquals;
-import java.util.concurrent.Callable;
-
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.core.test.entity.TestEntityImpl;
import org.apache.brooklyn.util.core.task.BasicTask;
@@ -48,15 +45,7 @@ public class RebindManagerTest extends RebindTestFixtureWithApp {
@Override
public void rebind() {
super.rebind();
- Task<String> task = new BasicTask<String>(new Callable<String>() {
- @Override public String call() {
- return "abc";
- }});
- String val = DynamicTasks.queueIfPossible(task)
- .orSubmitAsync()
- .asTask()
- .getUnchecked();
- sensors().set(TestEntity.NAME, val);
+ sensors().set(TestEntity.NAME, DynamicTasks.get(new BasicTask<String>(() -> "abc")));
}
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 80f1be6..0827664 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -1325,12 +1325,8 @@ public class DynamicClusterTest extends AbstractDynamicClusterOrFabricTest {
.body(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
- DynamicTasks.queueIfPossible(first).orSubmitAsync();
- final Entity source = first.get();
- final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
- DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
- return booleanTask.get();
+ final Entity source = DynamicTasks.get( DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST) );
+ return DynamicTasks.get( DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP) );
}
})
.build();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
index faed3db..52ce82f 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
@@ -111,10 +111,7 @@ public class SameServerDriverLifecycleEffectorTasks extends MachineLifecycleEffe
@Override
protected String startProcessesAtMachine(Supplier<MachineLocation> machineS) {
- DynamicTasks.queueIfPossible(StartableMethods.startingChildren(entity(), machineS.get()))
- .orSubmitAsync(entity())
- .getTask()
- .getUnchecked();
+ DynamicTasks.get(StartableMethods.startingChildren(entity(), machineS.get()), entity());
DynamicTasks.waitForLast();
return "children started";
}