You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/07/14 11:35:54 UTC

[03/16] incubator-brooklyn git commit: Don't try to re-submit DST secondary tasks if already executed

Don't try to re-submit DST secondary tasks if already executed

Effectors invoked against an entity are submitted twice: once in the entity's context and again in the TaskQueueingContext of the caller (see brooklyn.management.internal.LocalManagementContext.runAtEntity(Entity, TaskAdaptable<T>)). Usually this is not a problem, because the ExecutionContext will notice that the task has already been submitted and ignore it. But if the entity has been unmanaged in the meantime, the call to get the entity's ExecutionContext will fail.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/160b3ca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/160b3ca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/160b3ca0

Branch: refs/heads/master
Commit: 160b3ca0fc6fefd9ef5b6e6929c7d3594436cf09
Parents: 807e6d1
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Tue Jul 7 17:24:24 2015 +0300
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Jul 7 17:29:16 2015 +0300

----------------------------------------------------------------------
 .../entity/group/DynamicClusterImpl.java        |  6 +-
 .../util/task/DynamicSequentialTask.java        | 10 ++-
 .../entity/effector/EffectorTaskTest.java       | 79 ++++++++++++++++++++
 3 files changed, 88 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
index 447a4bb..0e2f164 100644
--- a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
@@ -874,11 +874,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
         try {
             if (member instanceof Startable) {
                 Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
-                try {
-                    task.get();
-                } catch (Exception e) {
-                    throw Exceptions.propagate(e);
-                }
+                task.getUnchecked();
             }
         } finally {
             Entities.unmanage(member);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
index 967c831..7756388 100644
--- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
+++ b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
@@ -209,12 +209,18 @@ public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChi
             throw new IllegalStateException(message);
         }
         synchronized (task) {
-            if (task.isSubmitted() && !task.isDone()) {
+            if (task.isSubmitted()) {
                 if (log.isTraceEnabled()) {
                     log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted");
                 }
             } else {
-                ec.submit(task);
+                try {
+                    ec.submit(task);
+                } catch (Exception e) {
+                    Exceptions.propagateIfFatal(e);
+                    // Give some context when the submit fails (happens when the target is already unmanaged)
+                    throw new IllegalStateException("Failure submitting task "+task+" in "+this+": "+e.getMessage(), e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java b/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java
index e9584db..bc7677c 100644
--- a/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java
+++ b/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java
@@ -19,6 +19,7 @@
 package brooklyn.entity.effector;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -28,6 +29,7 @@ import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.AbstractEntity;
 import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
 import brooklyn.entity.effector.EffectorTasks.EffectorTaskFactory;
 import brooklyn.entity.proxying.EntitySpec;
@@ -37,12 +39,14 @@ import brooklyn.management.Task;
 import brooklyn.test.entity.TestEntity;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.task.DynamicSequentialTask;
 import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.TaskBuilder;
 import brooklyn.util.task.Tasks;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 public class EffectorTaskTest extends BrooklynAppUnitTestSupport {
@@ -53,6 +57,7 @@ public class EffectorTaskTest extends BrooklynAppUnitTestSupport {
             .description("doubles the given number")
             .parameter(Integer.class, "numberToDouble")
             .impl(new EffectorBody<Integer>() {
+                @Override
                 public Integer call(ConfigBag parameters) {
                     // do a sanity check
                     Assert.assertNotNull(entity());
@@ -352,4 +357,78 @@ public class EffectorTaskTest extends BrooklynAppUnitTestSupport {
         
         Assert.assertEquals(doubler.invoke(DoublingEntity.DOUBLE, MutableMap.of("numberToDouble", 3, "numberToStartWith", 3)).get(), (Integer)7);
     }
+    
+    public static final Effector<Void> DUMMY = Effectors.effector(Void.class, "dummy")
+            .impl(new EffectorBody<Void>() {
+                @Override
+                public Void call(ConfigBag parameters) {
+                    return null;
+                }
+            })
+            .build();
+    
+    public static final Effector<Void> STALL = Effectors.effector(Void.class, "stall")
+            .parameter(AtomicBoolean.class, "lock")
+            .impl(new EffectorBody<Void>() {
+                @Override
+                public Void call(ConfigBag parameters) {
+                    AtomicBoolean lock = (AtomicBoolean)parameters.getStringKey("lock");
+                    synchronized(lock) {
+                        if (!lock.get()) {
+                            try {
+                                lock.wait();
+                            } catch (InterruptedException e) {
+                                Exceptions.propagate(e);
+                            }
+                        }
+                    }
+                    return null;
+                }
+            })
+            .build();
+
+    public static final Effector<Void> CONTEXT = Effectors.effector(Void.class, "stall_caller")
+            .parameter(AtomicBoolean.class, "lock")
+            .impl(new EffectorBody<Void>() {
+                @Override
+                public Void call(ConfigBag parameters) {
+                    Entity child = Iterables.getOnlyElement(entity().getChildren());
+                    AtomicBoolean lock = new AtomicBoolean();
+                    Task<Void> dummyTask = null;
+
+                    try {
+                        // Queue a (DST secondary) task which waits until notified, so that tasks queued later will get blocked
+                        queue(Effectors.invocation(entity(), STALL, ImmutableMap.of("lock", lock)));
+    
+                        // Start a new task - submitted directly to child's ExecutionContext, as well as added as a
+                        // DST secondary of the current effector.
+                        dummyTask = child.invoke(DUMMY, ImmutableMap.<String, Object>of());
+                        dummyTask.getUnchecked();
+
+                        // Execution completed in the child's ExecutionContext, but still queued as a secondary.
+                        // Destroy the child entity so that no subsequent tasks can be executed in its context.
+                        Entities.destroy(child);
+                    } finally {
+                        // Let STALL complete
+                        synchronized(lock) {
+                            lock.set(true);
+                            lock.notifyAll();
+                        }
+                        // At this point DUMMY will be unblocked and the DST will try to execute it as a secondary.
+                        // Submission will be ignored because DUMMY already executed.
+                        // If it's not ignored then submission will fail because entity is already unmanaged.
+                    }
+                    return null;
+                }
+            })
+            .build();
+    
+
+    @Test
+    public void testNestedEffectorExecutedAsSecondaryTask() throws Exception {
+        app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        Task<Void> effTask = app.invoke(CONTEXT, ImmutableMap.<String, Object>of());
+        effTask.get();
+    }
+
 }