You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/03 14:23:58 UTC

[21/35] brooklyn-server git commit: apply context-switching task wrapper for same-thread and immediate tasks

apply context-switching task wrapper for same-thread and immediate tasks

These tasks now behave the same as submitted tasks. Adds tests, and comments about the limits of getImmediately.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/2bdcf1ac
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/2bdcf1ac
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/2bdcf1ac

Branch: refs/heads/master
Commit: 2bdcf1ac81853410100920968f92a82c41b1f5b6
Parents: ab480ed
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Sep 18 12:49:51 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 16:40:56 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/core/effector/Effectors.java       |   2 +-
 .../util/core/task/BasicExecutionContext.java   | 158 +++++++++++--------
 .../core/effector/EffectorSayHiTest.java        |  19 ++-
 3 files changed, 114 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2bdcf1ac/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
index 53db25a..653c607 100644
--- a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
@@ -202,7 +202,7 @@ public class Effectors {
 
     /** returns an unsubmitted task which will invoke the given effector on the given entities
      * (this form of method is a convenience for {@link #invocation(Effector, Map, Iterable)}) */
-    public static TaskAdaptable<List<?>> invocation(Effector<?> eff, MutableMap<?, ?> params, Entity ...entities) {
+    public static TaskAdaptable<List<?>> invocation(Effector<?> eff, Map<?, ?> params, Entity ...entities) {
         return invocation(eff, params, Arrays.asList(entities));
     }
     

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2bdcf1ac/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 114aac7..429cad7 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -21,6 +21,7 @@ package org.apache.brooklyn.util.core.task;
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
@@ -133,6 +134,11 @@ public class BasicExecutionContext extends AbstractExecutionContext {
                 throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+t);
             }
         }
