You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/03 14:23:58 UTC
[21/35] brooklyn-server git commit: apply context-switching task
wrapper for same-thread and immediate tasks
apply context-switching task wrapper for same-thread and immediate tasks
now behave same as submitted tasks. add tests, and comments about limits of getImmediately.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/2bdcf1ac
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/2bdcf1ac
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/2bdcf1ac
Branch: refs/heads/master
Commit: 2bdcf1ac81853410100920968f92a82c41b1f5b6
Parents: ab480ed
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Sep 18 12:49:51 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 16:40:56 2017 +0100
----------------------------------------------------------------------
.../brooklyn/core/effector/Effectors.java | 2 +-
.../util/core/task/BasicExecutionContext.java | 158 +++++++++++--------
.../core/effector/EffectorSayHiTest.java | 19 ++-
3 files changed, 114 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2bdcf1ac/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
index 53db25a..653c607 100644
--- a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
@@ -202,7 +202,7 @@ public class Effectors {
/** returns an unsubmitted task which will invoke the given effector on the given entities
* (this form of method is a convenience for {@link #invocation(Effector, Map, Iterable)}) */
- public static TaskAdaptable<List<?>> invocation(Effector<?> eff, MutableMap<?, ?> params, Entity ...entities) {
+ public static TaskAdaptable<List<?>> invocation(Effector<?> eff, Map<?, ?> params, Entity ...entities) {
return invocation(eff, params, Arrays.asList(entities));
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2bdcf1ac/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 114aac7..429cad7 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -21,6 +21,7 @@ package org.apache.brooklyn.util.core.task;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -133,6 +134,11 @@ public class BasicExecutionContext extends AbstractExecutionContext {
throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+t);
}
}
+
+ ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(t, Collections.emptyList(), false);
+ if (switchContextWrapper!=null) {
+ return switchContextWrapper.context.get(switchContextWrapper.wrapperTask);
+ }
try {
return runInSameThread(t, new Callable<Maybe<T>>() {
@@ -213,12 +219,6 @@ public class BasicExecutionContext extends AbstractExecutionContext {
try {
((BasicExecutionManager)executionManager).afterSubmitRecordFuture(task, future);
((BasicExecutionManager)executionManager).beforeStartInSameThreadTask(null, task);
-
- // TODO this does not apply the same context-switching logic as submit;
- // means if task on X submits via this method a non-child (non-queued) task for an effector on Y,
- // a request to get all of X's tasks recursively won't pick up the call on Y
- // (because parent tasks don't have records of submitted tasks unless they are children)
-
return future.set(job.call());
} catch (Exception e) {
@@ -253,24 +253,35 @@ public class BasicExecutionContext extends AbstractExecutionContext {
fakeTaskForContext = (BasicTask<T>)callableOrSupplier;
if (fakeTaskForContext.isQueuedOrSubmitted()) {
if (fakeTaskForContext.isDone()) {
- return Maybe.of((T)fakeTaskForContext.getUnchecked());
+ return Maybe.of(fakeTaskForContext.getUnchecked());
} else {
throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
}
}
callableOrSupplier = fakeTaskForContext.getJob();
+ } else if (callableOrSupplier instanceof TaskAdaptable) {
+ return getImmediately( ((TaskAdaptable<T>)callableOrSupplier).asTask() );
} else {
- fakeTaskForContext = new BasicTask<T>(MutableMap.of("displayName", "immediate evaluation"));
+ fakeTaskForContext = new BasicTask<T>(MutableMap.of("displayName", "Immediate evaluation"));
}
final ImmediateSupplier<T> job = callableOrSupplier instanceof ImmediateSupplier ? (ImmediateSupplier<T>) callableOrSupplier
: InterruptingImmediateSupplier.<T>of(callableOrSupplier);
fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
+ ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(fakeTaskForContext, Collections.emptyList(), true);
+ if (switchContextWrapper!=null) {
+ return switchContextWrapper.context.getImmediately(switchContextWrapper.wrapperTask);
+ }
+
try {
return runInSameThread(fakeTaskForContext, new Callable<Maybe<T>>() {
public Maybe<T> call() {
- // TODO do we really want to cancel? means if it submits other things they won't run, even if they return immediately
+ // TODO could try to make this work for more types of tasks by not cancelling, it just interrupting;
+ // however the biggest place "getImmediate" fails is with DSTs where interrupting is sufficient to abort them
+ // unnecessarily, as queue.andWait attempts to block (again, unnecessarily, but not a straightforward fix).
+ // limited success of getImmediately is okay -- but no harm in expanding coverage by resolving that and removing cancel.
+ // see WIP test in EffectorSayHiTest
fakeTaskForContext.cancel();
boolean wasAlreadyInterrupted = Thread.interrupted();
@@ -294,68 +305,18 @@ public class BasicExecutionContext extends AbstractExecutionContext {
return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask());
Map properties = MutableMap.copyOf(propertiesQ);
- Collection taskTags;
+ Collection<Object> taskTags;
if (properties.get("tags")==null) {
taskTags = new ArrayList();
} else {
taskTags = new ArrayList((Collection)properties.get("tags"));
}
properties.put("tags", taskTags);
-
- // FIXME some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass;
- // the issue is that we want to ensure that cross-entity calls switch execution contexts;
- // previously it was all very messy how that was handled (and it didn't really handle it in many cases)
if (task instanceof Task<?>) taskTags.addAll( ((Task<?>)task).getTags() );
- Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY);
- checkUserSuppliedContext(task, taskTags);
-
- if (target!=null && !tags.contains(BrooklynTaskTags.tagForContextEntity(target))) {
- // task is switching execution context boundaries
- /*
- * longer notes:
- * you fall in to this block if the caller requests a target entity different to the current context
- * (e.g. where entity X is invoking an effector on Y, it will start in X's context,
- * but the effector should run in Y's context).
- *
- * we need to make sure there is a reference from this execution context to the submitted task,
- * IE the submitted task is a child of something in this execution context.
- * this ensures it shows up via the REST API and in the UI; without it we lose the reference to the child when browsing in the context of the parent.
- *
- * if it is queued or it is already recorded as a child we can simply submit in target context;
- * but if not we need to wrap it in a task running in this context with the submitted task as a child to have that reference.
- */
- final ExecutionContext tc = ((EntityInternal)target).getExecutionContext();
- if (log.isDebugEnabled())
- log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")");
-
- final Task<T> t;
- if (task instanceof Task<?>) {
- t = (Task<T>)task;
- if (Tasks.isQueuedOrSubmitted(t) ||
- ((Tasks.current() instanceof HasTaskChildren) && Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) {
- // we are already tracked by parent, just submit it
- return tc.submit(t);
- }
- } else {
- // for callables and runnables there is definitely no record
- if (task instanceof Callable) {
- t = Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build();
- } else if (task instanceof Runnable) {
- t = Tasks.<T>builder().dynamic(false).body((Runnable)task).build();
- } else {
- throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null"));
- }
- }
-
- return submit(
- // 2017-09 changed, doesn't have to be a dynamic task; can be a simple sequential task wrapping the child
- Tasks.<T>builder().displayName("Cross-context execution: "+t.getDescription()).dynamic(false).parallel(false).body(new Callable<T>() {
- @Override
- public T call() {
- return tc.get(t);
- }
- }).build() );
+ ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(task, taskTags, false);
+ if (switchContextWrapper!=null) {
+ return switchContextWrapper.context.submit(switchContextWrapper.wrapperTask);
}
EntitlementContext entitlementContext = BrooklynTaskTags.getEntitlement(taskTags);
@@ -405,6 +366,77 @@ public class BasicExecutionContext extends AbstractExecutionContext {
}
}
+ private static class ContextSwitchingInfo<T> {
+ final ExecutionContext context;
+ final Task<T> wrapperTask;
+ ContextSwitchingInfo(ExecutionContext context, Task<T> wrapperTask) {
+ this.context = context;
+ this.wrapperTask = wrapperTask;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> ContextSwitchingInfo<T> getContextSwitchingTask(final Object task, Collection<Object> taskTags, boolean immediate) {
+ checkUserSuppliedContext(task, taskTags);
+
+ Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY);
+ if (target==null || tags.contains(BrooklynTaskTags.tagForContextEntity(target))) {
+ return null;
+ }
+
+ // task is switching execution context boundaries
+
+ // some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass;
+ // the issue is that we want to ensure that cross-entity calls switch execution contexts;
+ // previously it was all very messy how that was handled (and it didn't really handle it in many cases)
+
+ /*
+ * longer notes:
+ * you fall in to this block if the caller requests a target entity different to the current context
+ * (e.g. where entity X is invoking an effector on Y, it will start in X's context,
+ * but the effector should run in Y's context).
+ *
+ * we need to make sure there is a reference from this execution context to the submitted task,
+ * IE the submitted task is a child of something in this execution context.
+ * this ensures it shows up via the REST API and in the UI; without it we lose the reference to the child when browsing in the context of the parent.
+ *
+ * if it is queued or it is already recorded as a child we can simply submit in target context;
+ * but if not we need to wrap it in a task running in this context with the submitted task as a child to have that reference.
+ */
+ final ExecutionContext tc = ((EntityInternal)target).getExecutionContext();
+ if (log.isDebugEnabled())
+ log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")");
+
+ final Task<T> t;
+ if (task instanceof Task<?>) {
+ t = (Task<T>)task;
+ if (Tasks.isQueuedOrSubmitted(t) ||
+ ((Tasks.current() instanceof HasTaskChildren) && Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) {
+ // we are already tracked by parent, just submit it
+ return new ContextSwitchingInfo<>(tc, t);
+ }
+ } else {
+ // for callables and runnables there is definitely no record
+ if (task instanceof Callable) {
+ t = Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build();
+ } else if (task instanceof Runnable) {
+ t = Tasks.<T>builder().dynamic(false).body((Runnable)task).build();
+ } else {
+ throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null"));
+ }
+ }
+
+ return
+ // 2017-09 changed, doesn't have to be a dynamic task; can be a simple sequential task wrapping the child
+ new ContextSwitchingInfo<>(tc, Tasks.<T>builder().displayName("Cross-context execution: "+t.getDescription()).dynamic(false).parallel(false).body(new Callable<T>() {
+ @Override
+ public T call() throws Exception {
+ if (immediate) return tc.<T>getImmediately(t).get();
+ return tc.get(t);
+ }
+ }).build());
+ }
+
private void registerPerThreadExecutionContext() { perThreadExecutionContext.set(this); }
private void clearPerThreadExecutionContext() { perThreadExecutionContext.remove(); }
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2bdcf1ac/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
index df8a2cb..9039cf2 100644
--- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
@@ -34,8 +34,8 @@ import org.apache.brooklyn.api.entity.ImplementedBy;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.annotation.EffectorParam;
-import org.apache.brooklyn.core.effector.MethodEffector;
import org.apache.brooklyn.core.entity.AbstractEntity;
+import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
@@ -99,6 +99,23 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
}
@Test
+ public void testInvocationSubmission() throws Exception {
+ assertEquals(((EntityInternal)e).getExecutionContext()
+ .submit( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ).get(), "hi Bob");
+ }
+ @Test
+ public void testInvocationGet() throws Exception {
+ assertEquals(((EntityInternal)e).getExecutionContext()
+ .get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
+ }
+
+ @Test(groups="WIP") // see comments at BasicExecutionContext.getImmediately
+ public void testInvocationGetImmediately() throws Exception {
+ assertEquals(((EntityInternal)e).getExecutionContext()
+ .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
+ }
+
+ @Test
public void testCanRetrieveTaskForEffector() {
e.sayHi1("Bob", "hi");