You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2017/03/02 17:01:49 UTC

[05/19] brooklyn-server git commit: immediate execution runs in a fake tag allowing context to be evaluated

immediate execution runs in a fake tag allowing context to be evaluated

most new immediate tests now passing, including a new test which detects recursive config values for immediate;
except we still have:

* cancellations of immediate execution goes too far, and cancels tasks which are set as values
* task factories still not supported for evaluation


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/faeeb1bd
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/faeeb1bd
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/faeeb1bd

Branch: refs/heads/master
Commit: faeeb1bdfc065b7e44554f45cca6a3f87074efb1
Parents: a679682
Author: Alex Heneveld <al...@Alexs-MacBook-Pro.local>
Authored: Tue Dec 6 22:15:10 2016 +0000
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Feb 14 16:49:00 2017 +0000

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ExecutionContext.java     |  3 ++
 .../brooklyn/spi/dsl/methods/DslComponent.java  | 31 +++++++++++---------
 .../brooklyn/core/mgmt/BrooklynTaskTags.java    |  2 ++
 .../AbstractConfigurationSupportInternal.java   |  1 -
 .../util/core/task/BasicExecutionContext.java   | 31 +++++++++++++++++++-
 .../task/InterruptingImmediateSupplier.java     |  3 +-
 .../brooklyn/util/core/task/ValueResolver.java  |  4 +--
 7 files changed, 56 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/faeeb1bd/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 4540240..d8e538c 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 
 import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.util.guava.Maybe;
 
 /**
  * This is a Brooklyn extension to the Java {@link Executor}.
@@ -64,4 +65,6 @@ public interface ExecutionContext extends Executor {
 
     boolean isShutdown();
 
+    <T> Maybe<T> getImmediately(Object callableOrSupplier);
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/faeeb1bd/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index 1f704b3..80e202d 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -503,6 +503,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> implements
             if (targetEntityMaybe.isAbsent()) return Maybe.<Object>cast(targetEntityMaybe);
             EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get();
             ConfigKey<?> key = targetEntity.getEntityType().getConfigKey(keyName);
+            checkAndTagForRecursiveReference(targetEntity);
             Maybe<?> result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyName));
             return Maybe.<Object>cast(result);
         }
@@ -517,22 +518,24 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> implements
                         @Override
                         public Object call() throws Exception {
                             Entity targetEntity = component.get();
-                            
-                            String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')";
-                            Task<?> ancestor = Tasks.current();
-                            while (ancestor!=null) {
-                                if (TaskTags.hasTag(ancestor, tag)) {
-                                    throw new IllegalStateException("Recursive config reference "+tag); 
-                                }
-                                ancestor = ancestor.getSubmittedByTask();
-                            }
-                            
-                            Tasks.addTagDynamically(tag);
-                            
+                            checkAndTagForRecursiveReference(targetEntity);
                             ConfigKey<?> key = targetEntity.getEntityType().getConfigKey(keyName);
                             return targetEntity.getConfig(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyName));
-                        }})
-                    .build();
+                        }
+                    }).build();
+        }
+        
+        private void checkAndTagForRecursiveReference(Entity targetEntity) {
+            String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')";
+            Task<?> ancestor = Tasks.current();
+            while (ancestor!=null) {
+                if (TaskTags.hasTag(ancestor, tag)) {
+                    throw new IllegalStateException("Recursive config reference "+tag); 
+                }
+                ancestor = ancestor.getSubmittedByTask();
+            }
+            
+            Tasks.addTagDynamically(tag);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/faeeb1bd/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index 39b2f70..8f4fa14 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -78,6 +78,8 @@ public class BrooklynTaskTags extends TaskTags {
      * and that it need not appear in some task lists;
      * often used for framework lifecycle events and sensor polling */
     public static final String TRANSIENT_TASK_TAG = "TRANSIENT";
+    /** marks that a task is meant to return immediately, without blocking (or if absolutely necessary blocking for a short while) */
+    public static final String IMMEDIATE_TASK_TAG = "IMMEDIATE";
 
     // ------------- entity tags -------------------------
     

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/faeeb1bd/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
index ce10c86..796ab13 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
@@ -146,7 +146,6 @@ public abstract class AbstractConfigurationSupportInternal implements BrooklynOb
                 .immediately(true)
                 .deep(true)
                 .context(getContext())
-                .swallowExceptions()
                 .get();
         return (resolved != marker)
                 ? TypeCoercions.tryCoerce(resolved, key.getTypeToken())

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/faeeb1bd/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 38f1b5a..f23053b 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -41,10 +41,12 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.guava.Maybe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
 
 /**
@@ -96,7 +98,34 @@ public class BasicExecutionContext extends AbstractExecutionContext {
     /** returns tasks started by this context (or tasks which have all the tags on this object) */
     @Override
     public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags(tags); }
-     
+
+    /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
+        BasicTask<?> fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
+        fakeTaskForContext.tags.addAll(tags);
+        fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
+        fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
+        
+        Task<?> previousTask = BasicExecutionManager.getPerThreadCurrentTask().get();
+        if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask);
+        try {
+            BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext);
+            
+            if ((callableOrSupplier instanceof Supplier) && !(callableOrSupplier instanceof ImmediateSupplier)) {
+                callableOrSupplier = new InterruptingImmediateSupplier<>((Supplier<Object>)callableOrSupplier);
+            }
+            if (callableOrSupplier instanceof ImmediateSupplier) {
+                return ((ImmediateSupplier<T>)callableOrSupplier).getImmediately();
+            }
+            // TODO could add more types here
+            throw new IllegalArgumentException("Type "+callableOrSupplier.getClass()+" not supported for getImmediately (instance "+callableOrSupplier+")");
+        } finally {
+            BasicExecutionManager.getPerThreadCurrentTask().set(previousTask);
+        }
+    }
+    
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/faeeb1bd/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
index c29b458..c478f5e 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
@@ -18,6 +18,7 @@
  */
 package org.apache.brooklyn.util.core.task;
 
+import java.util.NoSuchElementException;
 import java.util.concurrent.Semaphore;
 
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -56,7 +57,7 @@ public class InterruptingImmediateSupplier<T> implements ImmediateSupplier<T>, D
         } catch (Throwable t) {
             if (Exceptions.getFirstThrowableOfType(t, InterruptedException.class)!=null || 
                     Exceptions.getFirstThrowableOfType(t, RuntimeInterruptedException.class)!=null) {
-                return Maybe.absent("Immediate value not available");
+                return Maybe.absent(new UnsupportedOperationException("Immediate value not available", t));
             }
             throw Exceptions.propagate(t);
         } finally {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/faeeb1bd/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 8c61c45..ec7eb01 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -357,9 +357,9 @@ public class ValueResolver<T> implements DeferredSupplier<T>, Iterable<Maybe<Obj
         
         try {
             if (immediately && v instanceof ImmediateSupplier) {
-                final ImmediateSupplier<?> supplier = (ImmediateSupplier<?>) v;
+                final ImmediateSupplier<Object> supplier = (ImmediateSupplier<Object>) v;
                 try {
-                    Maybe<?> result = supplier.getImmediately();
+                    Maybe<Object> result = exec.getImmediately(supplier);
                     
                     // Recurse: need to ensure returned value is cast, etc
                     return (result.isPresent())