You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/06 08:06:26 UTC
[02/23] brooklyn-server git commit: task visibility: entity mgmt
create and startup wrapped in its own task
task visibility: entity mgmt create and startup wrapped in its own task
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/7f4d7bd8
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/7f4d7bd8
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/7f4d7bd8
Branch: refs/heads/master
Commit: 7f4d7bd87e0e0e1d98ed49d875e601a21e0635c8
Parents: 37b6b11
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 12 15:01:09 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:54 2017 +0100
----------------------------------------------------------------------
.../core/mgmt/EntityManagementUtils.java | 8 +++---
.../mgmt/internal/EntityManagementSupport.java | 19 ++++++++-----
.../mgmt/internal/LocalSubscriptionManager.java | 2 ++
.../internal/QueueingSubscriptionManager.java | 4 +--
.../internal/EntityExecutionManagerTest.java | 28 ++++++++++++++------
5 files changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
index 0cd18fc..de9964c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
@@ -91,9 +91,11 @@ public class EntityManagementUtils {
*/
@Beta
public static <T extends Application> T createUnstarted(ManagementContext mgmt, EntitySpec<T> spec, Optional<String> entityId) {
- // TODO wrap in task
- T app = ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId);
- return app;
+ return mgmt.getServerExecutionContext().get(Tasks.<T>builder().dynamic(false)
+ .displayName("Creating entity "+
+ (Strings.isNonBlank(spec.getDisplayName()) ? spec.getDisplayName() : spec.getType().getName()) )
+ .body(() -> ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId))
+ .build() );
}
/** as {@link #createUnstarted(ManagementContext, EntitySpec)} but for a string plan (e.g. camp yaml) */
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
index 97c7bac..96cfd33 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
@@ -38,10 +38,12 @@ import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.EntityAndItem;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.StringAndArgument;
import org.apache.brooklyn.core.mgmt.internal.NonDeploymentManagementContext.NonDeploymentManagementContextMode;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,9 +161,10 @@ public class EntityManagementSupport {
}
public void onManagementStarting(ManagementTransitionInfo info) {
- try {
- // TODO same-thread task on this entity, with internal tag ?
- synchronized (this) {
+ info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management starting")
+ .dynamic(false)
+ .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+ .body(() -> { try { synchronized (this) {
boolean alreadyManaging = isDeployed();
if (alreadyManaging) {
@@ -212,13 +215,15 @@ public class EntityManagementSupport {
} catch (Throwable t) {
managementFailed.set(true);
throw Exceptions.propagate(t);
- }
+ }}).build() );
}
@SuppressWarnings("deprecation")
public void onManagementStarted(ManagementTransitionInfo info) {
- try {
- synchronized (this) {
+ info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management started")
+ .dynamic(false)
+ .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+ .body(() -> { try { synchronized (this) {
boolean alreadyManaged = isFullyManaged();
if (alreadyManaged) {
@@ -265,7 +270,7 @@ public class EntityManagementSupport {
} catch (Throwable t) {
managementFailed.set(true);
throw Exceptions.propagate(t);
- }
+ }}).build() );
}
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 2349c73..983b307 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -229,6 +229,8 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
.addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
.add(s.subscriberExecutionManagerTag)
.add(BrooklynTaskTags.SENSOR_TAG)
+ // associate the publish event with the publisher (though on init it might be triggered by subscriber)
+ .addIfNotNull(event.getSource()!=null ? BrooklynTaskTags.tagForTargetEntity(event.getSource()) : null)
.build()
.asUnmodifiable();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
index 83facda..290520d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
@@ -76,7 +76,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
@SuppressWarnings("unchecked")
public synchronized void startDelegatingForSubscribing() {
- // TODO wrap in same-thread task
+ // could wrap in same-thread task, but there's enough context without it
assert delegate!=null;
for (QueuedSubscription s: queuedSubscriptions) {
delegate.subscribe(s.flags, s.s);
@@ -87,7 +87,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
@SuppressWarnings("unchecked")
public synchronized void startDelegatingForPublishing() {
- // TODO wrap in same-thread task
+ // could wrap in same-thread task, but there's enough context without it
assert delegate!=null;
for (SensorEvent evt: queuedSensorEvents) {
delegate.publish(evt);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 51f5bdc..59ee3cf 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -46,8 +46,10 @@ import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
import org.apache.brooklyn.util.core.task.ExecutionListener;
+import org.apache.brooklyn.util.core.task.ScheduledTask;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.javalang.JavaClassNames;
@@ -129,18 +131,28 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
return Tasks.builder().displayName(name).dynamic(false).body(Callables.returning(null));
}
- protected void assertTaskCountForEntityEventually(final Entity entity, final int expectedCount) {
+ protected void assertImportantTaskCountForEntityEventually(final Entity entity, final int expectedCount) {
// Dead task (and initialization task) should have been GC'd on completion.
// However, the GC'ing happens in a listener, executed in a different thread - the task.get()
// doesn't block for it. Therefore can't always guarantee it will be GC'ed by now.
Asserts.succeedsEventually(new Runnable() {
@Override public void run() {
- forceGc();
- Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity);
+ forceGc();
+ Collection<Task<?>> tasks = removeSystemTasks(BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity));
Assert.assertEquals(tasks.size(), expectedCount, "Tasks were "+tasks);
}});
}
+ static Set<Task<?>> removeSystemTasks(Iterable<Task<?>> tasks) {
+ Set<Task<?>> result = MutableSet.of();
+ for (Task<?> t: tasks) {
+ if (t instanceof ScheduledTask) continue;
+ if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue;
+ result.add(t);
+ }
+ return result;
+ }
+
// Needed because of https://issues.apache.org/jira/browse/BROOKLYN-401
protected void assertTaskMaxCountForEntityEventually(final Entity entity, final int expectedMaxCount) {
// Dead task (and initialization task) should have been GC'd on completion.
@@ -149,7 +161,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
Asserts.succeedsEventually(new Runnable() {
@Override public void run() {
forceGc();
- Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity);
+ Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) );
Assert.assertTrue(tasks.size() <= expectedMaxCount,
"Expected tasks count max of " + expectedMaxCount + ". Tasks were "+tasks);
}});
@@ -161,8 +173,8 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
final Task<?> task = runEmptyTaskWithNameAndTags(e, "should-be-kept", ManagementContextInternal.NON_TRANSIENT_TASK_TAG);
runEmptyTaskWithNameAndTags(e, "should-be-gcd", ManagementContextInternal.TRANSIENT_TASK_TAG);
- assertTaskCountForEntityEventually(e, 1);
- Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e);
+ assertImportantTaskCountForEntityEventually(e, 1);
+ Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e) );
assertEquals(tasks, ImmutableList.of(task), "Mismatched tasks, got: "+tasks);
}
@@ -320,7 +332,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
// allow background enrichers to complete
Time.sleep(Duration.ONE_SECOND);
forceGc();
- List<Task<?>> t1 = em.getAllTasks();
+ Collection<Task<?>> t1 = em.getAllTasks();
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
entity.sensors().set(TestEntity.NAME, "bob");
@@ -328,7 +340,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
Entities.destroy(entity);
Time.sleep(Duration.ONE_SECOND);
forceGc();
- List<Task<?>> t2 = em.getAllTasks();
+ Collection<Task<?>> t2 = em.getAllTasks();
Assert.assertEquals(t1.size(), t2.size(), "lists are different:\n"+t1+"\n"+t2+"\n");
}