You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/06 08:06:26 UTC

[02/23] brooklyn-server git commit: task visibility: entity mgmt create and startup wrapped in its own task

task visibility: entity mgmt create and startup wrapped in its own task


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/7f4d7bd8
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/7f4d7bd8
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/7f4d7bd8

Branch: refs/heads/master
Commit: 7f4d7bd87e0e0e1d98ed49d875e601a21e0635c8
Parents: 37b6b11
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 12 15:01:09 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:54 2017 +0100

----------------------------------------------------------------------
 .../core/mgmt/EntityManagementUtils.java        |  8 +++---
 .../mgmt/internal/EntityManagementSupport.java  | 19 ++++++++-----
 .../mgmt/internal/LocalSubscriptionManager.java |  2 ++
 .../internal/QueueingSubscriptionManager.java   |  4 +--
 .../internal/EntityExecutionManagerTest.java    | 28 ++++++++++++++------
 5 files changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
index 0cd18fc..de9964c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
@@ -91,9 +91,11 @@ public class EntityManagementUtils {
      */
     @Beta
     public static <T extends Application> T createUnstarted(ManagementContext mgmt, EntitySpec<T> spec, Optional<String> entityId) {
-        // TODO wrap in task
-        T app = ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId);
-        return app;
+        return mgmt.getServerExecutionContext().get(Tasks.<T>builder().dynamic(false)
+            .displayName("Creating entity "+
+                (Strings.isNonBlank(spec.getDisplayName()) ? spec.getDisplayName() : spec.getType().getName()) )
+            .body(() -> ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId))
+            .build() );
     }
 
     /** as {@link #createUnstarted(ManagementContext, EntitySpec)} but for a string plan (e.g. camp yaml) */

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
index 97c7bac..96cfd33 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
@@ -38,10 +38,12 @@ import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.entity.AbstractEntity;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.EntityAndItem;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.StringAndArgument;
 import org.apache.brooklyn.core.mgmt.internal.NonDeploymentManagementContext.NonDeploymentManagementContextMode;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,9 +161,10 @@ public class EntityManagementSupport {
     }
     
     public void onManagementStarting(ManagementTransitionInfo info) {
-        try {
-            // TODO same-thread task on this entity, with internal tag ?
-            synchronized (this) {
+        info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management starting")
+            .dynamic(false)
+            .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+            .body(() -> { try { synchronized (this) {
                 boolean alreadyManaging = isDeployed();
                 
                 if (alreadyManaging) {
@@ -212,13 +215,15 @@ public class EntityManagementSupport {
         } catch (Throwable t) {
             managementFailed.set(true);
             throw Exceptions.propagate(t);
-        }
+        }}).build() );
     }
 
     @SuppressWarnings("deprecation")
     public void onManagementStarted(ManagementTransitionInfo info) {
-        try {
-            synchronized (this) {
+        info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management started")
+            .dynamic(false)
+            .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+            .body(() -> { try { synchronized (this) {
                 boolean alreadyManaged = isFullyManaged();
                 
                 if (alreadyManaged) {
@@ -265,7 +270,7 @@ public class EntityManagementSupport {
         } catch (Throwable t) {
             managementFailed.set(true);
             throw Exceptions.propagate(t);
-        }
+        }}).build() );
     }
     
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 2349c73..983b307 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -229,6 +229,8 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
             .add(s.subscriberExecutionManagerTag)
             .add(BrooklynTaskTags.SENSOR_TAG)
+            // associate the publish event with the publisher (though on init it might be triggered by subscriber)
+            .addIfNotNull(event.getSource()!=null ? BrooklynTaskTags.tagForTargetEntity(event.getSource()) : null)
             .build()
             .asUnmodifiable();
         

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
index 83facda..290520d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
@@ -76,7 +76,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
     
     @SuppressWarnings("unchecked")
     public synchronized void startDelegatingForSubscribing() {
-        // TODO wrap in same-thread task
+        // could wrap in same-thread task, but there's enough context without it
         assert delegate!=null;
         for (QueuedSubscription s: queuedSubscriptions) {
             delegate.subscribe(s.flags, s.s);
@@ -87,7 +87,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
     
     @SuppressWarnings("unchecked")
     public synchronized void startDelegatingForPublishing() {
-        // TODO wrap in same-thread task
+        // could wrap in same-thread task, but there's enough context without it
         assert delegate!=null;
         for (SensorEvent evt: queuedSensorEvents) {
             delegate.publish(evt);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 51f5bdc..59ee3cf 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -46,8 +46,10 @@ import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
 import org.apache.brooklyn.util.core.task.ExecutionListener;
+import org.apache.brooklyn.util.core.task.ScheduledTask;
 import org.apache.brooklyn.util.core.task.TaskBuilder;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.javalang.JavaClassNames;
@@ -129,18 +131,28 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         return Tasks.builder().displayName(name).dynamic(false).body(Callables.returning(null));
     }
 
-    protected void assertTaskCountForEntityEventually(final Entity entity, final int expectedCount) {
+    protected void assertImportantTaskCountForEntityEventually(final Entity entity, final int expectedCount) {
         // Dead task (and initialization task) should have been GC'd on completion.
         // However, the GC'ing happens in a listener, executed in a different thread - the task.get()
         // doesn't block for it. Therefore can't always guarantee it will be GC'ed by now.
         Asserts.succeedsEventually(new Runnable() {
             @Override public void run() {
-                forceGc();
-                Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity);
+                forceGc();  
+                Collection<Task<?>> tasks = removeSystemTasks(BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity));
                 Assert.assertEquals(tasks.size(), expectedCount, "Tasks were "+tasks);
             }});
     }
 
+    static Set<Task<?>> removeSystemTasks(Iterable<Task<?>> tasks) {
+        Set<Task<?>> result = MutableSet.of();
+        for (Task<?> t: tasks) {
+            if (t instanceof ScheduledTask) continue;
+            if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue;
+            result.add(t);
+        }
+        return result;
+    }
+
     // Needed because of https://issues.apache.org/jira/browse/BROOKLYN-401
     protected void assertTaskMaxCountForEntityEventually(final Entity entity, final int expectedMaxCount) {
         // Dead task (and initialization task) should have been GC'd on completion.
@@ -149,7 +161,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         Asserts.succeedsEventually(new Runnable() {
             @Override public void run() {
                 forceGc();
-                Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity);
+                Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) );
                 Assert.assertTrue(tasks.size() <= expectedMaxCount,
                         "Expected tasks count max of " + expectedMaxCount + ". Tasks were "+tasks);
             }});
@@ -161,8 +173,8 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         final Task<?> task = runEmptyTaskWithNameAndTags(e, "should-be-kept", ManagementContextInternal.NON_TRANSIENT_TASK_TAG);
         runEmptyTaskWithNameAndTags(e, "should-be-gcd", ManagementContextInternal.TRANSIENT_TASK_TAG);
         
-        assertTaskCountForEntityEventually(e, 1);
-        Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e);
+        assertImportantTaskCountForEntityEventually(e, 1);
+        Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e) );
         assertEquals(tasks, ImmutableList.of(task), "Mismatched tasks, got: "+tasks);
     }
 
@@ -320,7 +332,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         // allow background enrichers to complete
         Time.sleep(Duration.ONE_SECOND);
         forceGc();
-        List<Task<?>> t1 = em.getAllTasks();
+        Collection<Task<?>> t1 = em.getAllTasks();
 
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
         entity.sensors().set(TestEntity.NAME, "bob");
@@ -328,7 +340,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         Entities.destroy(entity);
         Time.sleep(Duration.ONE_SECOND);
         forceGc();
-        List<Task<?>> t2 = em.getAllTasks();
+        Collection<Task<?>> t2 = em.getAllTasks();
 
         Assert.assertEquals(t1.size(), t2.size(), "lists are different:\n"+t1+"\n"+t2+"\n");
     }