+        
+        ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(t, Collections.emptyList(), false);
+        if (switchContextWrapper!=null) {
+            return switchContextWrapper.context.get(switchContextWrapper.wrapperTask);
+        }
 
         try {
             return runInSameThread(t, new Callable<Maybe<T>>() {
@@ -213,12 +219,6 @@ public class BasicExecutionContext extends AbstractExecutionContext {
         try {
             ((BasicExecutionManager)executionManager).afterSubmitRecordFuture(task, future);
             ((BasicExecutionManager)executionManager).beforeStartInSameThreadTask(null, task);
-
-            // TODO this does not apply the same context-switching logic as submit;
-            // means if task on X submits via this method a non-child (non-queued) task for an effector on Y,
-            // a request to get all of X's tasks recursively won't pick up the call on Y
-            // (because parent tasks don't have records of submitted tasks unless they are children) 
-            
             return future.set(job.call());
             
         } catch (Exception e) {
@@ -253,24 +253,35 @@ public class BasicExecutionContext extends AbstractExecutionContext {
             fakeTaskForContext = (BasicTask<T>)callableOrSupplier;
             if (fakeTaskForContext.isQueuedOrSubmitted()) {
                 if (fakeTaskForContext.isDone()) {
-                    return Maybe.of((T)fakeTaskForContext.getUnchecked());
+                    return Maybe.of(fakeTaskForContext.getUnchecked());
                 } else {
                     throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
                 }
             }
             callableOrSupplier = fakeTaskForContext.getJob();
+        } else if (callableOrSupplier instanceof TaskAdaptable) {
+            return getImmediately( ((TaskAdaptable<T>)callableOrSupplier).asTask() );
         } else {
-            fakeTaskForContext = new BasicTask<T>(MutableMap.of("displayName", "immediate evaluation"));
+            fakeTaskForContext = new BasicTask<T>(MutableMap.of("displayName", "Immediate evaluation"));
         }
         final ImmediateSupplier<T> job = callableOrSupplier instanceof ImmediateSupplier ? (ImmediateSupplier<T>) callableOrSupplier 
             : InterruptingImmediateSupplier.<T>of(callableOrSupplier);
         fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
         fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
 
+        ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(fakeTaskForContext, Collections.emptyList(), true);
+        if (switchContextWrapper!=null) {
+            return switchContextWrapper.context.getImmediately(switchContextWrapper.wrapperTask);
+        }
+
         try {
             return runInSameThread(fakeTaskForContext, new Callable<Maybe<T>>() {
                 public Maybe<T> call() {
-                    // TODO do we really want to cancel? means if it submits other things they won't run, even if they return immediately
+                    // TODO could try to make this work for more types of tasks by not cancelling, and just interrupting instead;
+                    // however the biggest place "getImmediately" fails is with DSTs, where interrupting alone is sufficient to abort them
+                    // unnecessarily, as queue.andWait attempts to block (again, unnecessarily, but not a straightforward fix).
+                    // limited success of getImmediately is okay -- but no harm in expanding coverage by resolving that and removing cancel.
+                    // see WIP test in EffectorSayHiTest
                     fakeTaskForContext.cancel();
                     
                     boolean wasAlreadyInterrupted = Thread.interrupted();
@@ -294,68 +305,18 @@ public class BasicExecutionContext extends AbstractExecutionContext {
             return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask());
         
         Map properties = MutableMap.copyOf(propertiesQ);
-        Collection taskTags;
+        Collection<Object> taskTags;
         if (properties.get("tags")==null) {
             taskTags = new ArrayList();
         } else {
             taskTags = new ArrayList((Collection)properties.get("tags"));
         }
         properties.put("tags", taskTags);
-        
-        // FIXME some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass;
-        // the issue is that we want to ensure that cross-entity calls switch execution contexts;
-        // previously it was all very messy how that was handled (and it didn't really handle it in many cases)
         if (task instanceof Task<?>) taskTags.addAll( ((Task<?>)task).getTags() ); 
-        Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY);
 
-        checkUserSuppliedContext(task, taskTags);
-
-        if (target!=null && !tags.contains(BrooklynTaskTags.tagForContextEntity(target))) {
-            // task is switching execution context boundaries
-            /* 
-             * longer notes:
-             * you fall in to this block if the caller requests a target entity different to the current context 
-             * (e.g. where entity X is invoking an effector on Y, it will start in X's context, 
-             * but the effector should run in Y's context).
-             * 
-             * we need to make sure there is a reference from this execution context to the submitted task,
-             * IE the submitted task is a child of something in this execution context.
-             * this ensures it shows up via the REST API and in the UI; without it we lose the reference to the child when browsing in the context of the parent.
-             * 
-             * if it is queued or it is already recorded as a child we can simply submit in target context;
-             * but if not we need to wrap it in a task running in this context with the submitted task as a child to have that reference.
-             */
-            final ExecutionContext tc = ((EntityInternal)target).getExecutionContext();
-            if (log.isDebugEnabled())
-                log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")");
-            
-            final Task<T> t;
-            if (task instanceof Task<?>) {
-                t = (Task<T>)task;
-                if (Tasks.isQueuedOrSubmitted(t) ||
-                        ((Tasks.current() instanceof HasTaskChildren) && Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) {
-                    // we are already tracked by parent, just submit it 
-                    return tc.submit(t);
-                }
-            } else {
-                // for callables and runnables there is definitely no record
-                if (task instanceof Callable) {
-                    t = Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build();
-                } else if (task instanceof Runnable) {
-                    t = Tasks.<T>builder().dynamic(false).body((Runnable)task).build();
-                } else {
-                    throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null"));
-                }                
-            }
-                
-            return submit(
-                // 2017-09 changed, doesn't have to be a dynamic task; can be a simple sequential task wrapping the child
-                Tasks.<T>builder().displayName("Cross-context execution: "+t.getDescription()).dynamic(false).parallel(false).body(new Callable<T>() {
-                    @Override
-                    public T call() { 
-                        return tc.get(t); 
-                    }
-                }).build() );
+        ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(task, taskTags, false);
+        if (switchContextWrapper!=null) {
+            return switchContextWrapper.context.submit(switchContextWrapper.wrapperTask);
         }
 
         EntitlementContext entitlementContext = BrooklynTaskTags.getEntitlement(taskTags);
@@ -405,6 +366,77 @@ public class BasicExecutionContext extends AbstractExecutionContext {
         }
     }
 
+    private static class ContextSwitchingInfo<T> {
+        final ExecutionContext context;
+        final Task<T> wrapperTask;
+        ContextSwitchingInfo(ExecutionContext context, Task<T> wrapperTask) {
+            this.context = context;
+            this.wrapperTask = wrapperTask;
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected <T> ContextSwitchingInfo<T> getContextSwitchingTask(final Object task, Collection<Object> taskTags, boolean immediate) {
+        checkUserSuppliedContext(task, taskTags);
+        
+        Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY);
+        if (target==null || tags.contains(BrooklynTaskTags.tagForContextEntity(target))) {
+            return null;
+        }
+        
+        // task is switching execution context boundaries
+        
+        // some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass;
+        // the issue is that we want to ensure that cross-entity calls switch execution contexts;
+        // previously it was all very messy how that was handled (and it didn't really handle it in many cases)
+
+        /* 
+         * longer notes:
+         * you reach this point if the caller requests a target entity different to the current context
+         * (e.g. where entity X is invoking an effector on Y, it will start in X's context,
+         * but the effector should run in Y's context).
+         * 
+         * we need to make sure there is a reference from this execution context to the submitted task,
+         * i.e. the submitted task is a child of something in this execution context.
+         * this ensures it shows up via the REST API and in the UI; without it we lose the reference to the child when browsing in the context of the parent.
+         * 
+         * if it is queued or it is already recorded as a child we can simply submit in target context;
+         * but if not we need to wrap it in a task running in this context with the submitted task as a child to have that reference.
+         */
+        final ExecutionContext tc = ((EntityInternal)target).getExecutionContext();
+        if (log.isDebugEnabled())
+            log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")");
+            
+        final Task<T> t;
+        if (task instanceof Task<?>) {
+            t = (Task<T>)task;
+            if (Tasks.isQueuedOrSubmitted(t) ||
+                    ((Tasks.current() instanceof HasTaskChildren) && Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) {
+                // already tracked by the parent, so no wrapper is needed: hand the task itself to the target context
+                return new ContextSwitchingInfo<>(tc, t);
+            }
+        } else {
+            // for callables and runnables there is definitely no record
+            if (task instanceof Callable) {
+                t = Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build();
+            } else if (task instanceof Runnable) {
+                t = Tasks.<T>builder().dynamic(false).body((Runnable)task).build();
+            } else {
+                throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null"));
+            }                
+        }
+            
+        return 
+            // 2017-09 changed, doesn't have to be a dynamic task; can be a simple sequential task wrapping the child
+            new ContextSwitchingInfo<>(tc, Tasks.<T>builder().displayName("Cross-context execution: "+t.getDescription()).dynamic(false).parallel(false).body(new Callable<T>() {
+                @Override
+                public T call() throws Exception {
+                    if (immediate) return tc.<T>getImmediately(t).get();
+                    return tc.get(t); 
+                }
+            }).build());
+    }
+
     private void registerPerThreadExecutionContext() { perThreadExecutionContext.set(this); }
 
     private void clearPerThreadExecutionContext() { perThreadExecutionContext.remove(); }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2bdcf1ac/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
index df8a2cb..9039cf2 100644
--- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
@@ -34,8 +34,8 @@ import org.apache.brooklyn.api.entity.ImplementedBy;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.core.annotation.EffectorParam;
-import org.apache.brooklyn.core.effector.MethodEffector;
 import org.apache.brooklyn.core.entity.AbstractEntity;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.entity.trait.Startable;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
@@ -99,6 +99,23 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
     }
 
     @Test
+    public void testInvocationSubmission() throws Exception {
+        assertEquals(((EntityInternal)e).getExecutionContext()
+            .submit( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ).get(), "hi Bob");
+    }
+    @Test
+    public void testInvocationGet() throws Exception {
+        assertEquals(((EntityInternal)e).getExecutionContext()
+            .get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
+    }
+    
+    @Test(groups="WIP")  // see comments at BasicExecutionContext.getImmediately
+    public void testInvocationGetImmediately() throws Exception {
+        assertEquals(((EntityInternal)e).getExecutionContext()
+            .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
+    }
+
+    @Test
     public void testCanRetrieveTaskForEffector() {
         e.sayHi1("Bob", "hi");