You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/06 08:06:25 UTC
[01/23] brooklyn-server git commit: task optimization: some
queued-or-submitted tasks use foreground for executing
Repository: brooklyn-server
Updated Branches:
refs/heads/master 54f9c708a -> a73ee1728
task optimization: some queued-or-submitted tasks use foreground for executing
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4430f769
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4430f769
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4430f769
Branch: refs/heads/master
Commit: 4430f769077210bf253a8d70a69482c1c2119d39
Parents: 7f4d7bd
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Sep 15 11:07:27 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:54 2017 +0100
----------------------------------------------------------------------
.../brooklyn/util/core/task/DynamicTasks.java | 16 +++++++---------
1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4430f769/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 15b062a..5426baf 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -107,7 +107,7 @@ public class DynamicTasks {
this.execContext = ((EntityInternal)entity).getExecutionContext();
return this;
}
- private boolean orSubmitInternal() {
+ private boolean orSubmitInternal(boolean samethread) {
if (!wasQueued()) {
if (isQueuedOrSubmitted()) {
log.warn("Redundant call to execute "+getTask()+"; skipping");
@@ -118,7 +118,8 @@ public class DynamicTasks {
ec = BasicExecutionContext.getCurrentExecutionContext();
if (ec==null)
throw new IllegalStateException("Cannot execute "+getTask()+" without an execution context; ensure caller is in an ExecutionContext");
- ec.submit(getTask());
+ if (samethread) ec.get(getTask());
+ else ec.submit(getTask());
return true;
}
} else {
@@ -128,7 +129,7 @@ public class DynamicTasks {
/** causes the task to be submitted (asynchronously) if it hasn't already been,
* requiring an entity execution context (will try to find a default if not set) */
public TaskQueueingResult<T> orSubmitAsync() {
- orSubmitInternal();
+ orSubmitInternal(false);
return this;
}
/** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
@@ -141,7 +142,7 @@ public class DynamicTasks {
* (which assumes all commands complete immediately);
* requiring an entity execution context (will try to find a default if not set) */
public TaskQueueingResult<T> orSubmitAndBlock() {
- if (orSubmitInternal()) task.getUnchecked();
+ orSubmitInternal(true);
return this;
}
/** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
@@ -288,12 +289,9 @@ public class DynamicTasks {
return task;
}
- /** submits/queues the given task if needed, and gets the result (unchecked)
- * only permitted in a queueing context (ie a DST main job) if the task is not yet submitted */
- // things get really confusing if you try to queueInTaskHierarchy -- easy to cause deadlocks!
+ /** submits/queues the given task if needed, and gets the result (unchecked) */
public static <T> T get(TaskAdaptable<T> t) {
- // TODO do in foreground?
- return queueIfNeeded(t).asTask().getUnchecked();
+ return queueIfPossible(t).orSubmitAndBlock().andWaitForSuccess();
}
/** As {@link #drain(Duration, boolean)} waiting forever and throwing the first error
[07/23] brooklyn-server git commit: fix deadlock in initial
publication of sensors on setting up a subscription
Posted by he...@apache.org.
fix deadlock in initial publication of sensors on setting up a subscription
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/0a1acecb
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/0a1acecb
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/0a1acecb
Branch: refs/heads/master
Commit: 0a1acecbe7b70ca752e0636605b34317f24a5a8e
Parents: 84d24d1
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 13:09:59 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Sep 19 13:16:36 2017 +0100
----------------------------------------------------------------------
.../mgmt/internal/LocalSubscriptionManager.java | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a1acecb/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 983b307..a9fb70b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -42,6 +42,7 @@ import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
import org.apache.brooklyn.util.collections.MutableList;
@@ -260,6 +261,8 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
"displayName", name.toString(),
"description", description.toString());
+ boolean isEntityStarting = s.subscriber instanceof Entity && isInitial;
+ // will have entity (and adjunct) execution context from tags, so can skip getting exec context
em.submit(execFlags, new Runnable() {
@Override
public String toString() {
@@ -272,6 +275,22 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
@Override
public void run() {
try {
+ if (isEntityStarting) {
+ /* don't let sub deliveries start until this is completed;
+ * this is a pragmatic way to ensure the publish events
+ * if submitted during management starting, aren't executed
+ * until after management is starting.
+ * without this we can get deadlocks as this goes to publish,
+ * has the attribute sensors lock, and waits on the publish lock
+ * (any of management support, local subs, queueing subs).
+ * meanwhile the management startup has those three locks,
+ * then goes to publish and in the process looks up a sensor value.
+ * usually this is not an issue because some other task
+ * does something (eg entity.getExecutionContext()) which
+ * also has a wait-on-management-support semantics.
+ */
+ synchronized (((EntityInternal)s.subscriber).getManagementSupport()) {}
+ }
int count = s.eventCount.incrementAndGet();
if (count > 0 && count % 1000 == 0) LOG.debug("{} events for subscriber {}", count, s);
[09/23] brooklyn-server git commit: task GC and visibility: tidy GC
code, don't delete some things eg subscriptions quite so aggressively
Posted by he...@apache.org.
task GC and visibility: tidy GC code, don't delete some things eg subscriptions quite so aggressively
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/aeecd3e9
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/aeecd3e9
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/aeecd3e9
Branch: refs/heads/master
Commit: aeecd3e90e049064e7c4cdfdaf9b40404032815a
Parents: b0556de
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Sep 20 09:09:08 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Sep 20 10:10:25 2017 +0100
----------------------------------------------------------------------
.../mgmt/internal/BrooklynGarbageCollector.java | 93 +++++++++++++-------
.../util/core/task/BasicExecutionManager.java | 2 +
2 files changed, 65 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/aeecd3e9/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
index 7995768..e0e6dec 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
@@ -305,27 +305,20 @@ public class BrooklynGarbageCollector {
if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG))
return false;
- if (task.getSubmittedByTask()!=null) {
- Task<?> parent = task.getSubmittedByTask();
- if (executionManager.getTask(parent.getId())==null) {
- // parent is already cleaned up
- return true;
- }
- if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) {
- // it is a child, let the parent manage this task's death
- return false;
- }
- Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task);
- if (associatedEntity!=null) {
- // this is associated to an entity; destroy only if the entity is unmanaged
- return !Entities.isManaged(associatedEntity);
- }
- // if not associated to an entity, then delete immediately
- return true;
+ if (!isSubmitterExpired(task)) {
+ return false;
+ }
+ if (isChild(task)) {
+ // parent should manage this task's death; but above already kicks in if parent is not expired, so probably shouldn't come here?
+ LOG.warn("Unexpected expiry candidacy for "+task);
+ return false;
+ }
+ if (isAssociatedToActiveEntity(task)) {
+ return false;
}
// e.g. scheduled tasks, sensor events, etc
- // TODO (in future may keep some of these with another limit, based on a new TagCategory)
+ // (in future may keep some of these with another limit, based on a new TagCategory)
// there may also be a server association for server-side tasks which should be kept
// (but be careful not to keep too many subscriptions!)
@@ -337,7 +330,7 @@ public class BrooklynGarbageCollector {
* {@link #maxTasksPerTag} and {@link #maxTaskAge}.
*/
protected synchronized int gcTasks() {
- // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks.
+ // NB: be careful with memory usage here: have seen OOME if we get crazy lots of tasks.
// hopefully the use new limits, filters, and use of live lists in some places (added Sep 2014) will help.
//
// An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That
@@ -397,11 +390,21 @@ public class BrooklynGarbageCollector {
int deletedCount = 0;
deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false);
deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true);
- deletedCount += expireSubTasksWhoseSubmitterIsExpired();
- int deletedGlobally = expireIfOverCapacityGlobally();
- deletedCount += deletedGlobally;
- if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired();
+ // if expensive we could optimize task GC here to avoid repeated lookups by
+ // counting all expired above (not just prev two lines) and skipping if none
+ // but that seems unlikely
+ int deletedHere = 0;
+ while ((deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()) > 0) {
+ // delete in loop so we don't have descendants sticking around until deleted in later cycles
+ deletedCount += deletedHere;
+ }
+
+ deletedHere = expireIfOverCapacityGlobally();
+ deletedCount += deletedHere;
+ while (deletedHere > 0) {
+ deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion());
+ }
return deletedCount;
}
@@ -471,7 +474,9 @@ public class BrooklynGarbageCollector {
}
}
- protected int expireSubTasksWhoseSubmitterIsExpired() {
+ protected int expireHistoricTasksNowReadyForImmediateDeletion() {
+ // find tasks which weren't ready for immediate deletion, but which now are
+
// ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS
if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS))
return 0;
@@ -480,13 +485,15 @@ public class BrooklynGarbageCollector {
Collection<Task<?>> tasksToDelete = MutableList.of();
try {
for (Task<?> task: allTasks) {
- if (!task.isDone()) continue;
- Task<?> submitter = task.getSubmittedByTask();
- // if we've leaked, ie a subtask which is not a child task,
- // and the submitter is GC'd, then delete this also
- if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) {
- tasksToDelete.add(task);
+ if (!shouldDeleteTaskImmediately(task)) {
+ // 2017-09 previously we only checked done and submitter expired, and deleted if both were true
+ // so could pick up even things that were non_transient -- now much stricter
+ continue;
+ } else {
+ if (LOG.isTraceEnabled()) LOG.trace("Deleting task which really is no longer wanted: "+task+" (submitted by "+task.getSubmittedByTask()+")");
}
+
+ tasksToDelete.add(task);
}
} catch (ConcurrentModificationException e) {
@@ -500,6 +507,32 @@ public class BrooklynGarbageCollector {
return tasksToDelete.size();
}
+ private boolean isAssociatedToActiveEntity(Task<?> task) {
+ Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task);
+ if (associatedEntity==null) {
+ return false;
+ }
+ // this is associated to an entity; destroy only if the entity is unmanaged
+ return Entities.isManaged(associatedEntity);
+ }
+
+ private boolean isChild(Task<?> task) {
+ Task<?> parent = task.getSubmittedByTask();
+ return (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task));
+ }
+
+ private boolean isSubmitterExpired(Task<?> task) {
+ if (Strings.isBlank(task.getSubmittedByTaskId())) {
+ return false;
+ }
+ Task<?> submitter = task.getSubmittedByTask();
+ if (submitter!=null && (!submitter.isDone() || executionManager.getTask(submitter.getId())!=null)) {
+ return false;
+ }
+ // submitter task is GC'd
+ return true;
+ }
+
protected enum TagCategory {
ENTITY, NON_ENTITY_NORMAL;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/aeecd3e9/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 4fcfadb..46e501e8 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -57,6 +57,7 @@ import org.apache.brooklyn.core.config.Sanitizer;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
@@ -796,6 +797,7 @@ public class BasicExecutionManager implements ExecutionManager {
.description("Details of the original task have been forgotten.")
.body(Callables.returning((T)null)).build();
((BasicTask<T>)t).ignoreIfNotRun();
+ ((BasicTask<T>)t).cancelled = true;
return t;
}
}
[18/23] brooklyn-server git commit: switch
queue-or-submit-blocking-then-get invocations to new simpler
DynamicTasks.get. but note some things fail getImmediately if they are
running a queueing context eg EffectorSayHiTest, until fixed in the next PR.
Posted by he...@apache.org.
switch queue-or-submit-blocking-then-get invocations to new simpler DynamicTasks.get.
but note some things fail getImmediately if they are running a queueing context eg EffectorSayHiTest,
until fixed in the next PR.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/98888708
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/98888708
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/98888708
Branch: refs/heads/master
Commit: 98888708957d0cea0f6bf219c5ea30b243094138
Parents: 0c2e1f6
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Oct 2 15:43:02 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:18 2017 +0100
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ExecutionContext.java | 16 +++---
.../core/entity/trait/StartableMethods.java | 6 +--
.../core/location/BasicMachineDetails.java | 5 +-
.../location/access/BrooklynAccessUtils.java | 2 +-
.../core/objs/proxy/EntityProxyImpl.java | 2 +-
.../entity/group/DynamicClusterImpl.java | 7 +--
.../brooklyn/util/core/task/DynamicTasks.java | 53 +++++++++++++++-----
.../apache/brooklyn/core/feed/PollerTest.java | 2 +-
.../core/mgmt/rebind/RebindManagerTest.java | 13 +----
.../entity/group/DynamicClusterTest.java | 8 +--
.../SameServerDriverLifecycleEffectorTasks.java | 5 +-
11 files changed, 63 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 3ef3470..2d8fe23 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -101,16 +101,18 @@ public interface ExecutionContext extends Executor {
<T> Maybe<T> getImmediately(Task<T> callableOrSupplierOrTask);
/**
- * Efficient shortcut for {@link #submit(TaskAdaptable)} followed by an immediate {@link Task#get()}.
+ * Efficient implementation of common case when {@link #submit(TaskAdaptable)} is followed by an immediate {@link Task#get()}.
* <p>
- * Implementations will typically attempt to execute in the current thread, with appropriate
- * configuration to make it look like it is in a sub-thread,
- * ie registering this as a task and allowing
- * context methods on tasks to see the given sub-task.
+ * This is efficient in that it may typically attempt to execute in the current thread,
+ * with appropriate configuration to make it look like it is in a sub-thread,
+ * ie registering this as a task and allowing context methods on tasks to see the given sub-task.
+ * However it will normally be non-blocking which reduces overhead and
+ * is permissible within a {@link #getImmediately(Object)} task
* <p>
- * If the argument has already been submitted it simply blocks on it.
+ * If the argument has already been submitted it simply blocks on it
+ * (i.e. no additional execution, and in that case would fail within a {@link #getImmediately(Object)}).
*
- * @param task
+ * @param task the task whose result is being sought
* @return result of the task execution
*/
@Beta
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
index 1883166..30b2348 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
@@ -49,20 +49,20 @@ public class StartableMethods {
/** Common implementation for start in parent nodes; just invokes start on all children of the entity */
public static void start(Entity e, Collection<? extends Location> locations) {
log.debug("Starting entity "+e+" at "+locations);
- DynamicTasks.queueIfPossible(startingChildren(e, locations)).orSubmitAsync(e).getTask().getUnchecked();
+ DynamicTasks.get(startingChildren(e, locations), e);
}
/** Common implementation for stop in parent nodes; just invokes stop on all children of the entity */
public static void stop(Entity e) {
log.debug("Stopping entity "+e);
- DynamicTasks.queueIfPossible(stoppingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
+ DynamicTasks.get(stoppingChildren(e), e);
if (log.isDebugEnabled()) log.debug("Stopped entity "+e);
}
/** Common implementation for restart in parent nodes; just invokes restart on all children of the entity */
public static void restart(Entity e) {
log.debug("Restarting entity "+e);
- DynamicTasks.queueIfPossible(restartingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
+ DynamicTasks.get(restartingChildren(e), e);
if (log.isDebugEnabled()) log.debug("Restarted entity "+e);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
index f8a7cce..7906616 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
@@ -96,10 +96,7 @@ public class BasicMachineDetails implements MachineDetails {
*/
@Beta
public static BasicMachineDetails forSshMachineLocationLive(SshMachineLocation location) {
- return TaskTags.markInessential(DynamicTasks.queueIfPossible(taskForSshMachineLocation(location))
- .orSubmitAsync()
- .asTask())
- .getUnchecked();
+ return DynamicTasks.get(TaskTags.markInessential(taskForSshMachineLocation(location)));
}
/**
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
index 40f71e2..35bc2f8 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
@@ -119,7 +119,7 @@ public class BrooklynAccessUtils {
public static String getResolvedAddress(Entity entity, SshMachineLocation origin, String hostnameTarget) {
ProcessTaskWrapper<Integer> task = SshTasks.newSshExecTaskFactory(origin, "ping -c 1 -t 1 "+hostnameTarget)
.summary("checking resolution of "+hostnameTarget).allowingNonZeroExitCode().newTask();
- DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).asTask().blockUntilEnded();
+ DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).getTask().blockUntilEnded();
if (task.asTask().isError()) {
log.warn("ping could not be run, at "+entity+" / "+origin+": "+Tasks.getError(task.asTask()));
return "";
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
index daaf18b..4b6e704 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
@@ -209,7 +209,7 @@ public class EntityProxyImpl implements java.lang.reflect.InvocationHandler {
TaskAdaptable<?> task = ((EffectorWithBody)eff).getBody().newTask(delegate, eff, ConfigBag.newInstance(parameters));
// as per LocalManagementContext.runAtEntity(Entity entity, TaskAdaptable<T> task)
TaskTags.markInessential(task);
- result = DynamicTasks.queueIfPossible(task.asTask()).orSubmitAsync(delegate).andWaitForSuccess();
+ result = DynamicTasks.get(task.asTask(), delegate);
} else {
result = m.invoke(delegate, nonNullArgs);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 23b48f0..b9a9041 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -827,10 +827,8 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
for (Entity member : removedStartables) {
tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()));
}
- Task<?> invoke = Tasks.parallel(tasks.build());
- DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
try {
- invoke.get();
+ DynamicTasks.get( Tasks.parallel(tasks.build()) );
return removedEntities;
} catch (Exception e) {
throw Exceptions.propagate(e);
@@ -1075,8 +1073,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
try {
if (member instanceof Startable) {
Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
- DynamicTasks.queueIfPossible(task).orSubmitAsync();
- task.getUnchecked();
+ DynamicTasks.get(task);
}
} finally {
Entities.unmanage(member);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 5426baf..20b80ab 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -41,6 +41,16 @@ import com.google.common.collect.Iterables;
/**
* Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks.
+ * <p>
+ * Queueing is supported by some task contexts (eg {@link DynamicSequentialTask}) to let that task
+ * build up a complex sequence of tasks and logic. This utility class gives conveniences to allow:
+ * <p>
+ * <li> "queue-if-possible-else-submit-async", so that it is backgrounded, using queueing semantics if available;
+ * <li> "queue-if-possible-else-submit-blocking", so that it is in the queue if there is one, else it will complete synchronously;
+ * <li> "queue-if-possible-else-submit-and-in-both-cases-block", so that it is returned immediately, but waits in its queue if there is one.
+ * <p>
+ * Over time the last mode has been the most prevalent and {@link #get(TaskAdaptable)} is introduced here
+ * as a convenience. If a timeout is desired then the first should be used.
*
* @since 0.6.0
*/
@@ -126,36 +136,50 @@ public class DynamicTasks {
return false;
}
}
- /** causes the task to be submitted (asynchronously) if it hasn't already been,
- * requiring an entity execution context (will try to find a default if not set) */
+ /** Causes the task to be submitted (asynchronously) if it hasn't already been,
+ * such as if a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context.
+ * <p>
+ * An {@link #executionContext(ExecutionContext)} should typically have been set
+ * (or use {@link #orSubmitAsync(Entity)}).
+ */
public TaskQueueingResult<T> orSubmitAsync() {
orSubmitInternal(false);
return this;
}
- /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
+ /** Convenience for setting {@link #executionContext(Entity)} then {@link #orSubmitAsync()}. */
public TaskQueueingResult<T> orSubmitAsync(Entity entity) {
executionContext(entity);
return orSubmitAsync();
}
- /** causes the task to be submitted *synchronously* if it hasn't already been submitted;
- * useful in contexts such as libraries where callers may be either on a legacy call path
- * (which assumes all commands complete immediately);
- * requiring an entity execution context (will try to find a default if not set) */
+ /** Alternative to {@link #orSubmitAsync()} but where, if the submission is needed
+ * (usually because a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context)
+ * it will wait until execution completes (and in fact will execute the task in this thread,
+ * as per {@link ExecutionContext#get(TaskAdaptable)}.
+ * <p>
+ * If the task is already queued, this method does nothing, not even blocks,
+ * to permit cases where a caller is building up a set of tasks to be executed sequentially:
+ * with a queueing context the caller can line them all up, but without that the caller needs this task
+ * finished before submitting subsequent tasks.
+ * <p>
+ * If blocking is desired in all cases and this call should fail on task failure, invoke {@link #andWaitForSuccess()} on the result,
+ * or consider using {@link DynamicTasks#get(TaskAdaptable)} instead of this method,
+ * or {@link DynamicTasks#get(TaskAdaptable, Entity)} if an execuiton context a la {@link #orSubmitAndBlock(Entity)} is needed. */
public TaskQueueingResult<T> orSubmitAndBlock() {
orSubmitInternal(true);
return this;
}
- /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
+ /** Variant of {@link #orSubmitAndBlock()} doing what {@link #orSubmitAsync(Entity)} does for {@link #orSubmitAsync()}. */
public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) {
executionContext(entity);
return orSubmitAndBlock();
}
- /** blocks for the task to be completed
+ /** Blocks for the task to be completed, throwing if there are any errors
+ * and otherwise returning the value.
* <p>
- * needed in any context where subsequent commands assume the task has completed.
+ * In addition to cases where a result is wanted, this is needed in any context where subsequent commands assume the task has completed.
* not needed in a context where the task is simply being built up and queued.
* <p>
- * throws if there are any errors
+ *
*/
public T andWaitForSuccess() {
return task.getUnchecked();
@@ -170,7 +194,7 @@ public class DynamicTasks {
/**
* Tries to add the task to the current addition context if there is one, otherwise does nothing.
* <p/>
- * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned
+ * Call {@link TaskQueueingResult#orSubmitAsync()} on the returned
* {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a
* {@link BasicExecutionContext}.
*/
@@ -332,6 +356,11 @@ public class DynamicTasks {
public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) {
return queueIfPossible(task).orSubmitAsync(entity).asTask();
}
+
+ /** queues the task if possible and waits for the result, otherwise executes synchronously as per {@link ExecutionContext#get(TaskAdaptable)} */
+ public static <T> T get(TaskAdaptable<T> task, Entity e) {
+ return queueIfPossible(task).orSubmitAndBlock(e).andWaitForSuccess();
+ }
/** Breaks the parent-child relation between Tasks.current() and the task passed,
* making the new task a top-level one at the target entity.
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
index 2251153..f270e54 100644
--- a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
@@ -133,7 +133,7 @@ public class PollerTest extends BrooklynAppUnitTestSupport {
}
})
.build();
- return DynamicTasks.queueIfPossible(t).orSubmitAsync().asTask().getUnchecked();
+ return DynamicTasks.get(t);
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
index 45589b1..25eb00f 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
@@ -20,11 +20,8 @@ package org.apache.brooklyn.core.mgmt.rebind;
import static org.testng.Assert.assertEquals;
-import java.util.concurrent.Callable;
-
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.core.test.entity.TestEntityImpl;
import org.apache.brooklyn.util.core.task.BasicTask;
@@ -48,15 +45,7 @@ public class RebindManagerTest extends RebindTestFixtureWithApp {
@Override
public void rebind() {
super.rebind();
- Task<String> task = new BasicTask<String>(new Callable<String>() {
- @Override public String call() {
- return "abc";
- }});
- String val = DynamicTasks.queueIfPossible(task)
- .orSubmitAsync()
- .asTask()
- .getUnchecked();
- sensors().set(TestEntity.NAME, val);
+ sensors().set(TestEntity.NAME, DynamicTasks.get(new BasicTask<String>(() -> "abc")));
}
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 80f1be6..0827664 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -1325,12 +1325,8 @@ public class DynamicClusterTest extends AbstractDynamicClusterOrFabricTest {
.body(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
- DynamicTasks.queueIfPossible(first).orSubmitAsync();
- final Entity source = first.get();
- final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
- DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
- return booleanTask.get();
+ final Entity source = DynamicTasks.get( DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST) );
+ return DynamicTasks.get( DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP) );
}
})
.build();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
index faed3db..52ce82f 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
@@ -111,10 +111,7 @@ public class SameServerDriverLifecycleEffectorTasks extends MachineLifecycleEffe
@Override
protected String startProcessesAtMachine(Supplier<MachineLocation> machineS) {
- DynamicTasks.queueIfPossible(StartableMethods.startingChildren(entity(), machineS.get()))
- .orSubmitAsync(entity())
- .getTask()
- .getUnchecked();
+ DynamicTasks.get(StartableMethods.startingChildren(entity(), machineS.get()), entity());
DynamicTasks.waitForLast();
return "children started";
}
[14/23] brooklyn-server git commit: deprecated since is now 0.13.0
not 0.12.0
Posted by he...@apache.org.
deprecated since is now 0.13.0 not 0.12.0
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/6dfe4987
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/6dfe4987
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/6dfe4987
Branch: refs/heads/master
Commit: 6dfe49875e5e06396a7ff23a2e90cff5741e830a
Parents: 80446ab
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Sep 25 10:41:37 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 25 10:41:37 2017 +0100
----------------------------------------------------------------------
.../main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java | 4 ++--
.../main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java | 4 ++--
.../java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java | 4 ++--
.../org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java | 2 +-
.../apache/brooklyn/util/core/task/BasicExecutionContext.java | 2 +-
5 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index c940ca0..3bf7c73 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -53,12 +53,12 @@ public interface ExecutionContext extends Executor {
<T> Task<T> submit(Map<?,?> properties, Callable<T> callable);
/** {@link ExecutionManager#submit(Runnable)
- * @deprecated since 0.12.0 pass a display name or a more detailed map */
+ * @deprecated since 0.13.0 pass a display name or a more detailed map */
@Deprecated
Task<?> submit(Runnable runnable);
/** {@link ExecutionManager#submit(Callable)
- * @deprecated since 0.12.0 pass a display name or a more detailed map */
+ * @deprecated since 0.13.0 pass a display name or a more detailed map */
@Deprecated
<T> Task<T> submit(Callable<T> callable);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
index d4d1db4..347a127 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
@@ -80,12 +80,12 @@ public interface ExecutionManager {
// public Set<Task<?>> getAllTasks();
/** see {@link #submit(Map, TaskAdaptable)}
- * @deprecated since 0.12.0 pass displayName or map */
+ * @deprecated since 0.13.0 pass displayName or map */
@Deprecated
public Task<?> submit(Runnable r);
/** see {@link #submit(Map, TaskAdaptable)}
- * @deprecated since 0.12.0 pass displayName or map */
+ * @deprecated since 0.13.0 pass displayName or map */
@Deprecated
public <T> Task<T> submit(Callable<T> r);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index a68df32..7d35ac4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -88,7 +88,7 @@ public class BrooklynTaskTags extends TaskTags {
// ------------- entity tags -------------------------
public abstract static class WrappedItem<T> {
- /** @deprecated since 0.12.0 going private; use {@link #getWrappingType()} */
+ /** @deprecated since 0.13.0 going private; use {@link #getWrappingType()} */
@Deprecated
public final String wrappingType;
protected WrappedItem(String wrappingType) {
@@ -117,7 +117,7 @@ public class BrooklynTaskTags extends TaskTags {
}
}
public static class WrappedEntity extends WrappedItem<Entity> {
- /** @deprecated since 0.12.0 going private; use {@link #unwrap()} */
+ /** @deprecated since 0.13.0 going private; use {@link #unwrap()} */
@Deprecated
public final Entity entity;
protected WrappedEntity(String wrappingType, Entity entity) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index 9018d5d..c6cb74c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -89,7 +89,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
@Deprecated
protected Map<String,Object> leftoverProperties = Maps.newLinkedHashMap();
- /** @deprecated since 0.12.0, going private, use {@link #getExecutionContext()} */
+ /** @deprecated since 0.13.0, going private, use {@link #getExecutionContext()} */
@Deprecated
protected transient ExecutionContext execution;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 1236219..33412d8 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -84,7 +84,7 @@ public class BasicExecutionContext extends AbstractExecutionContext {
* Supported flags are {@code tag} and {@code tags}
*
* @see ExecutionManager#submit(Map, TaskAdaptable)
- * @deprecated since 0.12.0 use {@link #BasicExecutionContext(ExecutionManager, Iterable)}
+ * @deprecated since 0.13.0 use {@link #BasicExecutionContext(ExecutionManager, Iterable)}
*/
@Deprecated
public BasicExecutionContext(Map<?, ?> flags, ExecutionManager executionManager) {
[15/23] brooklyn-server git commit: Merge branch 'tasks-better' into
tasks-better-tree
Posted by he...@apache.org.
Merge branch 'tasks-better' into tasks-better-tree
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/d7b086b7
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/d7b086b7
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/d7b086b7
Branch: refs/heads/master
Commit: d7b086b731922dcce0f2d9a367d924e842c22047
Parents: 6dfe498 afc17ee
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Thu Sep 28 17:28:45 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Thu Sep 28 17:28:45 2017 +0100
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ExecutionContext.java | 8 ++++-
.../util/core/task/BasicExecutionContext.java | 28 +++++++++++------
.../util/core/task/BasicExecutionManager.java | 11 ++++---
.../brooklyn/util/core/task/BasicTask.java | 23 +++++---------
.../brooklyn/util/core/task/ForwardingTask.java | 4 +--
.../brooklyn/util/core/task/TaskInternal.java | 10 +++---
.../task/TaskInternalCancellableWithMode.java | 32 ++++++++++++++++++++
.../brooklyn/util/core/task/ValueResolver.java | 8 +++--
.../core/effector/EffectorSayHiTest.java | 1 +
.../util/core/task/BasicTasksFutureTest.java | 23 +++++++-------
.../core/task/NonBasicTaskExecutionTest.java | 8 +----
.../org/apache/brooklyn/util/guava/Maybe.java | 2 +-
.../apache/brooklyn/util/repeat/Repeater.java | 7 +++--
13 files changed, 102 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d7b086b7/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d7b086b7/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d7b086b7/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
[16/23] brooklyn-server git commit: change when cancellation is done
for getImmediate - means effector invocation now works
Posted by he...@apache.org.
change when cancellation is done for getImmediate - means effector invocation now works
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/bb26d320
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/bb26d320
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/bb26d320
Branch: refs/heads/master
Commit: bb26d320906ad86888c28cc32b2fa36709e70d1a
Parents: d7b086b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Thu Sep 28 16:55:33 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Thu Sep 28 17:29:59 2017 +0100
----------------------------------------------------------------------
.../util/core/task/BasicExecutionContext.java | 18 +++++++-----------
.../brooklyn/core/effector/EffectorSayHiTest.java | 6 +++---
2 files changed, 10 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/bb26d320/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index f9f431a..2c8ddab 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -287,17 +287,6 @@ public class BasicExecutionContext extends AbstractExecutionContext {
try {
return runInSameThread(fakeTaskForContext, new Callable<Maybe<T>>() {
public Maybe<T> call() {
- // could try to make this work for more types of tasks by not cancelling, just interrupting;
- // however there is a danger that immediate-submission tasks are leaked if we don't cancel.
- // for instance with DSTs the thread interrupt may apply only to the main job queue.andWait blocking,
- // leaving other tasks leaked.
- //
- // this method is best-effort so fine if it doesn't succeed. good if we can expand
- // coverage but NOT at the expense of major leaks of course!
- //
- // see WIP test in EffectorSayHiTest
- fakeTaskForContext.cancel();
-
boolean wasAlreadyInterrupted = Thread.interrupted();
try {
return job.getImmediately();
@@ -305,6 +294,13 @@ public class BasicExecutionContext extends AbstractExecutionContext {
if (wasAlreadyInterrupted) {
Thread.currentThread().interrupt();
}
+ // we've acknowledged that getImmediate may wreck (cancel) the task,
+ // their first priority is to prevent them from leaking;
+ // however previously we did the cancel before running,
+ // doing it after means more tasks successfully execute
+ // (the interrupt is sufficient to prevent them blocking);
+ // see test EffectorSayHiTest.testInvocationGetImmediately
+ fakeTaskForContext.cancel();
}
} });
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/bb26d320/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
index bc41d45..e7dc626 100644
--- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
@@ -61,6 +61,7 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
//TODO test edge/error conditions
//(missing parameters, wrong number of params, etc)
+ @SuppressWarnings("unused")
private static final Logger log = LoggerFactory.getLogger(EffectorSayHiTest.class);
private MyEntity e;
@@ -109,11 +110,10 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
.get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
}
- @Test(groups="WIP") // see comments at BasicExecutionContext.getImmediately
- // TODO this will be fixed soon by #835
+ @Test(invocationCount=100)
public void testInvocationGetImmediately() throws Exception {
assertEquals(((EntityInternal)e).getExecutionContext()
- .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
+ .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ).get(), "hi Bob");
}
@Test
[19/23] brooklyn-server git commit: Tasks.tryQueueing won't queue if
calling thread is interrupted
Posted by he...@apache.org.
Tasks.tryQueueing won't queue if calling thread is interrupted
means more things work in immediate mode.
could be un-done if DynamicSequentialTask removes the DST manager thread.
fixes a few failing tests in the previous commit.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/8b727697
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/8b727697
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/8b727697
Branch: refs/heads/master
Commit: 8b7276976721ef440c858ee8253227169c32f5e0
Parents: 9888870
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 10:57:17 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:19 2017 +0100
----------------------------------------------------------------------
.../brooklyn/util/core/task/DynamicTasks.java | 21 ++++++--------------
.../apache/brooklyn/util/core/task/Tasks.java | 6 +++---
2 files changed, 9 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8b727697/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 20b80ab..9026798 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -199,11 +199,7 @@ public class DynamicTasks {
* {@link BasicExecutionContext}.
*/
public static <T> TaskQueueingResult<T> queueIfPossible(TaskAdaptable<T> task) {
- TaskQueueingContext adder = getTaskQueuingContext();
- boolean result = false;
- if (adder!=null)
- result = Tasks.tryQueueing(adder, task);
- return new TaskQueueingResult<T>(task, result);
+ return new TaskQueueingResult<T>(task, Tasks.tryQueueing(getTaskQueuingContext(), task));
}
/** @see #queueIfPossible(TaskAdaptable) */
@@ -214,22 +210,17 @@ public class DynamicTasks {
/** adds the given task to the nearest task addition context,
* either set as a thread-local, or in the current task, or the submitter of the task, etc
* <p>
- * throws if it cannot add */
+ * throws if it cannot add or addition/execution would fail including if calling thread is interrupted */
public static <T> Task<T> queueInTaskHierarchy(Task<T> task) {
Preconditions.checkNotNull(task, "Task to queue cannot be null");
Preconditions.checkState(!Tasks.isQueuedOrSubmitted(task), "Task to queue must not yet be submitted: {}", task);
- TaskQueueingContext adder = getTaskQueuingContext();
- if (adder!=null) {
- if (Tasks.tryQueueing(adder, task)) {
- log.debug("Queued task {} at context {} (no hierarchy)", task, adder);
- return task;
- }
+ if (Tasks.tryQueueing(getTaskQueuingContext(), task)) {
+ log.debug("Queued task {} at context {} (no hierarchy)", task, getTaskQueuingContext());
+ return task;
}
- Task<?> t = Tasks.current();
- Preconditions.checkState(t!=null || adder!=null, "No task addition context available for queueing task "+task);
-
+ Task<?> t = Tasks.current();
while (t!=null) {
if (t instanceof TaskQueueingContext) {
if (Tasks.tryQueueing((TaskQueueingContext)t, task)) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8b727697/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
index 12247c4..90a6bdc 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
@@ -278,11 +278,11 @@ public class Tasks {
}
/**
- * Adds the given task to the given context. Does not throw an exception if the addition fails.
- * @return true if the task was added, false otherwise.
+ * Adds the given task to the given context. Does not throw an exception if the addition fails or would fail.
+ * @return true if the task was added, false otherwise including if context is null or thread is interrupted.
*/
public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable<?> task) {
- if (task==null || isQueued(task))
+ if (task==null || adder==null || isQueued(task) || Thread.currentThread().isInterrupted())
return false;
try {
adder.queue(task.asTask());
[20/23] brooklyn-server git commit: more assertion routines,
map equality and unordered iterable equality
Posted by he...@apache.org.
more assertion routines, map equality and unordered iterable equality
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/508183b1
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/508183b1
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/508183b1
Branch: refs/heads/master
Commit: 508183b1b85f7ee118d948739e4cc72121623a85
Parents: 8b72769
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 11:08:51 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:39 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/brooklyn/test/Asserts.java | 20 ++++++++++++++++++--
.../org/apache/brooklyn/test/AssertsTest.java | 17 +++++++++++++++++
2 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/508183b1/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index dc672b5..3cd70b1 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -1452,8 +1452,24 @@ public class Asserts {
}
public static void assertSize(Iterable<?> list, int expectedSize) {
- if (list==null) fail("List is null");
- if (Iterables.size(list)!=expectedSize) fail("List has wrong size "+Iterables.size(list)+" (expected "+expectedSize+"): "+list);
+ if (list==null) fail("Collection is null");
+ if (Iterables.size(list)!=expectedSize) fail("Collection has wrong size "+Iterables.size(list)+" (expected "+expectedSize+"): "+list);
+ }
+
+ public static void assertSize(Map<?,?> map, int expectedSize) {
+ if (map==null) fail("Map is null");
+ if (Iterables.size(map.keySet())!=expectedSize) fail("Map has wrong size "+map.size()+" (expected "+expectedSize+"): "+map);
+ }
+
+ /** Ignores duplicates and order */
+ public static void assertSameUnorderedContents(Iterable<?> i1, Iterable<?> i2) {
+ if (i1==null || i2==null) {
+ if (i1==null && i2==null) {
+ return ;
+ }
+ fail("Collections differ in that one is null: "+i1+" and "+i2);
+ }
+ assertEquals(MutableSet.copyOf(i1), MutableSet.copyOf(i2));
}
public static void assertInstanceOf(Object obj, Class<?> type) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/508183b1/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java
----------------------------------------------------------------------
diff --git a/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java b/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java
index bd411b7..1105acd 100644
--- a/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java
+++ b/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java
@@ -25,7 +25,9 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.brooklyn.test.Asserts.ShouldHaveFailedPreviouslyAssertionError;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
import org.testng.Assert;
@@ -170,4 +172,19 @@ public class AssertsTest {
// check code flowed the way we expected
Asserts.assertEquals(reached, 3);
}
+
+ @Test
+ public void testAssertSize() {
+ Asserts.assertSize(MutableList.of("x", "x", "y"), 3);
+ Asserts.assertSize(MutableSet.of("x", "x", "y"), 2);
+ Asserts.assertSize(MutableMap.of("x", "x", "y", "y"), 2);
+ }
+
+ @Test
+ public void testAssertSetListEqualityAndSameUnoderderedContents() {
+ Assert.assertEquals(MutableSet.of("x", "x", "y"), MutableSet.of("x", "y", "x"));
+ Assert.assertNotEquals(MutableList.of("x", "x", "y"), MutableList.of("x", "y", "x"));
+ // above are baseline checks
+ Asserts.assertSameUnorderedContents(MutableList.of("x", "x", "y"), MutableList.of("y", "x"));
+ }
}
[21/23] brooklyn-server git commit: fix message publish synching to
guarantee in-order delivery
Posted by he...@apache.org.
fix message publish synching to guarantee in-order delivery
both for initial subscriptions and in-life - two changes, synching in AttributesInternal for the in-life delivery, and in subscribe/publish for the initial
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/9213f0e2
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/9213f0e2
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/9213f0e2
Branch: refs/heads/master
Commit: 9213f0e25288114718f2a06a50ee206bf0120561
Parents: 508183b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 13:49:42 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:39 2017 +0100
----------------------------------------------------------------------
.../brooklyn/api/mgmt/SubscriptionManager.java | 6 +-
.../brooklyn/core/entity/AbstractEntity.java | 1 +
.../mgmt/internal/LocalSubscriptionManager.java | 108 +++++++++++++------
.../brooklyn/core/sensor/AttributeMap.java | 15 ++-
.../core/effector/EffectorSayHiTest.java | 2 +-
.../core/entity/EntitySubscriptionTest.java | 9 +-
.../internal/LocalSubscriptionManagerTest.java | 91 ++++++++++++++++
.../policy/basic/PolicySubscriptionTest.java | 35 ++++--
8 files changed, 224 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
index 1fa327e..8302ba8 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
@@ -42,8 +42,10 @@ public interface SubscriptionManager {
*
* The method returns an id which can be used to {@link #unsubscribe(SubscriptionHandle)} later.
* <p>
- * The listener callback is in-order single-threaded and synchronized on this object. The flags
- * parameters can include the following:
+ * The listener callback is in-order single-threaded and synchronized on this object.
+ * In other words message delivery from a producer to a given subscriber is in publish order
+ * (or in the case of a late subscriber getting initial values, in subscribe order).
+ * The flags parameters can include the following:
* <ul>
* <li>subscriber - object to identify the subscriber (e.g. entity, or console session uid)
* <li><i>in future</i> - control parameters for the subscription (period, minimum delta for updates, etc)
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index bd7df06..dfbbc8f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -529,6 +529,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
* through this method. Internally, all attribute updates synch on this object. Code wishing to
* update attributes or publish while holding some other lock should acquire the monitor on this
* object first to prevent deadlock. */
+ @Beta
protected Object getAttributesSynchObjectInternal() {
return attributesInternal.getSynchObjectInternal();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index a927a89..a726059 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -54,6 +54,7 @@ import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
@@ -129,15 +130,9 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[] {s.id, s.subscriber, producer, sensor, this});
allSubscriptions.put(s.id, s);
- addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s);
- if (s.subscriber!=null) {
- addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s);
- }
- if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) {
- ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
- }
-
+ T lastVal;
if (notifyOfInitialValue) {
+ notifyOfInitialValue = false;
if (producer == null) {
LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer: "+s);
} else if (sensor == null) {
@@ -145,16 +140,58 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
} else if (!(sensor instanceof AttributeSensor)) {
LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
} else {
- if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
- em.submit(
- MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer), BrooklynTaskTags.SENSOR_TAG),
- "displayName", "Initial publication of "+s.sensor.getName()),
- () -> {
- T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
- submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
- });
+ notifyOfInitialValue = true;
}
}
+ if (notifyOfInitialValue) {
+ lastVal = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor);
+ } else {
+ lastVal = null; // won't be used
+ }
+ addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s);
+ if (s.subscriber!=null) {
+ addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s);
+ }
+ if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) {
+ ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
+ }
+
+ if (notifyOfInitialValue) {
+ if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
+ // this is run asynchronously to prevent deadlock when trying to get attribute and publish;
+ // however we want it:
+ // (a) to run in the same order as subscriptions are made, so use the manager tag scheduler
+ // (b) ideally to use the last value that was not published to this target, and
+ // (c) to deliver before any subsequent value notification
+ // but we can't guarantee either (b) or (c) without taking a lock from before we added
+ // the subscriber above, mutexing other sets and publications, which feels heavy and dangerous.
+ // so the compromise is to skip this delivery in cases where the last value has obviously changed -
+ // because a more recent notification is guaranteed to be sent.
+ // we may occasionally still send a duplicate, if delivery got sent in the tiny
+ // window between adding the subscription and taking the last value,
+ // we will think the last value hasn't changed. but we will never send a
+ // wrong value as this backs out if there is any confusion over the last value.
+ em.submit(
+ MutableMap.of("tags", getPublishTags(s, s.producer),
+ "displayName", "Initial value publication on subscription to "+s.sensor.getName()),
+ () -> {
+ T val = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor);
+ if (!Objects.equal(lastVal, val)) {
+ // bail out - value has been changed;
+ // this might be a duplicate if value changed in small window earlier,
+ // but it won't be delivering an old value later than a newer value
+ if (LOG.isDebugEnabled()) LOG.debug("skipping initial value delivery of {} -> {} to {} as value changed from {} to {}", new Object[] {s.producer, s.sensor, s, lastVal, val});
+ return;
+ }
+ // guard against case where other thread changes the val and publish
+ // while we are publishing, and our older val is then delivered after theirs.
+ // 2017-10 previously we did not do this, then looked at doing it with the attribute lock object
+ // synchronized (((AbstractEntity)s.producer).getAttributesSynchObjectInternal()) {
+ // but realized a better thing is to have initial delivery _done_, not just submitted,
+ // by ourselves, as we are already in the right thread now and can prevent interleaving this way
+ submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
+ });
+ }
return s;
}
@@ -210,7 +247,6 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
// (recommend exactly one per subscription to prevent deadlock)
// this is done with:
// em.setTaskSchedulerForTag(subscriberId, SingleThreadedScheduler.class);
-
//note, generating the notifications must be done in the calling thread to preserve order
//e.g. emit(A); emit(B); should cause onEvent(A); onEvent(B) in that order
if (LOG.isTraceEnabled()) LOG.trace("{} got event {}", this, event);
@@ -228,18 +264,11 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void submitPublishEvent(final Subscription s, final SensorEvent<?> event, final boolean isInitial) {
+ private void submitPublishEvent(final Subscription s, final SensorEvent<?> event, final boolean isInitialPublicationOfOldValueInCorrectScheduledThread) {
if (s.eventFilter!=null && !s.eventFilter.apply(event))
return;
- List<Object> tags = MutableList.builder()
- .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
- .add(s.subscriberExecutionManagerTag)
- .add(BrooklynTaskTags.SENSOR_TAG)
- // associate the publish event with the publisher (though on init it might be triggered by subscriber)
- .addIfNotNull(event.getSource()!=null ? BrooklynTaskTags.tagForTargetEntity(event.getSource()) : null)
- .build()
- .asUnmodifiable();
+ List<Object> tags = getPublishTags(s, event.getSource()).asUnmodifiable();
StringBuilder name = new StringBuilder("sensor ");
StringBuilder description = new StringBuilder("Sensor ");
@@ -271,12 +300,12 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
"displayName", name.toString(),
"description", description.toString());
- boolean isEntityStarting = s.subscriber instanceof Entity && isInitial;
+ boolean isEntityStarting = s.subscriber instanceof Entity && isInitialPublicationOfOldValueInCorrectScheduledThread;
// will have entity (and adjunct) execution context from tags, so can skip getting exec context
- em.submit(execFlags, new Runnable() {
+ Runnable deliverer = new Runnable() {
@Override
public String toString() {
- if (isInitial) {
+ if (isInitialPublicationOfOldValueInCorrectScheduledThread) {
return "LSM.publishInitial("+event+")";
} else {
return "LSM.publish("+event+")";
@@ -312,7 +341,26 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
LOG.warn("Error processing subscriptions to "+this+": "+t, t);
}
}
- }});
+ }};
+ if (!isInitialPublicationOfOldValueInCorrectScheduledThread) {
+ em.submit(execFlags, deliverer);
+ } else {
+ // for initial, caller guarantees he is running in the right thread/context
+ // where the above submission would take place, typically the
+ // subscriber single threaded executor with the entity context;
+ // this allows caller to do extra assertions and bailout steps at the right time
+ deliverer.run();
+ }
+ }
+
+ private MutableList<Object> getPublishTags(final Subscription<?> s, final Entity source) {
+ return MutableList.builder()
+ .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+ .add(s.subscriberExecutionManagerTag)
+ .add(BrooklynTaskTags.SENSOR_TAG)
+ // associate the publish event with the publisher (though on init it might be triggered by subscriber)
+ .addIfNotNull(source!=null ? BrooklynTaskTags.tagForTargetEntity(source) : null)
+ .build();
}
protected boolean includeDescriptionForSensorTask(SensorEvent<?> event) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
index dee0700..d9da6ae 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
@@ -139,9 +139,18 @@ public final class AttributeMap {
}
public <T> T update(AttributeSensor<T> attribute, T newValue) {
- T oldValue = updateWithoutPublishing(attribute, newValue);
- entity.emitInternal(attribute, newValue);
- return oldValue;
+ // 2017-10 this was unsynched which meant if two threads updated
+ // the last publication would not correspond to the last value.
+ // could introduce deadlock but emit internal and publish should
+ // not seek any locks. _subscribe_ and _delivery_ might, but they
+ // won't be in this block. an issue with _subscribe-and-get-initial_
+ // should be resolved by initial subscription queueing the publication
+ // to a context where locks are not held.
+ synchronized (values) {
+ T oldValue = updateWithoutPublishing(attribute, newValue);
+ entity.emitInternal(attribute, newValue);
+ return oldValue;
+ }
}
public <T> T updateWithoutPublishing(AttributeSensor<T> attribute, T newValue) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
index e7dc626..4b991ad 100644
--- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
@@ -110,7 +110,7 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
.get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
}
- @Test(invocationCount=100)
+ @Test
public void testInvocationGetImmediately() throws Exception {
assertEquals(((EntityInternal)e).getExecutionContext()
.getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ).get(), "hi Bob");
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index ea91f36..8ec2794 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -34,6 +34,8 @@ import org.apache.brooklyn.core.test.policy.TestEnricher;
import org.apache.brooklyn.core.test.policy.TestPolicy;
import org.apache.brooklyn.entity.group.BasicGroup;
import org.apache.brooklyn.test.Asserts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -44,7 +46,9 @@ import com.google.common.collect.Iterables;
public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
- // TODO Duplication between this and PolicySubscriptionTest
+ // TODO Duplication between this and PolicySubscriptionTest and LocalSubscriptionManagerTest
+
+ private static final Logger log = LoggerFactory.getLogger(EntitySubscriptionTest.class);
private static final long SHORT_WAIT_MS = 100;
@@ -221,6 +225,7 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
}
@Test
+ @SuppressWarnings("unused")
public void testUnsubscribeUsingHandleStopsEvents() {
SubscriptionHandle handle1 = entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
SubscriptionHandle handle2 = entity.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
@@ -300,6 +305,8 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
@Test
public void testContextEntityOnSubscriptionCallbackTask() {
+ log.info("Observing "+observedEntity+" from "+entity);
+
observedEntity.sensors().set(TestEntity.NAME, "myval");
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
index 4e47090..1373545 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.brooklyn.api.entity.EntitySpec;
@@ -32,12 +33,22 @@ import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.SubscriptionManager;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
/**
* testing the {@link SubscriptionManager} and associated classes.
*/
@@ -170,4 +181,84 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
if (threadException.get() != null) throw threadException.get();
}
+ @Test
+ // same test as in PolicySubscriptionTest, but for entities / simpler
+ public void testSubscriptionReceivesInitialValueEventsInOrder() {
+ RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+
+ entity.sensors().set(TestEntity.NAME, "myname");
+ entity.sensors().set(TestEntity.SEQUENCE, 123);
+ entity.sensors().emit(TestEntity.MY_NOTIF, -1);
+
+ // delivery should be in subscription order, so 123 then 456
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
+ // wait for the above delivery - otherwise it might get dropped
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
+ Asserts.assertSize(listener.getEvents(), 1); });
+ entity.sensors().set(TestEntity.SEQUENCE, 456);
+
+ // notifications don't have "initial value" so don't get -1
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener);
+ // but do get 1, after 456
+ entity.sensors().emit(TestEntity.MY_NOTIF, 1);
+
+ // STOPPING and myname received, in subscription order, after everything else
+ entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener);
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
+
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() {
+ @Override public void run() {
+ Asserts.assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456),
+ new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity, 1),
+ new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING),
+ new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
+ "actually got: "+listener.getEvents());
+ }});
+ }
+
+ @Test
+ public void testNotificationOrderMatchesSetValueOrderWhenSynched() {
+ RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+
+ AtomicInteger count = new AtomicInteger();
+ Runnable set = () -> {
+ synchronized (count) {
+ entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet());
+ }
+ };
+ entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener);
+ for (int i=0; i<10; i++) {
+ new Thread(set).start();
+ }
+
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
+ Asserts.assertSize(listener.getEvents(), 10); });
+ for (int i=0; i<10; i++) {
+ Assert.assertEquals(listener.getEvents().get(i).getValue(), i+1);
+ }
+ }
+
+ @Test
+ public void testNotificationOrderMatchesSetValueOrderWhenNotSynched() {
+ RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+
+ AtomicInteger count = new AtomicInteger();
+ Runnable set = () -> {
+ // as this is not synched, the sets may interleave
+ entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet());
+ };
+ entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener);
+ for (int i=0; i<10; i++) {
+ new Thread(set).start();
+ }
+
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
+ Asserts.assertSize(listener.getEvents(), 10); });
+ // all we expect for sure is that the last value is whatever the sensor is at the end - internal update and publish is mutexed
+ Assert.assertEquals(listener.getEvents().get(9).getValue(), entity.sensors().get(TestEntity.SEQUENCE));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
index 0f3310e..4d733e6 100644
--- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
@@ -24,12 +24,15 @@ import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.policy.PolicySpec;
import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.location.SimulatedLocation;
import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -102,6 +105,7 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
}
@Test
+ @SuppressWarnings("unused")
public void testUnsubscribeUsingHandleStopsEvents() throws Exception {
SubscriptionHandle handle1 = policy.subscriptions().subscribe(entity, TestEntity.SEQUENCE, listener);
SubscriptionHandle handle2 = policy.subscriptions().subscribe(entity, TestEntity.NAME, listener);
@@ -122,18 +126,37 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
}
@Test
- public void testSubscriptionReceivesInitialValueEvents() {
- entity.sensors().set(TestEntity.SEQUENCE, 123);
+ public void testSubscriptionReceivesInitialValueEventsInOrder() {
entity.sensors().set(TestEntity.NAME, "myname");
-
+ entity.sensors().set(TestEntity.SEQUENCE, 123);
+ entity.sensors().emit(TestEntity.MY_NOTIF, -1);
+
+ // delivery should be in subscription order, so 123 then 456
policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
+ // wait for the above delivery - otherwise it might get dropped
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
+ Asserts.assertSize(listener.getEvents(), 1); });
+ entity.sensors().set(TestEntity.SEQUENCE, 456);
+
+ // notifications don't have "initial value" so don't get -1
+ policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener);
+ // but do get 1, after 456
+ entity.sensors().emit(TestEntity.MY_NOTIF, 1);
+
+ // STOPPING and myname received, in subscription order, after everything else
+ entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+ policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener);
policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
- Asserts.succeedsEventually(new Runnable() {
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() {
@Override public void run() {
assertEquals(listener.getEvents(), ImmutableList.of(
new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
- new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")));
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456),
+ new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity, 1),
+ new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING),
+ new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
+ "actually got: "+listener.getEvents());
}});
}
@@ -147,7 +170,7 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
@Override public void run() {
- assertEquals(listener.getEvents(), ImmutableList.of());
+ Asserts.assertSize(listener.getEvents(), 0);
}});
}
[03/23] brooklyn-server git commit: task visibility: validation of
config
Posted by he...@apache.org.
task visibility: validation of config
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/37b6b114
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/37b6b114
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/37b6b114
Branch: refs/heads/master
Commit: 37b6b11452b9619decde71376f95b73eab0836b4
Parents: 2bdcf1a
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 12 11:49:51 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:54 2017 +0100
----------------------------------------------------------------------
.../apache/brooklyn/core/config/ConfigConstraints.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/37b6b114/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
index 2d0cf7d..b891c2b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
+++ b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
@@ -28,9 +28,11 @@ import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
import org.apache.brooklyn.core.objs.BrooklynObjectPredicate;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.guava.Maybe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,12 +114,17 @@ public abstract class ConfigConstraints<T extends BrooklynObject> {
abstract Iterable<ConfigKey<?>> getBrooklynObjectTypeConfigKeys();
public Iterable<ConfigKey<?>> getViolations() {
- // TODO in new task
- return validateAll();
+ if (getBrooklynObject() instanceof EntityInternal) {
+ return ((EntityInternal) getBrooklynObject()).getExecutionContext().get(
+ Tasks.<Iterable<ConfigKey<?>>>builder().dynamic(false).displayName("Validating config").body(
+ () -> validateAll() ).build() );
+ } else {
+ return validateAll();
+ }
}
@SuppressWarnings("unchecked")
- private Iterable<ConfigKey<?>> validateAll() {
+ protected Iterable<ConfigKey<?>> validateAll() {
List<ConfigKey<?>> violating = Lists.newLinkedList();
Iterable<ConfigKey<?>> configKeys = getBrooklynObjectTypeConfigKeys();
LOG.trace("Checking config keys on {}: {}", getBrooklynObject(), configKeys);
[23/23] brooklyn-server git commit: This closes #835
Posted by he...@apache.org.
This closes #835
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/a73ee172
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/a73ee172
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/a73ee172
Branch: refs/heads/master
Commit: a73ee172847766be69c9ffd6128e2d9679bc3a61
Parents: 54f9c70 2dcb0a0
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Oct 6 09:06:06 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 6 09:06:06 2017 +0100
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ExecutionContext.java | 33 +++--
.../brooklyn/api/mgmt/ExecutionManager.java | 14 ++-
.../brooklyn/api/mgmt/ManagementContext.java | 7 ++
.../brooklyn/api/mgmt/SubscriptionManager.java | 6 +-
.../camp/brooklyn/ConfigParametersYamlTest.java | 13 +-
.../brooklyn/camp/brooklyn/spi/dsl/DslTest.java | 4 +-
.../brooklyn/core/config/ConfigConstraints.java | 18 ++-
.../brooklyn/core/entity/AbstractEntity.java | 1 +
.../core/entity/trait/StartableMethods.java | 6 +-
.../org/apache/brooklyn/core/feed/Poller.java | 2 +-
.../core/location/BasicMachineDetails.java | 5 +-
.../location/access/BrooklynAccessUtils.java | 2 +-
.../brooklyn/core/mgmt/BrooklynTaskTags.java | 64 ++++++++--
.../core/mgmt/EntityManagementUtils.java | 8 +-
.../internal/AbstractManagementContext.java | 25 ++++
.../internal/AbstractSubscriptionManager.java | 5 +
.../mgmt/internal/BrooklynGarbageCollector.java | 93 +++++++++-----
.../mgmt/internal/EntityManagementSupport.java | 41 ++++---
.../mgmt/internal/LocalSubscriptionManager.java | 123 +++++++++++++++----
.../NonDeploymentManagementContext.java | 29 ++++-
.../internal/QueueingSubscriptionManager.java | 5 +-
.../core/mgmt/internal/Subscription.java | 1 +
.../core/mgmt/rebind/RebindManagerImpl.java | 11 +-
.../AbstractConfigurationSupportInternal.java | 12 +-
.../core/objs/AbstractEntityAdjunct.java | 28 ++++-
.../brooklyn/core/objs/AdjunctConfigMap.java | 4 +-
.../core/objs/BrooklynObjectInternal.java | 2 +
.../core/objs/proxy/EntityProxyImpl.java | 2 +-
.../core/objs/proxy/InternalEntityFactory.java | 4 +-
.../brooklyn/core/sensor/AttributeMap.java | 15 ++-
.../entity/group/DynamicClusterImpl.java | 7 +-
.../SshCommandMembershipTrackingPolicy.java | 2 +-
.../apache/brooklyn/feed/shell/ShellFeed.java | 2 +-
.../core/task/AbstractExecutionContext.java | 10 +-
.../util/core/task/BasicExecutionContext.java | 30 ++---
.../util/core/task/BasicExecutionManager.java | 13 +-
.../brooklyn/util/core/task/DynamicTasks.java | 90 ++++++++------
.../apache/brooklyn/util/core/task/Tasks.java | 6 +-
.../core/config/DeferredConfigTest.java | 8 +-
.../core/effector/EffectorSayHiTest.java | 6 +-
.../entity/ApplicationLifecycleStateTest.java | 18 +--
.../brooklyn/core/entity/EntityAssertsTest.java | 93 +++-----------
.../core/entity/EntitySubscriptionTest.java | 9 +-
.../core/entity/hello/LocalEntitiesTest.java | 20 ++-
.../apache/brooklyn/core/feed/PollerTest.java | 2 +-
.../internal/EntityExecutionManagerTest.java | 48 ++++++--
.../internal/LocalSubscriptionManagerTest.java | 89 ++++++++++++++
.../mgmt/persist/XmlMementoSerializerTest.java | 2 +-
.../core/mgmt/rebind/RebindFeedWithHaTest.java | 8 +-
.../core/mgmt/rebind/RebindManagerTest.java | 13 +-
.../policy/basic/PolicySubscriptionTest.java | 35 +++++-
.../qa/performance/TaskPerformanceTest.java | 6 +-
.../entity/group/DynamicClusterTest.java | 8 +-
.../ssh/SshMachineLocationIntegrationTest.java | 6 +-
.../location/ssh/SshMachineLocationTest.java | 12 +-
.../util/core/task/TaskPredicatesTest.java | 5 +-
.../brooklyn/util/core/task/TasksTest.java | 4 +-
.../util/core/task/ValueResolverTest.java | 93 +++++++++++---
.../policy/jclouds/os/CreateUserPolicy.java | 6 +-
.../policy/ha/AbstractFailureDetector.java | 16 ++-
.../policy/ha/ServiceFailureDetector.java | 7 +-
.../brooklyn/policy/ha/ServiceReplacer.java | 15 +--
.../brooklyn/policy/ha/ServiceRestarter.java | 7 +-
.../rest/resources/SensorResourceTest.java | 2 +-
.../entity/brooklynnode/BrooklynNodeImpl.java | 12 +-
.../SameServerDriverLifecycleEffectorTasks.java | 5 +-
.../windows/WindowsPerformanceCounterFeed.java | 21 ++--
.../java/org/apache/brooklyn/test/Asserts.java | 20 ++-
.../org/apache/brooklyn/test/AssertsTest.java | 17 +++
69 files changed, 892 insertions(+), 464 deletions(-)
----------------------------------------------------------------------
[12/23] brooklyn-server git commit: fix tests that asserted specific
tasks (as there are now more)
Posted by he...@apache.org.
fix tests that asserted specific tasks (as there are now more)
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/130a29b9
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/130a29b9
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/130a29b9
Branch: refs/heads/master
Commit: 130a29b906da83f108ae1ea6cebef94c0c5f6a6f
Parents: 79cc9bc
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Sep 20 13:37:21 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Sep 20 13:37:21 2017 +0100
----------------------------------------------------------------------
.../mgmt/internal/LocalSubscriptionManager.java | 2 +-
.../core/entity/hello/LocalEntitiesTest.java | 20 +++++++++-----------
.../internal/EntityExecutionManagerTest.java | 15 ++++++++++++---
.../core/mgmt/rebind/RebindFeedWithHaTest.java | 9 +++++++--
4 files changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/130a29b9/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 6b1a5d9..a927a89 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -147,7 +147,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
} else {
if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
em.submit(
- MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer)),
+ MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer), BrooklynTaskTags.SENSOR_TAG),
"displayName", "Initial publication of "+s.sensor.getName()),
() -> {
T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/130a29b9/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java
index 985997e..6951a97 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java
@@ -33,20 +33,19 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.EntityManager;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.location.SimulatedLocation;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.time.Time;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
@@ -63,14 +62,12 @@ public class LocalEntitiesTest extends BrooklynAppUnitTestSupport {
public static final Logger log = LoggerFactory.getLogger(LocalEntitiesTest.class);
private SimulatedLocation loc;
- private EntityManager entityManager;
@BeforeMethod(alwaysRun=true)
@Override
public void setUp() throws Exception {
super.setUp();
loc = new SimulatedLocation();
- entityManager = mgmt.getEntityManager();
}
@Test
@@ -165,9 +162,10 @@ public class LocalEntitiesTest extends BrooklynAppUnitTestSupport {
h.setAge(6);
long totalTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
- // TODO guava util for (1..5)
- Asserts.continually(MutableMap.of("timeout", 50), Suppliers.ofInstance(data), Predicates.<Object>equalTo(ImmutableList.of(1,2,3,4,5)));
- assertTrue(totalTime < 2000, "totalTime="+totalTime); //shouldn't have blocked for anywhere close to 2s (Aled says TODO: too time sensitive for BuildHive?)
+ Asserts.continually(
+ Suppliers.ofInstance(data), Predicates.<Object>equalTo(ImmutableList.of(1,2,3,4,5)),
+ Duration.millis(50), null, null);
+ assertTrue(totalTime < 2000, "totalTime="+totalTime); //shouldn't have blocked for anywhere close to 2s (unless build machine v v slow eg BuildHive)
}
@Test
@@ -224,8 +222,8 @@ public class LocalEntitiesTest extends BrooklynAppUnitTestSupport {
assertTrue(System.currentTimeMillis() - startTime < 1500);
synchronized (sonsConfig) {
assertEquals(null, sonsConfig[0]);
- for (Task tt : ((EntityInternal)dad).getExecutionContext().getTasks()) { log.info("task at dad: {}, {}", tt, tt.getStatusDetail(false)); }
- for (Task tt : ((EntityInternal)son).getExecutionContext().getTasks()) { log.info("task at son: {}, {}", tt, tt.getStatusDetail(false)); }
+ for (Task<?> tt : ((EntityInternal)dad).getExecutionContext().getTasks()) { log.info("task at dad: {}, {}", tt, tt.getStatusDetail(false)); }
+ for (Task<?> tt : ((EntityInternal)son).getExecutionContext().getTasks()) { log.info("task at son: {}, {}", tt, tt.getStatusDetail(false)); }
dad.sensors().set(HelloEntity.FAVOURITE_NAME, "Dan");
if (!s1.tryAcquire(2, TimeUnit.SECONDS)) fail("race mismatch, missing permits");
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/130a29b9/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 97f02b7..4f581f5 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
@@ -46,6 +47,7 @@ import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
@@ -149,6 +151,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
for (Task<?> t: tasks) {
if (t instanceof ScheduledTask) continue;
if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue;
+ if (t.getDisplayName().contains("Validating")) continue;
result.add(t);
}
return result;
@@ -294,8 +297,6 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
forceGc();
stopCondition.set(true);
- // might need an eventually here, if the internal job completion and GC is done in the background
- // (if there are no test failures for a few months, since Sept 2014, then we can remove this comment)
assertTaskMaxCountForEntityEventually(e, 2);
}
@@ -343,7 +344,15 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
forceGc();
Collection<Task<?>> t2 = em.getAllTasks();
- Assert.assertEquals(t1.size(), t2.size(), "lists are different:\n"+t1+"\n"+t2+"\n");
+ // no tasks from first batch were GC'd
+ Asserts.assertSize(MutableList.builder().addAll(t1).removeAll(t2).build(), 0);
+
+ // and we expect just the add/remove cycle at parent, and service problems
+ Set<String> newOnes = MutableList.<Task<?>>builder().addAll(t2).removeAll(t1).build().stream().map(
+ (t) -> t.getDisplayName()).collect(Collectors.toSet());
+ Function<String,String> prefix = (s) -> "sensor "+app.getId()+":"+s;
+ Assert.assertEquals(newOnes, MutableSet.of(
+ prefix.apply("entity.children.removed"), prefix.apply("entity.children.added"), prefix.apply("service.problems")));
}
/**
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/130a29b9/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
index a38a873..319311a 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
@@ -24,6 +24,7 @@ import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.Task;
@@ -31,7 +32,6 @@ import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.Feed;
import org.apache.brooklyn.core.entity.EntityAsserts;
-import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.test.entity.TestApplication;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.util.core.http.BetterMockWebServer;
@@ -102,10 +102,11 @@ public class RebindFeedWithHaTest extends RebindTestFixtureWithApp {
@Override
public Boolean call() throws Exception {
origManagementContext.getGarbageCollector().gcIteration();
- List<Task<?>> tasksAfter = ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks();
+ List<Task<?>> tasksAfter = removeSystemTasks( ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks() );
log.info("tasks after disabling HA, "+tasksAfter.size()+": "+tasksAfter);
return tasksAfter.isEmpty();
}
+
}).runRequiringTrue();
newManagementContext = createNewManagementContext();
@@ -123,6 +124,10 @@ public class RebindFeedWithHaTest extends RebindTestFixtureWithApp {
EntityAsserts.assertAttributeEqualsEventually(newEntity, SENSOR_STRING, "{\"foo\":\"myfoo\"}");
}
+ static List<Task<?>> removeSystemTasks(List<Task<?>> tasks) {
+ return tasks.stream().filter(t -> !("rebind".equals(t.getDisplayName()) || t.getDisplayName().contains("Validating"))).collect(Collectors.toList());
+ }
+
@Test(groups="Integration", invocationCount=50)
public void testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailoverManyTimes() throws Exception {
testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailover();
[13/23] brooklyn-server git commit: Merge branch 'master' into
tasks-better-tree
Posted by he...@apache.org.
Merge branch 'master' into tasks-better-tree
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/80446ab1
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/80446ab1
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/80446ab1
Branch: refs/heads/master
Commit: 80446ab1f28d35d430899f19f0f46e6ccab777d4
Parents: 130a29b df83d44
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Sep 25 10:38:50 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 25 10:38:50 2017 +0100
----------------------------------------------------------------------
api/pom.xml | 2 +-
.../brooklyn/api/catalog/BrooklynCatalog.java | 3 +
.../brooklyn/api/typereg/RegisteredType.java | 5 +-
.../typereg/RegisteredTypeLoadingContext.java | 2 +-
camp/camp-base/pom.xml | 2 +-
camp/camp-brooklyn/pom.xml | 2 +-
.../camp/brooklyn/AbstractYamlTest.java | 31 +-
.../brooklyn/EmptySoftwareProcessYamlTest.java | 94 +-
.../camp/brooklyn/FunctionSensorYamlTest.java | 123 ++
.../VanillaSoftwareProcessYamlTest.java | 299 +++++
.../CatalogOsgiVersionMoreEntityRebindTest.java | 10 +-
.../CatalogOsgiVersionMoreEntityTest.java | 3 +
camp/camp-server/pom.xml | 2 +-
camp/pom.xml | 2 +-
core/pom.xml | 2 +-
.../apache/brooklyn/core/BrooklynVersion.java | 2 +-
.../catalog/internal/BasicBrooklynCatalog.java | 106 +-
.../core/mgmt/entitlement/Entitlements.java | 1 +
.../core/mgmt/ha/OsgiArchiveInstaller.java | 18 +-
.../mgmt/ha/OsgiBundleInstallationResult.java | 21 +-
.../brooklyn/core/mgmt/ha/OsgiManager.java | 89 +-
.../core/sensor/function/FunctionSensor.java | 89 ++
.../core/typereg/BasicBrooklynTypeRegistry.java | 20 +-
.../typereg/BasicTypeImplementationPlan.java | 4 +-
.../typereg/RegisteredTypeLoadingContexts.java | 6 +-
.../core/typereg/RegisteredTypePredicates.java | 98 +-
.../brooklyn/core/typereg/RegisteredTypes.java | 43 +-
core/src/main/resources/catalog.bom | 2 +-
.../entity/RecordingSensorEventListener.java | 12 +
.../core/mgmt/osgi/OsgiStandaloneTest.java | 4 +-
.../core/policy/basic/EnricherTypeTest.java | 6 -
.../sensor/function/FunctionSensorTest.java | 77 ++
.../core/test/entity/TestEntityImpl.java | 2 +-
.../util/core/internal/ssh/ExecCmdAsserts.java | 12 +-
core/src/test/resources/catalog.bom | 2 +-
karaf/commands/pom.xml | 2 +-
karaf/features/pom.xml | 2 +-
karaf/httpcomponent-extension/pom.xml | 2 +-
karaf/init/pom.xml | 2 +-
.../init/src/main/resources/catalog-classes.bom | 14 +-
karaf/jetty-config/pom.xml | 2 +-
karaf/pom.xml | 2 +-
karaf/start/pom.xml | 2 +-
launcher-common/pom.xml | 2 +-
launcher/pom.xml | 2 +-
locations/container/pom.xml | 2 +-
.../resources/generic-application.tests.bom | 2 +-
.../src/test/resources/generic.tests.bom | 2 +-
locations/jclouds/pom.xml | 2 +-
.../location/jclouds/JcloudsLocation.java | 73 +-
.../api/JcloudsLocationConfigPublic.java | 6 +
.../JcloudsMaxConcurrencyStubbedTest.java | 225 ++++
logging/logback-includes/pom.xml | 2 +-
logging/logback-xml/pom.xml | 2 +-
parent/pom.xml | 2 +-
policy/pom.xml | 2 +-
.../action/AbstractScheduledEffectorPolicy.java | 292 +++++
.../policy/action/PeriodicEffectorPolicy.java | 116 ++
.../policy/action/ScheduledEffectorPolicy.java | 100 ++
.../policy/autoscaling/AutoScalerPolicy.java | 4 +-
policy/src/main/resources/catalog.bom | 25 +-
.../action/AbstractEffectorPolicyTest.java | 81 ++
.../action/PeriodicEffectorPolicyTest.java | 123 ++
.../action/ScheduledEffectorPolicyTest.java | 121 ++
.../action/ScheduledPolicyRebindTest.java | 137 +++
.../AutoScalerPolicyPoolSizeTest.java | 18 +-
.../AbstractLoadBalancingPolicyTest.java | 1 -
pom.xml | 4 +-
rest/rest-api/pom.xml | 2 +-
.../org/apache/brooklyn/rest/api/BundleApi.java | 147 +++
.../apache/brooklyn/rest/api/CatalogApi.java | 53 +
.../org/apache/brooklyn/rest/api/TypeApi.java | 100 ++
.../domain/BundleInstallationRestResult.java | 67 ++
.../brooklyn/rest/domain/BundleSummary.java | 106 ++
.../brooklyn/rest/domain/ConfigSummary.java | 79 +-
.../rest/domain/EnricherConfigSummary.java | 52 +-
.../rest/domain/EntityConfigSummary.java | 67 +-
.../rest/domain/LocationConfigSummary.java | 40 +-
.../rest/domain/PolicyConfigSummary.java | 53 +-
.../brooklyn/rest/domain/TaskSummary.java | 9 +-
.../apache/brooklyn/rest/domain/TypeDetail.java | 87 ++
.../brooklyn/rest/domain/TypeSummary.java | 293 +++++
rest/rest-resources/pom.xml | 2 +-
.../apache/brooklyn/rest/BrooklynRestApi.java | 6 +-
.../brooklyn/rest/resources/BundleResource.java | 169 +++
.../rest/resources/CatalogResource.java | 40 +-
.../brooklyn/rest/resources/TypeResource.java | 207 ++++
.../rest/transform/EntityTransformer.java | 26 +-
.../rest/transform/PolicyTransformer.java | 3 +-
.../rest/transform/TypeTransformer.java | 197 ++++
.../resources/BundleAndTypeResourcesTest.java | 1098 ++++++++++++++++++
rest/rest-server/pom.xml | 2 +-
server-cli/pom.xml | 2 +-
.../main/resources/brooklyn/default.catalog.bom | 2 +-
server-cli/src/main/resources/catalog.bom | 2 +-
software/base/pom.xml | 2 +-
.../entity/brooklynnode/BrooklynNode.java | 2 +-
.../brooklynnode/BrooklynNodeSshDriver.java | 2 +-
.../brooklyn/entity/java/JmxmpSslSupport.java | 2 +-
.../software/base/EmptySoftwareProcessImpl.java | 4 +-
.../software/base/VanillaSoftwareProcess.java | 6 +
.../base/VanillaSoftwareProcessImpl.java | 16 +-
software/base/src/main/resources/catalog.bom | 2 +-
software/winrm/pom.xml | 2 +-
test-framework/pom.xml | 2 +-
test-framework/src/main/resources/catalog.bom | 6 +-
test-support/pom.xml | 2 +-
.../osgi/com-example-entities/pom.xml | 2 +-
utils/common/dependencies/osgi/entities/pom.xml | 2 +-
.../dependencies/osgi/more-entities-v1/pom.xml | 2 +-
.../osgi/more-entities-v2-evil-twin/pom.xml | 2 +-
.../dependencies/osgi/more-entities-v2/pom.xml | 2 +-
utils/common/pom.xml | 2 +-
.../brooklyn/util/osgi/VersionedName.java | 24 +-
.../brooklyn/util/maven/MavenArtifactTest.java | 4 +-
utils/groovy/pom.xml | 2 +-
utils/jmx/jmxmp-ssl-agent/pom.xml | 2 +-
utils/jmx/jmxrmi-agent/pom.xml | 2 +-
utils/rest-swagger/pom.xml | 2 +-
utils/rt-felix/pom.xml | 2 +-
utils/test-support/pom.xml | 2 +-
121 files changed, 5095 insertions(+), 486 deletions(-)
----------------------------------------------------------------------
[06/23] brooklyn-server git commit: entity adjuncts have extra tag
for execution context, used in subscription delivery
Posted by he...@apache.org.
entity adjuncts have extra tag for execution context, used in subscription delivery
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/d4c9fe12
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/d4c9fe12
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/d4c9fe12
Branch: refs/heads/master
Commit: d4c9fe12ecfdf884ea8e1945e51cc07e026d7610
Parents: 8ecf395
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 13:10:17 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Sep 19 13:12:11 2017 +0100
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ManagementContext.java | 4 ++
.../brooklyn/core/config/ConfigConstraints.java | 9 ++-
.../org/apache/brooklyn/core/feed/Poller.java | 2 +-
.../brooklyn/core/mgmt/BrooklynTaskTags.java | 64 ++++++++++++++++----
.../internal/AbstractManagementContext.java | 8 +++
.../mgmt/internal/EntityManagementSupport.java | 22 ++++---
.../NonDeploymentManagementContext.java | 20 ++++--
.../core/objs/AbstractEntityAdjunct.java | 32 ++++++++--
.../brooklyn/core/objs/AdjunctConfigMap.java | 4 +-
.../SshCommandMembershipTrackingPolicy.java | 2 +-
.../apache/brooklyn/feed/shell/ShellFeed.java | 2 +-
.../util/core/task/BasicExecutionContext.java | 5 +-
.../internal/EntityExecutionManagerTest.java | 5 +-
.../brooklyn/util/core/task/TasksTest.java | 4 +-
.../policy/jclouds/os/CreateUserPolicy.java | 6 +-
.../policy/ha/AbstractFailureDetector.java | 16 +++--
.../policy/ha/ServiceFailureDetector.java | 7 ++-
.../brooklyn/policy/ha/ServiceReplacer.java | 13 +---
.../brooklyn/policy/ha/ServiceRestarter.java | 5 +-
.../brooklyn/entity/chef/ChefAttributeFeed.java | 16 +++--
.../windows/WindowsPerformanceCounterFeed.java | 21 +++----
21 files changed, 172 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index 515ec6b..00dd4d3 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.api.mgmt.entitlement.EntitlementManager;
import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager;
import org.apache.brooklyn.api.mgmt.rebind.RebindManager;
import org.apache.brooklyn.api.objs.BrooklynObject;
+import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry;
import org.apache.brooklyn.config.StringConfigMap;
import org.apache.brooklyn.util.guava.Maybe;
@@ -176,6 +177,9 @@ public interface ManagementContext {
*/
SubscriptionContext getSubscriptionContext(Entity entity);
+ /** As {@link #getSubscriptionContext(Entity)} where there is also an adjunct */
+ SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a);
+
/**
* Returns a {@link SubscriptionContext} instance representing subscriptions
* (from the {@link SubscriptionManager}) associated with this location, and capable
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
index b891c2b..c4ce81d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
+++ b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.config.ConfigKey;
@@ -114,8 +115,12 @@ public abstract class ConfigConstraints<T extends BrooklynObject> {
abstract Iterable<ConfigKey<?>> getBrooklynObjectTypeConfigKeys();
public Iterable<ConfigKey<?>> getViolations() {
- if (getBrooklynObject() instanceof EntityInternal) {
- return ((EntityInternal) getBrooklynObject()).getExecutionContext().get(
+ ExecutionContext exec =
+ getBrooklynObject() instanceof EntityInternal ? ((EntityInternal)getBrooklynObject()).getExecutionContext() :
+ // getBrooklynObject() instanceof AbstractEntityAdjunct ? ((AbstractEntityAdjunct)getBrooklynObject()).getExecutionContext() :
+ null;
+ if (exec!=null) {
+ return exec.get(
Tasks.<Iterable<ConfigKey<?>>>builder().dynamic(false).displayName("Validating config").body(
() -> validateAll() ).build() );
} else {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 940057e..dd7d22d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -139,7 +139,7 @@ public class Poller<V> {
for (final Callable<?> oneOffJob : oneOffJobs) {
Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).displayName("Poll").description("One-time poll job "+oneOffJob).build();
- oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
+ oneOffTasks.add(feed.getExecutionContext().submit(task));
}
Duration minPeriod = null;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index 5cb1027..a68df32 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionManager;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
+import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.task.DynamicTasks;
@@ -59,13 +60,14 @@ import com.google.common.collect.ImmutableSet;
/** Provides utilities for making Tasks easier to work with in Brooklyn.
* Main thing at present is to supply (and find) wrapped entities for tasks to understand the
* relationship of the entity to the task.
- * TODO Longer term it would be better to remove 'tags' on Tasks and use a strongly typed context object.
+ * <p>
+ * Eventually it may be better to replace these 'tags' on Tasks with strongly typed context objects.
* (Tags there are used mainly for determining who called it (caller), what they called it on (target entity),
* and what type of task it is (effector, schedule/sensor, etc).)
*/
public class BrooklynTaskTags extends TaskTags {
- private static final Logger log = LoggerFactory.getLogger(BrooklynTaskTags.WrappedEntity.class);
+ private static final Logger log = LoggerFactory.getLogger(BrooklynTaskTags.class);
/** Tag for tasks which are running on behalf of the management server, rather than any entity */
public static final String BROOKLYN_SERVER_TASK_TAG = "BROOKLYN-SERVER";
@@ -85,37 +87,66 @@ public class BrooklynTaskTags extends TaskTags {
// ------------- entity tags -------------------------
- public static class WrappedEntity {
+ public abstract static class WrappedItem<T> {
+ /** @deprecated since 0.12.0 going private; use {@link #getWrappingType()} */
+ @Deprecated
public final String wrappingType;
- public final Entity entity;
- protected WrappedEntity(String wrappingType, Entity entity) {
+ protected WrappedItem(String wrappingType) {
Preconditions.checkNotNull(wrappingType);
- Preconditions.checkNotNull(entity);
this.wrappingType = wrappingType;
- this.entity = entity;
+ }
+ public abstract T unwrap();
+ public String getWrappingType() {
+ return wrappingType;
}
@Override
public String toString() {
- return "Wrapped["+wrappingType+":"+entity+"]";
+ return "Wrapped["+getWrappingType()+":"+unwrap()+"]";
}
@Override
public int hashCode() {
- return Objects.hashCode(entity, wrappingType);
+ return Objects.hashCode(unwrap(), getWrappingType());
}
@Override
public boolean equals(Object obj) {
if (this==obj) return true;
- if (!(obj instanceof WrappedEntity)) return false;
+ if (!(obj instanceof WrappedItem)) return false;
return
- Objects.equal(entity, ((WrappedEntity)obj).entity) &&
- Objects.equal(wrappingType, ((WrappedEntity)obj).wrappingType);
+ Objects.equal(unwrap(), ((WrappedItem<?>)obj).unwrap()) &&
+ Objects.equal(getWrappingType(), ((WrappedItem<?>)obj).getWrappingType());
+ }
+ }
+ public static class WrappedEntity extends WrappedItem<Entity> {
+ /** @deprecated since 0.12.0 going private; use {@link #unwrap()} */
+ @Deprecated
+ public final Entity entity;
+ protected WrappedEntity(String wrappingType, Entity entity) {
+ super(wrappingType);
+ this.entity = Preconditions.checkNotNull(entity);
+ }
+ @Override
+ public Entity unwrap() {
+ return entity;
}
}
+ public static class WrappedObject<T> extends WrappedItem<T> {
+ private final T object;
+ protected WrappedObject(String wrappingType, T object) {
+ super(wrappingType);
+ this.object = Preconditions.checkNotNull(object);
+ }
+ @Override
+ public T unwrap() {
+ return object;
+ }
+ }
public static final String CONTEXT_ENTITY = "contextEntity";
public static final String CALLER_ENTITY = "callerEntity";
public static final String TARGET_ENTITY = "targetEntity";
+ public static final String CONTEXT_ADJUNCT = "contextAdjunct";
+
/**
* Marks a task as running in the context of the entity. This means
* resolving any relative/context sensitive values against that entity.
@@ -138,6 +169,15 @@ public class BrooklynTaskTags extends TaskTags {
return new WrappedEntity(TARGET_ENTITY, entity);
}
+ /**
+ * As {@link #tagForContextEntity(Entity)} but wrapping an adjunct.
+ * Tasks with this tag will also have a {@link #tagForContextEntity(Entity)}.
+ */
+ public static WrappedObject<EntityAdjunct> tagForContextAdjunct(EntityAdjunct adjunct) {
+ return new WrappedObject<EntityAdjunct>(CONTEXT_ADJUNCT, adjunct);
+ }
+
+
public static WrappedEntity getWrappedEntityTagOfType(Task<?> t, String wrappingType) {
if (t==null) return null;
return getWrappedEntityTagOfType( getTagsFast(t), wrappingType);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index 52eed44..f0c9335 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -51,6 +51,7 @@ import org.apache.brooklyn.api.mgmt.entitlement.EntitlementManager;
import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager;
import org.apache.brooklyn.api.mgmt.rebind.RebindManager;
import org.apache.brooklyn.api.objs.BrooklynObject;
+import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry;
import org.apache.brooklyn.api.typereg.RegisteredType;
import org.apache.brooklyn.config.StringConfigMap;
@@ -258,6 +259,13 @@ public abstract class AbstractManagementContext implements ManagementContextInte
Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e)));
return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
}
+
+ @Override
+ public SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a) {
+ // BSC is a thin wrapper around SM so fine to create a new one here
+ Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e), BrooklynTaskTags.tagForContextAdjunct(a)));
+ return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
+ }
@Override
public SubscriptionContext getSubscriptionContext(Location loc) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
index 96cfd33..e192edd 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
@@ -84,8 +84,8 @@ public class EntityManagementSupport {
protected transient ManagementContext initialManagementContext;
protected transient ManagementContext managementContext;
- protected transient SubscriptionContext subscriptionContext;
- protected transient ExecutionContext executionContext;
+ protected transient volatile SubscriptionContext subscriptionContext;
+ protected transient volatile ExecutionContext executionContext;
protected final AtomicBoolean managementContextUsable = new AtomicBoolean(false);
protected final AtomicBoolean currentlyDeployed = new AtomicBoolean(false);
@@ -357,19 +357,25 @@ public class EntityManagementSupport {
return (managementContextUsable.get()) ? managementContext : nonDeploymentManagementContext;
}
- public synchronized ExecutionContext getExecutionContext() {
+ public ExecutionContext getExecutionContext() {
if (executionContext!=null) return executionContext;
if (managementContextUsable.get()) {
- executionContext = managementContext.getExecutionContext(entity);
- return executionContext;
+ synchronized (this) {
+ if (executionContext!=null) return executionContext;
+ executionContext = managementContext.getExecutionContext(entity);
+ return executionContext;
+ }
}
return nonDeploymentManagementContext.getExecutionContext(entity);
}
- public synchronized SubscriptionContext getSubscriptionContext() {
+ public SubscriptionContext getSubscriptionContext() {
if (subscriptionContext!=null) return subscriptionContext;
if (managementContextUsable.get()) {
- subscriptionContext = managementContext.getSubscriptionContext(entity);
- return subscriptionContext;
+ synchronized (this) {
+ if (subscriptionContext!=null) return subscriptionContext;
+ subscriptionContext = managementContext.getSubscriptionContext(entity);
+ return subscriptionContext;
+ }
}
return nonDeploymentManagementContext.getSubscriptionContext(entity);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index 2e9238f..91375d2 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -58,6 +58,7 @@ import org.apache.brooklyn.api.mgmt.rebind.RebindManager;
import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister;
import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
import org.apache.brooklyn.api.objs.BrooklynObject;
+import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry;
import org.apache.brooklyn.config.StringConfigMap;
import org.apache.brooklyn.core.catalog.internal.CatalogInitialization;
@@ -101,7 +102,6 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
private ManagementContextInternal initialManagementContext;
private final QueueingSubscriptionManager qsm;
- private final BasicSubscriptionContext subscriptionContext;
private NonDeploymentEntityManager entityManager;
private NonDeploymentLocationManager locationManager;
private NonDeploymentAccessManager accessManager;
@@ -112,10 +112,6 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
this.mode = checkNotNull(mode, "mode");
qsm = new QueueingSubscriptionManager();
- // For subscription flags, see AbstractManagementContext.getSubscriptionContext. This is
- // needed for callbacks, to ensure the correct entity context is set.
- Map<String, ?> subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity)));
- subscriptionContext = new BasicSubscriptionContext(subscriptionFlags, qsm, entity);
entityManager = new NonDeploymentEntityManager(null);
locationManager = new NonDeploymentLocationManager(null);
accessManager = new NonDeploymentAccessManager(null);
@@ -254,7 +250,19 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity);
if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED)
throw new IllegalStateException("Entity "+entity+" is no longer managed; subscription context not available");
- return subscriptionContext;
+ // see also AbstractManagementContext.getSubscriptionContext - needed for callbacks, to ensure the correct entity context is set
+ Map<String, ?> subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity)));
+ return new BasicSubscriptionContext(subscriptionFlags, qsm, entity);
+ }
+
+ @Override
+ public SubscriptionContext getSubscriptionContext(Entity entity, EntityAdjunct adjunct) {
+ if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity);
+ if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED)
+ throw new IllegalStateException("Entity "+entity+" is no longer managed; subscription context not available");
+ // see also AbstractManagementContext.getSubscriptionContext - needed for callbacks, to ensure the correct entity context is set
+ Map<String, ?> subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity), BrooklynTaskTags.tagForContextAdjunct(adjunct)));
+ return new BasicSubscriptionContext(subscriptionFlags, qsm, entity);
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index 2c4c4b4..9018d5d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
@@ -37,6 +36,7 @@ import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.objs.BrooklynObject;
@@ -54,12 +54,15 @@ import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.internal.ConfigUtilsInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.BasicExecutionContext;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +89,8 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
@Deprecated
protected Map<String,Object> leftoverProperties = Maps.newLinkedHashMap();
+ /** @deprecated since 0.12.0, going private, use {@link #getExecutionContext()} */
+ @Deprecated
protected transient ExecutionContext execution;
private final BasicConfigurationSupport config = new BasicConfigurationSupport();
@@ -213,6 +218,14 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
return _legacyNoConstructionInit;
}
+ /** If the entity has been set, returns the execution context indicating this adjunct.
+ * Primarily intended for this adjunct to execute tasks, but in some cases, mainly low level,
+ * it may make sense for other components to execute tasks against this adjunct. */
+ @Beta
+ public ExecutionContext getExecutionContext() {
+ return execution;
+ }
+
@Override
public ConfigurationSupportInternal config() {
return config;
@@ -276,7 +289,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
synchronized (AbstractEntityAdjunct.this) {
if (_subscriptionTracker!=null) return _subscriptionTracker;
if (entity==null) return null;
- _subscriptionTracker = new SubscriptionTracker(((EntityInternal)entity).subscriptions().getSubscriptionContext());
+ _subscriptionTracker = new SubscriptionTracker(getManagementContext().getSubscriptionContext(entity, AbstractEntityAdjunct.this));
return _subscriptionTracker;
}
}
@@ -334,7 +347,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
@Override
protected ExecutionContext getContext() {
- return AbstractEntityAdjunct.this.execution;
+ return AbstractEntityAdjunct.this.getExecutionContext();
}
@Override
@@ -402,10 +415,21 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
this.name = name;
}
+ @Override
+ public ManagementContext getManagementContext() {
+ ManagementContext result = super.getManagementContext();
+ if (result!=null) return result;
+ if (entity!=null) {
+ return ((EntityInternal)entity).getManagementContext();
+ }
+ return null;
+ }
+
public void setEntity(EntityLocal entity) {
if (destroyed.get()) throw new IllegalStateException("Cannot set entity on a destroyed entity adjunct");
this.entity = entity;
- this.execution = ((EntityInternal) entity).getExecutionContext();
+ this.execution = new BasicExecutionContext( getManagementContext().getExecutionManager(),
+ MutableList.of(BrooklynTaskTags.tagForContextAdjunct(this), BrooklynTaskTags.tagForContextEntity(entity)) );
if (entity!=null && getCatalogItemId() == null) {
setCatalogItemIdAndSearchPath(entity.getCatalogItemId(), entity.getCatalogItemIdSearchPath());
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java b/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java
index 602d943..71fe16c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java
@@ -69,9 +69,7 @@ public class AdjunctConfigMap extends AbstractConfigMapImpl<EntityAdjunct> {
@Override
protected ExecutionContext getExecutionContext(BrooklynObject bo) {
- // TODO expose ((AbstractEntityAdjunct)bo).execution ?
- Entity entity = ((AbstractEntityAdjunct)bo).entity;
- return (entity != null) ? ((EntityInternal)entity).getExecutionContext() : null;
+ return ((AbstractEntityAdjunct)bo).getExecutionContext();
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
index ea2ec55..373c667 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
@@ -186,7 +186,7 @@ public class SshCommandMembershipTrackingPolicy extends AbstractMembershipTracki
// Try to resolve the configuration in the env Map
try {
- env = (Map<String, Object>) Tasks.resolveDeepValue(env, Object.class, ((EntityInternal) entity).getExecutionContext());
+ env = (Map<String, Object>) Tasks.resolveDeepValue(env, Object.class, getExecutionContext());
} catch (InterruptedException | ExecutionException e) {
throw Exceptions.propagate(e);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
index ca41304..3f78c3a 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
@@ -229,7 +229,7 @@ public class ShellFeed extends AbstractFeed {
final ProcessTaskFactory<?> taskFactory = newTaskFactory(pollInfo.command, pollInfo.env, pollInfo.dir,
pollInfo.input, pollInfo.context, pollInfo.timeout);
- final ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
+ final ExecutionContext executionContext = getExecutionContext();
getPoller().scheduleAtFixedRate(
new Callable<SshPollValue>() {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 429cad7..1b85663 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -44,6 +44,7 @@ import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
@@ -107,8 +108,8 @@ public class BasicExecutionContext extends AbstractExecutionContext {
// which may require access to internal methods
// (could remove this check if generalizing; it has been here for a long time and the problem seems gone)
for (Object tag: tags) {
- if (tag instanceof BrooklynTaskTags.WrappedEntity) {
- if (Proxy.isProxyClass(((WrappedEntity)tag).entity.getClass())) {
+ if (tag instanceof BrooklynTaskTags.WrappedItem) {
+ if (Proxy.isProxyClass(((WrappedItem<?>)tag).unwrap().getClass())) {
log.warn(""+this+" has entity proxy in "+tag);
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 59ee3cf..97f02b7 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -40,6 +40,7 @@ import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.internal.BrooklynProperties;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem;
import org.apache.brooklyn.core.sensor.BasicAttributeSensor;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests;
@@ -318,8 +319,8 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
if (tag instanceof Entity && ((Entity)tag).getId().equals(eId)) {
fail("tags contains unmanaged entity "+tag);
}
- if ((tag instanceof WrappedEntity) && ((WrappedEntity)tag).entity.getId().equals(eId)
- && ((WrappedEntity)tag).wrappingType.equals(BrooklynTaskTags.CONTEXT_ENTITY)) {
+ if ((tag instanceof WrappedEntity) && ((WrappedEntity)tag).unwrap().getId().equals(eId)
+ && ((WrappedItem<?>)tag).getWrappingType().equals(BrooklynTaskTags.CONTEXT_ENTITY)) {
fail("tags contains unmanaged entity (wrapped) "+tag);
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
index 990e7f7..8c656a3 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
@@ -248,8 +248,8 @@ public class TasksTest extends BrooklynAppUnitTestSupport {
for (Object tag : Tasks.current().getTags()) {
if (tag instanceof WrappedEntity) {
WrappedEntity wrapped = (WrappedEntity)tag;
- if (BrooklynTaskTags.CONTEXT_ENTITY.equals(wrapped.wrappingType)) {
- context.add(wrapped.entity);
+ if (BrooklynTaskTags.CONTEXT_ENTITY.equals(wrapped.getWrappingType())) {
+ context.add(wrapped.unwrap());
}
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java b/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java
index 2078779..555d7cb 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java
@@ -110,11 +110,7 @@ public class CreateUserPolicy extends AbstractPolicy implements SensorEventListe
}
protected void addUserAsync(final Entity entity, final SshMachineLocation machine) {
- ((EntityInternal)entity).getExecutionContext().execute(new Runnable() {
- @Override
- public void run() {
- addUser(entity, machine);
- }});
+ getExecutionContext().execute(() -> addUser(entity, machine));
}
protected void addUser(Entity entity, SshMachineLocation machine) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
index 840335a..c6bd33d 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
@@ -21,7 +21,6 @@ package org.apache.brooklyn.policy.ha;
import static org.apache.brooklyn.util.time.Time.makeTimeStringRounded;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -30,19 +29,18 @@ import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.policy.AbstractPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
-import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.apache.brooklyn.util.core.task.BasicTask;
import org.apache.brooklyn.util.core.task.ScheduledTask;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.reflect.TypeToken;
@@ -194,7 +192,7 @@ public abstract class AbstractFailureDetector extends AbstractPolicy {
protected void doStartPolling() {
if (scheduledTask == null || scheduledTask.isDone()) {
ScheduledTask task = ScheduledTask.builder(pollingTaskFactory).displayName( getTaskName() ).period(getPollPeriod()).build();
- scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);
+ scheduledTask = getExecutionContext().submit(task);
}
}
@@ -270,7 +268,6 @@ public abstract class AbstractFailureDetector extends AbstractPolicy {
schedulePublish(0);
}
- @SuppressWarnings("unchecked")
protected void schedulePublish(long delay) {
if (isRunning() && executorQueued.compareAndSet(false, true)) {
long now = System.currentTimeMillis();
@@ -279,8 +276,9 @@ public abstract class AbstractFailureDetector extends AbstractPolicy {
Runnable job = new PublishJob();
- ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job));
- ((EntityInternal)entity).getExecutionContext().submit(task);
+ ScheduledTask task = ScheduledTask.builder(() -> Tasks.builder().body(job).dynamic(false).displayName("Failure detector iteration").build())
+ .delay(Duration.millis(delay)).displayName("Failure detector scheduler").build();
+ getExecutionContext().submit(task);
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
index 2cbbf28..e143582 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -39,6 +39,7 @@ import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.apache.brooklyn.util.core.task.BasicTask;
import org.apache.brooklyn.util.core.task.ScheduledTask;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.time.Duration;
@@ -314,7 +315,6 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
return description;
}
- @SuppressWarnings({ "rawtypes" })
protected void recomputeAfterDelay(long delay) {
// TODO Execute in same thread as other onEvent calls are done in (i.e. same conceptually
// single-threaded executor as the subscription-manager will use).
@@ -352,8 +352,9 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
}
};
- ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
- ((EntityInternal)entity).getExecutionContext().submit(task);
+ ScheduledTask task = ScheduledTask.builder(() -> Tasks.builder().body(job).dynamic(false).displayName("Failure detector recompute").build())
+ .delay(Duration.millis(delay)).displayName("Failure detector recompute after delay").build();
+ getExecutionContext().submit(task);
}
private String getTimeStringSince(Long time) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
index 3a1ba80..e14433d 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
@@ -129,10 +129,7 @@ public class ServiceReplacer extends AbstractPolicy {
if (isRunning()) {
highlightViolation("Failure detected");
LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")");
- ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
- @Override public void run() {
- onDetectedFailure(event);
- }});
+ getExecutionContext().submit(() -> onDetectedFailure(event));
} else {
LOG.warn("ServiceReplacer not running, so not acting on failure detected at "+entity+" ("+event.getValue()+", child of "+entity+")");
}
@@ -176,10 +173,7 @@ public class ServiceReplacer extends AbstractPolicy {
highlightViolation(violationText+", triggering restart");
LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
- Task<?> t = ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
-
- @Override
- public void run() {
+ Task<?> t = getExecutionContext().submit(() -> {
try {
Entities.invokeEffectorWithArgs(entity, entity, MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get();
consecutiveReplacementFailureTimes.clear();
@@ -191,8 +185,7 @@ public class ServiceReplacer extends AbstractPolicy {
highlightViolation(violationText+" and replace attempt failed: "+Exceptions.collapseText(e));
onReplacementFailed("Replace failure ("+Exceptions.collapseText(e)+") at "+entity+": "+reason);
}
- }
- });
+ });
highlightAction("Replacing "+failedEntity, t);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
index 8faaf89..a31e3c0 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
@@ -112,10 +112,7 @@ public class ServiceRestarter extends AbstractPolicy {
if (isRunning()) {
LOG.info("ServiceRestarter notified; dispatching job for "+entity+" ("+event.getValue()+")");
- ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
- @Override public void run() {
- onDetectedFailure(event);
- }});
+ getExecutionContext().submit(() -> onDetectedFailure(event));
} else {
LOG.warn("ServiceRestarter not running, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java
index f6d2615..0252dc2 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java
@@ -225,9 +225,8 @@ public class ChefAttributeFeed extends AbstractFeed {
@Override
public SshPollValue call() throws Exception {
ProcessTaskWrapper<String> taskWrapper = knifeTaskFactory.newTask();
- final ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
log.debug("START: Running knife to query attributes of Chef node {}", nodeName);
- executionContext.submit(taskWrapper);
+ getExecutionContext().submit(taskWrapper);
taskWrapper.block();
log.debug("DONE: Running knife to query attributes of Chef node {}", nodeName);
return new SshPollValue(null, taskWrapper.getExitCode(), taskWrapper.getStdout(), taskWrapper.getStderr());
@@ -235,7 +234,7 @@ public class ChefAttributeFeed extends AbstractFeed {
};
getPoller().scheduleAtFixedRate(
- new CallInEntityExecutionContext<SshPollValue>(entity, getAttributesFromKnife),
+ new CallInExecutionContext<SshPollValue>(this, getAttributesFromKnife),
new SendChefAttributesToSensors(entity, polls),
minPeriod);
}
@@ -269,20 +268,19 @@ public class ChefAttributeFeed extends AbstractFeed {
*
* @param <T> The type of the {@link Callable}.
*/
- private static class CallInEntityExecutionContext<T> implements Callable<T> {
+ private static class CallInExecutionContext<T> implements Callable<T> {
private final Callable<T> job;
- private Entity entity;
+ private AbstractFeed feed;
- private CallInEntityExecutionContext(Entity entity, Callable<T> job) {
+ private CallInExecutionContext(AbstractFeed feed, Callable<T> job) {
this.job = job;
- this.entity = entity;
+ this.feed = feed;
}
@Override
public T call() throws Exception {
- final ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
- return executionContext.submit(Maps.newHashMap(), job).get();
+ return feed.getExecutionContext().submit(Maps.newHashMap(), job).get();
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
----------------------------------------------------------------------
diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
index d5d9751..55b273e 100644
--- a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
+++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
@@ -211,14 +211,14 @@ public class WindowsPerformanceCounterFeed extends AbstractFeed {
String command = JOINER_ON_SPACE.join(allParams);
log.debug("Windows performance counter poll command for {} will be: {}", entity, command);
- GetPerformanceCountersJob<WinRmToolResponse> job = new GetPerformanceCountersJob(getEntity(), command);
+ GetPerformanceCountersJob job = new GetPerformanceCountersJob(getEntity(), command);
getPoller().scheduleAtFixedRate(
- new CallInEntityExecutionContext(entity, job),
+ new CallInExecutionContext<WinRmToolResponse>(this, job),
new SendPerfCountersToSensors(getEntity(), polls),
minPeriod);
}
- private static class GetPerformanceCountersJob<T> implements Callable<T> {
+ private static class GetPerformanceCountersJob implements Callable<WinRmToolResponse> {
private final Entity entity;
private final String command;
@@ -229,15 +229,14 @@ public class WindowsPerformanceCounterFeed extends AbstractFeed {
}
@Override
- @SuppressWarnings("unchecked")
- public T call() throws Exception {
+ public WinRmToolResponse call() throws Exception {
Maybe<WinRmMachineLocation> machineLocationMaybe = Machines.findUniqueMachineLocation(entity.getLocations(), WinRmMachineLocation.class);
if (machineLocationMaybe.isAbsent()) {
return null;
}
WinRmMachineLocation machine = EffectorTasks.getMachine(entity, WinRmMachineLocation.class);
WinRmToolResponse response = machine.executePsScript(command);
- return (T)response;
+ return response;
}
}
@@ -254,18 +253,18 @@ public class WindowsPerformanceCounterFeed extends AbstractFeed {
*
* @param <T> The type of the {@link java.util.concurrent.Callable}.
*/
- private static class CallInEntityExecutionContext<T> implements Callable<T> {
+ private static class CallInExecutionContext<T> implements Callable<T> {
private final Callable<T> job;
- private Entity entity;
+ private AbstractFeed feed;
- private CallInEntityExecutionContext(Entity entity, Callable<T> job) {
+ private CallInExecutionContext(AbstractFeed feed, Callable<T> job) {
this.job = job;
- this.entity = entity;
+ this.feed = feed;
}
@Override
public T call() throws Exception {
- ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
+ ExecutionContext executionContext = feed.getExecutionContext();
return executionContext.submit(Maps.newHashMap(), job).get();
}
}
[02/23] brooklyn-server git commit: task visibility: entity mgmt
create and startup wrapped in its own task
Posted by he...@apache.org.
task visibility: entity mgmt create and startup wrapped in its own task
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/7f4d7bd8
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/7f4d7bd8
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/7f4d7bd8
Branch: refs/heads/master
Commit: 7f4d7bd87e0e0e1d98ed49d875e601a21e0635c8
Parents: 37b6b11
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 12 15:01:09 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:54 2017 +0100
----------------------------------------------------------------------
.../core/mgmt/EntityManagementUtils.java | 8 +++---
.../mgmt/internal/EntityManagementSupport.java | 19 ++++++++-----
.../mgmt/internal/LocalSubscriptionManager.java | 2 ++
.../internal/QueueingSubscriptionManager.java | 4 +--
.../internal/EntityExecutionManagerTest.java | 28 ++++++++++++++------
5 files changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
index 0cd18fc..de9964c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
@@ -91,9 +91,11 @@ public class EntityManagementUtils {
*/
@Beta
public static <T extends Application> T createUnstarted(ManagementContext mgmt, EntitySpec<T> spec, Optional<String> entityId) {
- // TODO wrap in task
- T app = ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId);
- return app;
+ return mgmt.getServerExecutionContext().get(Tasks.<T>builder().dynamic(false)
+ .displayName("Creating entity "+
+ (Strings.isNonBlank(spec.getDisplayName()) ? spec.getDisplayName() : spec.getType().getName()) )
+ .body(() -> ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId))
+ .build() );
}
/** as {@link #createUnstarted(ManagementContext, EntitySpec)} but for a string plan (e.g. camp yaml) */
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
index 97c7bac..96cfd33 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
@@ -38,10 +38,12 @@ import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.EntityAndItem;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.StringAndArgument;
import org.apache.brooklyn.core.mgmt.internal.NonDeploymentManagementContext.NonDeploymentManagementContextMode;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,9 +161,10 @@ public class EntityManagementSupport {
}
public void onManagementStarting(ManagementTransitionInfo info) {
- try {
- // TODO same-thread task on this entity, with internal tag ?
- synchronized (this) {
+ info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management starting")
+ .dynamic(false)
+ .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+ .body(() -> { try { synchronized (this) {
boolean alreadyManaging = isDeployed();
if (alreadyManaging) {
@@ -212,13 +215,15 @@ public class EntityManagementSupport {
} catch (Throwable t) {
managementFailed.set(true);
throw Exceptions.propagate(t);
- }
+ }}).build() );
}
@SuppressWarnings("deprecation")
public void onManagementStarted(ManagementTransitionInfo info) {
- try {
- synchronized (this) {
+ info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management started")
+ .dynamic(false)
+ .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+ .body(() -> { try { synchronized (this) {
boolean alreadyManaged = isFullyManaged();
if (alreadyManaged) {
@@ -265,7 +270,7 @@ public class EntityManagementSupport {
} catch (Throwable t) {
managementFailed.set(true);
throw Exceptions.propagate(t);
- }
+ }}).build() );
}
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 2349c73..983b307 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -229,6 +229,8 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
.addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
.add(s.subscriberExecutionManagerTag)
.add(BrooklynTaskTags.SENSOR_TAG)
+ // associate the publish event with the publisher (though on init it might be triggered by subscriber)
+ .addIfNotNull(event.getSource()!=null ? BrooklynTaskTags.tagForTargetEntity(event.getSource()) : null)
.build()
.asUnmodifiable();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
index 83facda..290520d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
@@ -76,7 +76,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
@SuppressWarnings("unchecked")
public synchronized void startDelegatingForSubscribing() {
- // TODO wrap in same-thread task
+ // could wrap in same-thread task, but there's enough context without it
assert delegate!=null;
for (QueuedSubscription s: queuedSubscriptions) {
delegate.subscribe(s.flags, s.s);
@@ -87,7 +87,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
@SuppressWarnings("unchecked")
public synchronized void startDelegatingForPublishing() {
- // TODO wrap in same-thread task
+ // could wrap in same-thread task, but there's enough context without it
assert delegate!=null;
for (SensorEvent evt: queuedSensorEvents) {
delegate.publish(evt);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 51f5bdc..59ee3cf 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -46,8 +46,10 @@ import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
import org.apache.brooklyn.util.core.task.ExecutionListener;
+import org.apache.brooklyn.util.core.task.ScheduledTask;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.javalang.JavaClassNames;
@@ -129,18 +131,28 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
return Tasks.builder().displayName(name).dynamic(false).body(Callables.returning(null));
}
- protected void assertTaskCountForEntityEventually(final Entity entity, final int expectedCount) {
+ protected void assertImportantTaskCountForEntityEventually(final Entity entity, final int expectedCount) {
// Dead task (and initialization task) should have been GC'd on completion.
// However, the GC'ing happens in a listener, executed in a different thread - the task.get()
// doesn't block for it. Therefore can't always guarantee it will be GC'ed by now.
Asserts.succeedsEventually(new Runnable() {
@Override public void run() {
- forceGc();
- Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity);
+ forceGc();
+ Collection<Task<?>> tasks = removeSystemTasks(BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity));
Assert.assertEquals(tasks.size(), expectedCount, "Tasks were "+tasks);
}});
}
+ static Set<Task<?>> removeSystemTasks(Iterable<Task<?>> tasks) {
+ Set<Task<?>> result = MutableSet.of();
+ for (Task<?> t: tasks) {
+ if (t instanceof ScheduledTask) continue;
+ if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue;
+ result.add(t);
+ }
+ return result;
+ }
+
// Needed because of https://issues.apache.org/jira/browse/BROOKLYN-401
protected void assertTaskMaxCountForEntityEventually(final Entity entity, final int expectedMaxCount) {
// Dead task (and initialization task) should have been GC'd on completion.
@@ -149,7 +161,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
Asserts.succeedsEventually(new Runnable() {
@Override public void run() {
forceGc();
- Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity);
+ Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) );
Assert.assertTrue(tasks.size() <= expectedMaxCount,
"Expected tasks count max of " + expectedMaxCount + ". Tasks were "+tasks);
}});
@@ -161,8 +173,8 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
final Task<?> task = runEmptyTaskWithNameAndTags(e, "should-be-kept", ManagementContextInternal.NON_TRANSIENT_TASK_TAG);
runEmptyTaskWithNameAndTags(e, "should-be-gcd", ManagementContextInternal.TRANSIENT_TASK_TAG);
- assertTaskCountForEntityEventually(e, 1);
- Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e);
+ assertImportantTaskCountForEntityEventually(e, 1);
+ Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e) );
assertEquals(tasks, ImmutableList.of(task), "Mismatched tasks, got: "+tasks);
}
@@ -320,7 +332,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
// allow background enrichers to complete
Time.sleep(Duration.ONE_SECOND);
forceGc();
- List<Task<?>> t1 = em.getAllTasks();
+ Collection<Task<?>> t1 = em.getAllTasks();
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
entity.sensors().set(TestEntity.NAME, "bob");
@@ -328,7 +340,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
Entities.destroy(entity);
Time.sleep(Duration.ONE_SECOND);
forceGc();
- List<Task<?>> t2 = em.getAllTasks();
+ Collection<Task<?>> t2 = em.getAllTasks();
Assert.assertEquals(t1.size(), t2.size(), "lists are different:\n"+t1+"\n"+t2+"\n");
}
[17/23] brooklyn-server git commit: Merge branch 'master' into tasks-4
Posted by he...@apache.org.
Merge branch 'master' into tasks-4
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/0c2e1f60
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/0c2e1f60
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/0c2e1f60
Branch: refs/heads/master
Commit: 0c2e1f60e7781def8b4a8be744e4ee9244658212
Parents: bb26d32 baf8070
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 03:00:04 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 03:00:04 2017 +0100
----------------------------------------------------------------------
.../org/apache/brooklyn/api/entity/Entity.java | 3 +
.../brooklyn/api/mgmt/rebind/RebindManager.java | 14 +-
.../org/apache/brooklyn/api/sensor/Feed.java | 2 +-
...loudsCustomizerInstantiationYamlDslTest.java | 156 +++++--
.../catalog/SpecParameterUnwrappingTest.java | 2 +-
.../brooklyn/core/catalog/CatalogLoadMode.java | 73 ----
.../catalog/internal/BasicBrooklynCatalog.java | 28 +-
.../catalog/internal/CatalogInitialization.java | 278 ++++++++-----
.../core/catalog/internal/CatalogUtils.java | 31 +-
.../brooklyn/core/entity/AbstractEntity.java | 34 +-
.../core/entity/BrooklynConfigKeys.java | 2 +-
.../brooklyn/core/entity/EntityAdjuncts.java | 28 +-
.../brooklyn/core/entity/EntityInternal.java | 16 +-
.../apache/brooklyn/core/feed/AbstractFeed.java | 2 +-
.../mgmt/ha/HighAvailabilityManagerImpl.java | 45 +-
.../core/mgmt/ha/OsgiArchiveInstaller.java | 210 +++++++---
.../brooklyn/core/mgmt/ha/OsgiManager.java | 36 +-
.../internal/AbstractManagementContext.java | 5 +-
.../core/mgmt/internal/LocalUsageManager.java | 31 +-
.../ManagementNodeStateListenerManager.java | 175 ++++++++
.../NonDeploymentManagementContext.java | 19 -
.../rebind/ImmediateDeltaChangeListener.java | 160 --------
.../core/mgmt/rebind/RebindIteration.java | 213 ++++------
.../core/mgmt/rebind/RebindManagerImpl.java | 30 --
.../mgmt/rebind/dto/BasicEntityMemento.java | 2 +-
.../mgmt/rebind/dto/MementosGenerators.java | 5 +-
.../mgmt/usage/ManagementNodeStateListener.java | 53 +++
.../core/objs/BasicConfigurableObject.java | 4 +-
.../apache/brooklyn/core/policy/Policies.java | 66 +--
.../core/server/BrooklynServerConfig.java | 28 +-
.../FixedListMachineProvisioningLocation.java | 3 +-
.../brooklyn/util/core/ClassLoaderUtils.java | 7 +-
.../brooklyn/util/core/flags/TypeCoercions.java | 21 +
.../brooklyn/util/core/osgi/BundleMaker.java | 38 +-
.../brooklyn/util/core/task/BasicTask.java | 5 +-
.../core/entity/EntityConcurrencyTest.java | 2 +-
.../ha/HighAvailabilityManagerInMemoryTest.java | 5 -
.../ha/HighAvailabilityManagerTestFixture.java | 61 ++-
.../mgmt/rebind/RebindCatalogEntityTest.java | 2 +-
.../core/mgmt/rebind/RebindCatalogItemTest.java | 2 -
.../core/mgmt/rebind/RebindFailuresTest.java | 18 +-
.../core/mgmt/rebind/RebindFeedWithHaTest.java | 2 +-
.../usage/ManagementNodeStateListenerTest.java | 160 ++++++++
.../util/core/ClassLoaderUtilsTest.java | 6 +-
.../util/core/internal/TypeCoercionsTest.java | 31 ++
.../core/internal/ssh/RecordingSshTool.java | 7 +
.../util/core/osgi/BundleMakerTest.java | 8 +-
.../bundlemaker/withmanifest/myemptyfile.txt | 0
.../launcher/osgi/OsgiLauncherImpl.java | 2 +-
.../init/src/main/resources/catalog-classes.bom | 364 ----------------
karaf/init/src/main/resources/catalog.bom | 25 ++
.../BrooklynLauncherRebindCatalogTest.java | 2 +-
.../brooklyn/launcher/BrooklynLauncherTest.java | 2 +-
.../launcher/CleanOrphanedAdjunctsTest.java | 6 +-
.../CleanOrphanedLocationsIntegrationTest.java | 2 +-
.../jclouds/LocationCustomizerDelegate.java | 8 +-
...ailabilityManagerJcloudsObjectStoreTest.java | 7 +-
.../apache/brooklyn/rest/api/AdjunctApi.java | 237 +++++++++++
.../apache/brooklyn/rest/api/CatalogApi.java | 34 +-
.../brooklyn/rest/api/EntityConfigApi.java | 10 +-
.../org/apache/brooklyn/rest/api/PolicyApi.java | 16 +-
.../brooklyn/rest/api/PolicyConfigApi.java | 35 +-
.../brooklyn/rest/domain/AdjunctDetail.java | 72 ++++
.../brooklyn/rest/domain/AdjunctSummary.java | 148 +++++++
.../brooklyn/rest/domain/ApplicationSpec.java | 12 +-
.../rest/domain/CatalogEnricherSummary.java | 4 +
.../rest/domain/CatalogEntitySummary.java | 4 +
.../rest/domain/CatalogItemSummary.java | 4 +
.../rest/domain/CatalogLocationSummary.java | 4 +
.../rest/domain/CatalogPolicySummary.java | 4 +
.../brooklyn/rest/domain/EffectorSummary.java | 9 +-
.../brooklyn/rest/domain/EntityDetail.java | 14 +-
.../brooklyn/rest/domain/EntitySummary.java | 11 +-
.../rest/domain/PolicyConfigSummary.java | 2 +-
.../brooklyn/rest/domain/PolicySummary.java | 83 +---
.../rest/domain/ScriptExecutionSummary.java | 11 +-
.../brooklyn/rest/domain/SensorSummary.java | 7 +-
.../org/apache/brooklyn/rest/domain/Status.java | 7 +-
.../apache/brooklyn/rest/BrooklynRestApi.java | 2 +
.../rest/resources/AdjunctResource.java | 273 ++++++++++++
.../rest/resources/CatalogResource.java | 21 +-
.../rest/resources/EntityConfigResource.java | 10 +-
.../rest/resources/PolicyConfigResource.java | 1 +
.../brooklyn/rest/resources/PolicyResource.java | 2 +-
.../rest/transform/AdjunctTransformer.java | 96 +++++
.../rest/transform/CatalogTransformer.java | 30 +-
.../rest/transform/ConfigTransformer.java | 175 ++++++++
.../rest/transform/EntityTransformer.java | 107 +++--
.../rest/transform/PolicyTransformer.java | 3 +
.../rest/transform/SensorTransformer.java | 21 +-
.../rest/transform/TypeTransformer.java | 10 +-
.../rest/util/BrooklynRestResourceUtils.java | 44 ++
.../rest/resources/AdjunctResourceTest.java | 198 +++++++++
.../rest/testing/BrooklynRestResourceTest.java | 2 +-
.../main/java/org/apache/brooklyn/cli/Main.java | 21 +-
.../brooklyn/cli/lister/ItemDescriptors.java | 26 +-
.../java/org/apache/brooklyn/cli/CliTest.java | 32 +-
.../brooklyn/entity/chef/ChefAttributeFeed.java | 411 -------------------
.../entity/chef/ChefAttributePollConfig.java | 61 ---
.../brooklyn/entity/chef/ChefBashCommands.java | 42 --
.../apache/brooklyn/entity/chef/ChefConfig.java | 94 -----
.../brooklyn/entity/chef/ChefConfigs.java | 102 -----
.../apache/brooklyn/entity/chef/ChefEntity.java | 26 --
.../brooklyn/entity/chef/ChefEntityImpl.java | 39 --
.../entity/chef/ChefLifecycleEffectorTasks.java | 364 ----------------
.../brooklyn/entity/chef/ChefServerTasks.java | 97 -----
.../brooklyn/entity/chef/ChefSoloDriver.java | 85 ----
.../brooklyn/entity/chef/ChefSoloTasks.java | 70 ----
.../apache/brooklyn/entity/chef/ChefTasks.java | 154 -------
.../entity/chef/KnifeConvergeTaskFactory.java | 249 -----------
.../brooklyn/entity/chef/KnifeTaskFactory.java | 241 -----------
.../entity/resolve/ChefEntitySpecResolver.java | 42 --
...oklyn.core.resolve.entity.EntitySpecResolver | 1 -
.../resources/OSGI-INF/blueprint/blueprint.xml | 5 -
software/base/src/main/resources/catalog.bom | 88 ----
.../brooklyn/entity/chef/ChefConfigsTest.java | 40 --
.../entity/chef/ChefLiveTestSupport.java | 99 -----
.../chef/ChefServerTasksIntegrationTest.java | 109 -----
.../AbstractChefToyMySqlEntityLiveTest.java | 41 --
.../ChefSoloDriverMySqlEntityLiveTest.java | 49 ---
.../mysql/ChefSoloDriverToyMySqlEntity.java | 89 ----
...micChefAutodetectToyMySqlEntityLiveTest.java | 43 --
...DynamicChefServerToyMySqlEntityLiveTest.java | 50 ---
.../DynamicChefSoloToyMySqlEntityLiveTest.java | 43 --
.../chef/mysql/DynamicToyMySqlEntityChef.java | 81 ----
.../chef/mysql/TypedToyMySqlEntityChef.java | 55 ---
.../apache/brooklyn/util/stream/Streams.java | 39 +-
.../brooklyn/util/stream/StreamsTest.java | 17 +
128 files changed, 2905 insertions(+), 4370 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0c2e1f60/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0c2e1f60/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0c2e1f60/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0c2e1f60/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
----------------------------------------------------------------------
[08/23] brooklyn-server git commit: task visibility: entity
initialization
Posted by he...@apache.org.
task visibility: entity initialization
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/e1f948ad
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/e1f948ad
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/e1f948ad
Branch: refs/heads/master
Commit: e1f948ad583af01138519a17a9d79d400ad0511a
Parents: 0a1acec
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 13:19:39 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Sep 19 13:19:39 2017 +0100
----------------------------------------------------------------------
.../core/mgmt/internal/LocalSubscriptionManager.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/e1f948ad/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index a9fb70b..65e4b14 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -145,8 +145,13 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
} else {
if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
- T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
- submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
+ em.submit(
+ MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer)),
+ "displayName", "Initial publication of "+s.sensor.getName()),
+ () -> {
+ T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
+ submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
+ });
}
}
[10/23] brooklyn-server git commit: include adjunct info as a
subscription description
Posted by he...@apache.org.
include adjunct info as a subscription description
description could be used for more things, exposed more broadly, but this is needed for now to make sense of those tasks
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/b0556dec
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/b0556dec
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/b0556dec
Branch: refs/heads/master
Commit: b0556decc88a1d5bdc1c45500ceeb7c2eb558716
Parents: e1f948a
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 14:30:19 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Sep 20 10:10:25 2017 +0100
----------------------------------------------------------------------
.../brooklyn/core/mgmt/internal/AbstractManagementContext.java | 3 ++-
.../core/mgmt/internal/AbstractSubscriptionManager.java | 5 +++++
.../brooklyn/core/mgmt/internal/LocalSubscriptionManager.java | 5 +++++
.../core/mgmt/internal/QueueingSubscriptionManager.java | 1 +
.../org/apache/brooklyn/core/mgmt/internal/Subscription.java | 1 +
5 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index f0c9335..86d2e06 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -263,7 +263,8 @@ public abstract class AbstractManagementContext implements ManagementContextInte
@Override
public SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a) {
// BSC is a thin wrapper around SM so fine to create a new one here
- Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e), BrooklynTaskTags.tagForContextAdjunct(a)));
+ Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e), BrooklynTaskTags.tagForContextAdjunct(a)),
+ "subscriptionDescription", "adjunct "+a.getId());
return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
index c15f770..e7ae59e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
@@ -30,6 +30,7 @@ import org.apache.brooklyn.api.mgmt.SubscriptionManager;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,4 +148,8 @@ public abstract class AbstractSubscriptionManager implements SubscriptionManager
return s.subscriber!=null ? s.subscriber : flags.containsKey("subscriber") ? flags.remove("subscriber") : s.listener;
}
+ protected <T> String getSubscriptionDescription(Map<String, Object> flags, Subscription<T> s) {
+ return s.subscriptionDescription!=null ? s.subscriptionDescription : flags.containsKey("subscriptionDescription") ? Strings.toString(flags.remove("subscriptionDescription")) : null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 65e4b14..6b1a5d9 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -106,6 +106,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
Entity producer = s.producer;
Sensor<T> sensor= s.sensor;
s.subscriber = getSubscriber(flags, s);
+ s.subscriptionDescription = getSubscriptionDescription(flags, s);
if (flags.containsKey("tags") || flags.containsKey("tag")) {
Iterable<?> tags = (Iterable<?>) flags.get("tags");
Object tag = flags.get("tag");
@@ -255,6 +256,10 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
description.append(sourceName==null ? "<null-source>" : sourceName);
description.append(" publishing to ");
description.append(s.subscriber instanceof Entity ? ((Entity)s.subscriber).getId() : s.subscriber);
+ if (Strings.isNonBlank(s.subscriptionDescription)) {
+ description.append(", ");
+ description.append(s.subscriptionDescription);
+ }
if (includeDescriptionForSensorTask(event)) {
name.append(" ");
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
index 290520d..57ad073 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
@@ -55,6 +55,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
QueuedSubscription<T> qs = new QueuedSubscription<T>();
qs.flags = flags;
s.subscriber = getSubscriber(flags, s);
+ s.subscriptionDescription = getSubscriptionDescription(flags, s);
qs.s = s;
queuedSubscriptions.add(qs);
return s;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
index 5e71701..1cf0ad7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
@@ -37,6 +37,7 @@ class Subscription<T> implements SubscriptionHandle {
public Object subscriberExecutionManagerTag;
/** whether the tag was supplied by user, in which case we should not clear execution semantics */
public boolean subscriberExecutionManagerTagSupplied;
+ public String subscriptionDescription;
public Iterable<?> subscriberExtraExecTags;
public final Entity producer;
public final Sensor<T> sensor;
[11/23] brooklyn-server git commit: task visibility: ensure all tasks
have a name, updating exec.submit() methods to take name
Posted by he...@apache.org.
task visibility: ensure all tasks have a name, updating exec.submit() methods to take name
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/79cc9bcc
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/79cc9bcc
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/79cc9bcc
Branch: refs/heads/master
Commit: 79cc9bccf0ca80816ecd09846ec385d5cad3fbf8
Parents: aeecd3e
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Sep 20 10:09:05 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Sep 20 10:10:25 2017 +0100
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ExecutionContext.java | 17 +++-
.../brooklyn/api/mgmt/ExecutionManager.java | 14 ++-
.../camp/brooklyn/ConfigParametersYamlTest.java | 13 ++-
.../brooklyn/camp/brooklyn/spi/dsl/DslTest.java | 4 +-
.../core/mgmt/rebind/RebindManagerImpl.java | 10 +--
.../core/task/AbstractExecutionContext.java | 10 ++-
.../util/core/task/BasicExecutionContext.java | 5 ++
.../util/core/task/BasicExecutionManager.java | 8 +-
.../core/config/DeferredConfigTest.java | 8 +-
.../entity/ApplicationLifecycleStateTest.java | 18 ++--
.../brooklyn/core/entity/EntityAssertsTest.java | 92 ++++---------------
.../mgmt/persist/XmlMementoSerializerTest.java | 2 +-
.../qa/performance/TaskPerformanceTest.java | 6 +-
.../ssh/SshMachineLocationIntegrationTest.java | 6 +-
.../location/ssh/SshMachineLocationTest.java | 12 +--
.../util/core/task/TaskPredicatesTest.java | 5 +-
.../util/core/task/ValueResolverTest.java | 93 +++++++++++++++++---
.../brooklyn/policy/ha/ServiceReplacer.java | 6 +-
.../brooklyn/policy/ha/ServiceRestarter.java | 4 +-
.../rest/resources/SensorResourceTest.java | 2 +-
.../entity/brooklynnode/BrooklynNodeImpl.java | 12 +--
21 files changed, 178 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 142e664..c940ca0 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -52,12 +52,22 @@ public interface ExecutionContext extends Executor {
*/
<T> Task<T> submit(Map<?,?> properties, Callable<T> callable);
- /** {@link ExecutionManager#submit(Runnable) */
+ /** {@link ExecutionManager#submit(Runnable)
+ * @deprecated since 0.12.0 pass a display name or a more detailed map */
+ @Deprecated
Task<?> submit(Runnable runnable);
- /** {@link ExecutionManager#submit(Callable) */
+ /** {@link ExecutionManager#submit(Callable)
+ * @deprecated since 0.12.0 pass a display name or a more detailed map */
+ @Deprecated
<T> Task<T> submit(Callable<T> callable);
+ /** {@link ExecutionManager#submit(String Runnable) */
+ Task<?> submit(String displayName, Runnable runnable);
+
+ /** {@link ExecutionManager#submit(String, Callable) */
+ <T> Task<T> submit(String displayName, Callable<T> callable);
+
/** See {@link ExecutionManager#submit(Map, TaskAdaptable)}. */
<T> Task<T> submit(TaskAdaptable<T> task);
@@ -80,6 +90,9 @@ public interface ExecutionContext extends Executor {
// TODO reference ImmediateSupplier when that class is moved to utils project
@Beta
<T> Maybe<T> getImmediately(Object callableOrSupplierOrTask);
+ /** As {@link #getImmediately(Object)} but strongly typed for a task. */
+ @Beta
+ <T> Maybe<T> getImmediately(Task<T> callableOrSupplierOrTask);
/**
* Efficient shortcut for {@link #submit(TaskAdaptable)} followed by an immediate {@link Task#get()}.
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
index 97108ab..d4d1db4 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
@@ -79,11 +79,21 @@ public interface ExecutionManager {
// /** returns all tasks known to this manager (immutable) */
// public Set<Task<?>> getAllTasks();
- /** see {@link #submit(Map, TaskAdaptable)} */
+ /** see {@link #submit(Map, TaskAdaptable)}
+ * @deprecated since 0.12.0 pass displayName or map */
+ @Deprecated
public Task<?> submit(Runnable r);
+ /** see {@link #submit(Map, TaskAdaptable)}
+ * @deprecated since 0.12.0 pass displayName or map */
+ @Deprecated
+ public <T> Task<T> submit(Callable<T> r);
+
+ /** see {@link #submit(Map, TaskAdaptable)} */
+ public Task<?> submit(String displayName, Runnable c);
+
/** see {@link #submit(Map, TaskAdaptable)} */
- public <T> Task<T> submit(Callable<T> c);
+ public <T> Task<T> submit(String displayName, Callable<T> c);
/** see {@link #submit(Map, TaskAdaptable)} */
public <T> Task<T> submit(TaskAdaptable<T> task);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java
index 98ccb0b..3840e2d 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java
@@ -934,13 +934,12 @@ public class ConfigParametersYamlTest extends AbstractYamlRebindTest {
Entity app = createStartWaitAndLogApplication(yaml);
final TestEntity entity1 = (TestEntity) Iterables.getOnlyElement(app.getChildren());
- TestEntity entity2 = entity1.getExecutionContext().submit(new Callable<TestEntity>() {
- public TestEntity call() {
- TestEntity entity2 = entity1.addChild(EntitySpec.create(TestEntity.class));
- entity2.start(Collections.<Location>emptyList());
- return entity2;
- }
- }).get();
+ TestEntity entity2 = entity1.getExecutionContext().submit("create and start", () -> {
+ TestEntity entity2i = entity1.addChild(EntitySpec.create(TestEntity.class));
+ entity2i.start(Collections.<Location>emptyList());
+ return entity2i;
+ })
+ .get();
Entities.dumpInfo(app);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
index 63aba8e..04b6bf1 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
@@ -327,7 +327,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
protected void runConcurrentWorker(Supplier<Runnable> taskSupplier) {
Collection<Task<?>> results = new ArrayList<>();
for (int i = 0; i < MAX_PARALLEL_RESOLVERS; i++) {
- Task<?> result = app.getExecutionContext().submit(taskSupplier.get());
+ Task<?> result = app.getExecutionContext().submit("parallel "+i, taskSupplier.get());
results.add(result);
}
for (Task<?> result : results) {
@@ -550,7 +550,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
};
if (execInTask) {
- Task<Maybe<?>> task = ((EntityInternal)context).getExecutionContext().submit(job);
+ Task<Maybe<?>> task = ((EntityInternal)context).getExecutionContext().submit("Resolving DSL for test: "+dsl, job);
task.get(Asserts.DEFAULT_LONG_TIMEOUT);
assertTrue(task.isDone());
return task.get();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index d896376..381fee8 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -520,15 +520,7 @@ public class RebindManagerImpl implements RebindManager {
ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
if (ec == null) {
ec = managementContext.getServerExecutionContext();
- Task<List<Application>> task = ec.submit(new Callable<List<Application>>() {
- @Override public List<Application> call() throws Exception {
- return rebindImpl(classLoader, exceptionHandler, mode);
- }});
- try {
- return task.get();
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
+ return ec.get(Tasks.<List<Application>>builder().displayName("rebind").dynamic(false).body(() -> rebindImpl(classLoader, exceptionHandler, mode)).build());
} else {
return rebindImpl(classLoader, exceptionHandler, mode);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java
index e7debb9..424bbfc 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java
@@ -25,6 +25,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.ExecutionManager;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+import org.apache.brooklyn.util.collections.MutableMap;
import com.google.common.collect.Maps;
@@ -41,11 +42,16 @@ public abstract class AbstractExecutionContext implements ExecutionContext {
public Task<?> submit(Map<?,?> properties, Runnable runnable) { return submitInternal(properties, runnable); }
/** @see #submit(Map, Runnable) */
- @Override
+ @Override
+ public Task<?> submit(String displayName, Runnable runnable) { return submitInternal(MutableMap.of("displayName", displayName), runnable); }
+ @Override @Deprecated
public Task<?> submit(Runnable runnable) { return submitInternal(Maps.newLinkedHashMap(), runnable); }
+
/** @see #submit(Map, Runnable) */
- @Override
+ @Override
+ public <T> Task<T> submit(String displayName, Callable<T> callable) { return submitInternal(MutableMap.of("displayName", displayName), callable); }
+ @Override @Deprecated
public <T> Task<T> submit(Callable<T> callable) { return submitInternal(Maps.newLinkedHashMap(), callable); }
/** @see #submit(Map, Runnable) */
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 1b85663..1236219 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -239,6 +239,11 @@ public class BasicExecutionContext extends AbstractExecutionContext {
}
}
+ @Override
+ public <T> Maybe<T> getImmediately(Task<T> callableOrSupplier) {
+ return getImmediately((Object) callableOrSupplier);
+ }
+
/** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context;
* currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances;
* with tasks if it is submitted or in progress,
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 46e501e8..e85bf2f 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -57,7 +57,7 @@ import org.apache.brooklyn.core.config.Sanitizer;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
@@ -388,10 +388,12 @@ public class BasicExecutionManager implements ExecutionManager {
}
}
- @Override public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); }
+ @Override @Deprecated public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); }
+ @Override public Task<?> submit(String displayName, Runnable r) { return submit(MutableMap.of("displayName", displayName), r); }
@Override public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); }
- @Override public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); }
+ @Override @Deprecated public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); }
+ @Override public <T> Task<T> submit(String displayName, Callable<T> c) { return submit(MutableMap.of("displayName", displayName), c); }
@Override public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); }
@Override public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); }
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java
index 9decca8..c2cf908 100644
--- a/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java
@@ -21,7 +21,6 @@ package org.apache.brooklyn.core.config;
import static org.testng.Assert.assertEquals;
import java.util.List;
-import java.util.concurrent.Callable;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.Sensor;
@@ -55,13 +54,10 @@ public class DeferredConfigTest extends BrooklynAppUnitTestSupport {
void doTestDeferredConfigInList(final boolean delay) throws Exception {
// Simulate a deferred value
- Task<Sensor<?>> sensorFuture = app.getExecutionContext().submit(new Callable<Sensor<?>>() {
- @Override
- public Sensor<?> call() throws Exception {
+ Task<Sensor<?>> sensorFuture = app.getExecutionContext().submit("deferred return sensor", () -> {
if (delay) Time.sleep(Duration.FIVE_SECONDS);
return TestApplication.MY_ATTRIBUTE;
- }
- });
+ });
app.config().set(SENSORS_UNTYPED, (Object)ImmutableList.of(sensorFuture));
if (!delay) sensorFuture.get(Duration.ONE_SECOND);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java
index e8a3eee..f301b3c 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java
@@ -267,20 +267,14 @@ public class ApplicationLifecycleStateTest extends BrooklynMgmtUnitTestSupport {
}
});
- Task<?> first = mgmt.getExecutionManager().submit(new Runnable() {
- @Override
- public void run() {
+ Task<?> first = mgmt.getExecutionManager().submit("setting test sensor", () -> {
app.sensors().set(TEST_SENSOR, "first");
log.debug("set first");
- }
- });
- Task<?> second = mgmt.getExecutionManager().submit(new Runnable() {
- @Override
- public void run() {
+ });
+ Task<?> second = mgmt.getExecutionManager().submit("setting test sensor", () -> {
app.sensors().set(TEST_SENSOR, "second");
log.debug("set second");
- }
- });
+ });
first.blockUntilEnded();
second.blockUntilEnded();
@@ -394,8 +388,8 @@ public class ApplicationLifecycleStateTest extends BrooklynMgmtUnitTestSupport {
};
// Simulates firing the emit method from event handlers in different threads
- mgmt.getExecutionManager().submit(overrideJob);
- mgmt.getExecutionManager().submit(overrideJob);
+ mgmt.getExecutionManager().submit("emitting test sensor", overrideJob);
+ mgmt.getExecutionManager().submit("emitting test sensor", overrideJob);
Asserts.eventually(Suppliers.ofInstance(seenValues), CollectionFunctionals.sizeEquals(2));
Asserts.succeedsContinually(new Runnable() {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
index bfdac3c..3160518 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.brooklyn.core.entity;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -33,7 +32,6 @@ import org.apache.brooklyn.util.time.Duration;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -85,12 +83,7 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
entity.sensors().set(TestEntity.NAME, "before");
final String after = "after";
- Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
- @Override
- public void run() {
- EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after);
- }
- });
+ Task<?> assertValue = entity.getExecutionContext().submit("assert attr equals", () -> EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after));
entity.sensors().set(TestEntity.NAME, after);
assertValue.get();
}
@@ -105,12 +98,7 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
@Test
public void shouldAssertAttributeEventuallyNonNull() throws Exception {
EntityAsserts.assertAttributeEquals(entity, TestEntity.NAME, null);
- Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
- @Override
- public void run() {
- EntityAsserts.assertAttributeEventuallyNonNull(entity, TestEntity.NAME);
- }
- });
+ Task<?> assertValue = entity.getExecutionContext().submit("assert attr non-null", () -> EntityAsserts.assertAttributeEventuallyNonNull(entity, TestEntity.NAME));
entity.sensors().set(TestEntity.NAME, "something");
assertValue.get();
}
@@ -118,18 +106,11 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
@Test
public void shouldAssertAttributeEventually() throws Exception {
final CountDownLatch eventuallyEntered = new CountDownLatch(2);
- Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
- @Override
- public void run() {
- EntityAsserts.assertAttributeEventually(entity, TestEntity.NAME, new Predicate<String>() {
- @Override
- public boolean apply(String input) {
- eventuallyEntered.countDown();
- return input.matches(".*\\d+");
- }
- });
- }
- });
+ Task<?> assertValue = entity.getExecutionContext().submit("assert attribute", () -> EntityAsserts.assertAttributeEventually(entity, TestEntity.NAME,
+ (input) -> {
+ eventuallyEntered.countDown();
+ return input.matches(".*\\d+");
+ }) );
eventuallyEntered.await();
entity.sensors().set(TestEntity.NAME, "testing testing 123");
assertValue.get();
@@ -146,18 +127,11 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
public void shouldAssertPredicateEventuallyTrue() throws Exception {
final int testVal = 987654321;
final CountDownLatch eventuallyEntered = new CountDownLatch(2);
- Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
- @Override
- public void run() {
- EntityAsserts.assertPredicateEventuallyTrue(entity, new Predicate<TestEntity>() {
- @Override
- public boolean apply(TestEntity input) {
- eventuallyEntered.countDown();
- return testVal == input.getSequenceValue();
- }
- });
- }
- });
+ Task<?> assertValue = entity.getExecutionContext().submit("assert predicate", () -> EntityAsserts.assertPredicateEventuallyTrue(entity,
+ (input) -> {
+ eventuallyEntered.countDown();
+ return testVal == input.getSequenceValue();
+ }));
eventuallyEntered.await();
entity.setSequenceValue(testVal);
assertValue.get();
@@ -175,12 +149,7 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
public void shouldFailAssertAttributeEqualsContinually() throws Throwable {
final String myName = "myname";
entity.sensors().set(TestEntity.NAME, myName);
- Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
- @Override
- public void run() {
- EntityAsserts.assertAttributeEqualsContinually(entity, TestEntity.NAME, myName);
- }
- });
+ Task<?> assertValue = entity.getExecutionContext().submit("check attr equals", () -> EntityAsserts.assertAttributeEqualsContinually(entity, TestEntity.NAME, myName));
entity.sensors().set(TestEntity.NAME, "something");
try {
assertValue.get();
@@ -199,20 +168,10 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
app.createAndManageChild(stooge);
app.createAndManageChild(stooge);
- Task<?> assertValue1 = entity.getExecutionContext().submit(new Runnable() {
- @Override
- public void run() {
- EntityAsserts.assertGroupSizeEqualsEventually(ImmutableMap.of("timeout", "2s"), stooges, 3);
- }
- });
+ Task<?> assertValue1 = entity.getExecutionContext().submit("assert size", () -> EntityAsserts.assertGroupSizeEqualsEventually(ImmutableMap.of("timeout", "2s"), stooges, 3));
stooges.setEntityFilter(EntityPredicates.configEqualTo(TestEntity.CONF_NAME, STOOGE));
assertValue1.get();
- Task<?> assertValue2 = entity.getExecutionContext().submit(new Runnable() {
- @Override
- public void run() {
- EntityAsserts.assertGroupSizeEqualsEventually(stooges, 0);
- }
- });
+ Task<?> assertValue2 = entity.getExecutionContext().submit("assert size 0", () -> EntityAsserts.assertGroupSizeEqualsEventually(stooges, 0));
stooges.setEntityFilter(EntityPredicates.configEqualTo(TestEntity.CONF_NAME, "Marx Brother"));
assertValue2.get();
}
@@ -220,24 +179,11 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
@Test
public void shouldAssertAttributeChangesEventually () throws Exception{
entity.sensors().set(TestEntity.NAME, "before");
- final Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
- @Override
- public void run() {
- EntityAsserts.assertAttributeChangesEventually(entity, TestEntity.NAME);
- }
- });
+ final Task<?> assertValue = entity.getExecutionContext().submit("check attr change", () -> EntityAsserts.assertAttributeChangesEventually(entity, TestEntity.NAME));
Repeater.create()
- .repeat(new Runnable() {
- @Override
- public void run() {
- entity.sensors().set(TestEntity.NAME, "after" + System.currentTimeMillis());
- }
- }).until(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return assertValue.isDone();
- }
- }).every(Duration.millis(10))
+ .repeat(() -> entity.sensors().set(TestEntity.NAME, "after" + System.currentTimeMillis()))
+ .until(() -> assertValue.isDone())
+ .every(Duration.millis(10))
.run();
assertValue.get();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java
index 59ea8f2..88082ff 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java
@@ -611,7 +611,7 @@ public class XmlMementoSerializerTest {
public void testTask() throws Exception {
final TestApplication app = TestApplication.Factory.newManagedInstanceForTests();
mgmt = app.getManagementContext();
- Task<String> completedTask = app.getExecutionContext().submit(Callables.returning("myval"));
+ Task<String> completedTask = app.getExecutionContext().submit("return myval", Callables.returning("myval"));
completedTask.get();
String loggerName = UnwantedStateLoggingMapper.class.getName();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java b/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java
index a7a531b..949f139 100644
--- a/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java
@@ -74,11 +74,7 @@ public class TaskPerformanceTest extends AbstractPerformanceTest {
.summary("TaskPerformanceTest.testExecuteSimplestRunnable")
.iterations(numIterations)
.minAcceptablePerSecond(minRatePerSec)
- .job(new Runnable() {
- @Override
- public void run() {
- executionManager.submit(work);
- }})
+ .job(() -> executionManager.submit("inner", work))
.completionLatch(completionLatch));
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java
index 9ff39c1..e648c32 100644
--- a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java
@@ -123,11 +123,7 @@ public class SshMachineLocationIntegrationTest extends SshMachineLocationTest {
BasicExecutionManager execManager = new BasicExecutionManager("mycontextid");
BasicExecutionContext execContext = new BasicExecutionContext(execManager);
try {
- MachineDetails details = execContext.submit(new Callable<MachineDetails>() {
- @Override
- public MachineDetails call() {
- return host.getMachineDetails();
- }}).get();
+ MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get();
LOG.info("machineDetails="+details);
assertNotNull(details);
} finally {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java
index 5c6e918..d25991c 100644
--- a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java
@@ -131,11 +131,7 @@ public class SshMachineLocationTest extends BrooklynAppUnitTestSupport {
BasicExecutionManager execManager = new BasicExecutionManager("mycontextid");
BasicExecutionContext execContext = new BasicExecutionContext(execManager);
try {
- MachineDetails details = execContext.submit(new Callable<MachineDetails>() {
- @Override
- public MachineDetails call() {
- return host.getMachineDetails();
- }}).get();
+ MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get();
LOG.info("machineDetails="+details);
assertNotNull(details);
@@ -166,11 +162,7 @@ public class SshMachineLocationTest extends BrooklynAppUnitTestSupport {
BasicExecutionManager execManager = new BasicExecutionManager("mycontextid");
BasicExecutionContext execContext = new BasicExecutionContext(execManager);
try {
- MachineDetails details = execContext.submit(new Callable<MachineDetails>() {
- @Override
- public MachineDetails call() {
- return host.getMachineDetails();
- }}).get();
+ MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get();
LOG.info("machineDetails="+details);
assertNotNull(details);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
index 90d6b06..2656c80 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
@@ -82,14 +82,13 @@ public class TaskPredicatesTest extends BrooklynAppUnitTestSupport {
@Test
public void testIsDone() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- Task<?> task = app.getExecutionContext().submit(new Runnable() {
- public void run() {
+ Task<?> task = app.getExecutionContext().submit("await latch", () -> {
try {
latch.await();
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
- }});
+ });
assertFalse(TaskPredicates.isDone().apply(task));
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index 1f5c754..455f8ad 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.fail;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -87,19 +88,17 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
Assert.assertTrue(result.isAbsent(), "result="+result);
Exception exception = Maybe.getException(result);
- Assert.assertTrue(exception.toString().contains("no execution context available"), "exception="+exception);
+ Asserts.assertStringContains(exception.toString(), "no execution context available");
+
+ Asserts.assertThat(t, (tt) -> !tt.isBegun());
}
public void testUnsubmittedTaskWithExecutionContextExecutesAndReturns() {
final Task<String> t = newSleepTask(Duration.ZERO, "foo");
// Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
- Maybe<String> result = app.getExecutionContext()
- .submit(new Callable<Maybe<String> >() {
- @Override
- public Maybe<String> call() throws Exception {
- return Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe();
- }})
+ Maybe<String> result = app.getExecutionContext()
+ .submit("resolving sleep task", () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe())
.getUnchecked();
Assert.assertEquals(result.get(), "foo");
@@ -110,17 +109,83 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
// Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
// However, it will quickly timeout as the task will not have completed.
- Maybe<String> result = app.getExecutionContext()
- .submit(new Callable<Maybe<String> >() {
- @Override
- public Maybe<String> call() throws Exception {
- return Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
- }})
+ Maybe<String> result = app.getExecutionContext()
+ .submit("resolving sleep task", () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe())
.getUnchecked();
Assert.assertTrue(result.isAbsent(), "result="+result);
Exception exception = Maybe.getException(result);
Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception);
+
+ Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS);
+ Asserts.assertThat(t, (tt) -> !tt.isDone());
+ }
+
+ public void testUnsubmittedTaskWithExecutionContextExecutesAndReturnsForeground() {
+ final Task<String> t = newSleepTask(Duration.ZERO, "foo");
+
+ // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
+ Maybe<String> result = app.getExecutionContext()
+ .get(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() ));
+
+ Assert.assertEquals(result.get(), "foo");
+ }
+
+ public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOutForeground() {
+ final Task<String> t = newSleepTask(Duration.ONE_MINUTE, "foo");
+
+ // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
+ // However, it will quickly timeout as the task will not have completed.
+ Maybe<String> result = app.getExecutionContext()
+ .get(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe() ));
+
+ Assert.assertTrue(result.isAbsent(), "result="+result);
+ Exception exception = Maybe.getException(result);
+ Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception);
+
+ Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS);
+ Asserts.assertThat(t, (tt) -> !tt.isDone());
+ }
+
+ public void testUnsubmittedTaskWithExecutionContextTimesOutWhenImmediate() {
+ final Task<String> t = newSleepTask(Duration.ZERO, "foo");
+
+ // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task
+ Maybe<Maybe<String>> result = app.getExecutionContext()
+ .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() ));
+
+ // However, the resubmission will not be waited upon
+ Assert.assertTrue(result.isPresent(), "result="+result);
+ Assert.assertTrue(result.get().isAbsent(), "result="+result);
+ Exception exception = Maybe.getException(result.get());
+
+ Asserts.assertStringContainsIgnoreCase(exception.toString(), "immediate", "not", "available");
+
+ // But the underlying task is running
+ Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS);
+ Asserts.eventually(() -> t, (tt) -> tt.isDone(), Duration.TEN_SECONDS);
+ Asserts.assertThat(t, (tt) -> Objects.equals(tt.getUnchecked(), "foo"));
+
+ // And subsequent get _is_ immediate
+ result = app.getExecutionContext()
+ .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() ));
+ Assert.assertEquals(result.get().get(), "foo");
+ }
+
+ public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOutImmediate() {
+ final Task<String> t = newSleepTask(Duration.ONE_MINUTE, "foo");
+
+ // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
+ // However, it will quickly timeout as the task will not have completed.
+ Maybe<Maybe<String>> result = app.getExecutionContext()
+ .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe() ));
+
+ Assert.assertTrue(result.isPresent(), "result="+result);
+ Assert.assertTrue(result.get().isAbsent(), "result="+result);
+ Exception exception = Maybe.getException(result.get());
+ Asserts.assertStringContainsIgnoreCase(exception.toString(), "immediate", "not", "available");
+ Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS);
+ Asserts.assertThat(t, (tt) -> !tt.isDone());
}
public void testSwallowError() {
@@ -161,7 +226,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
public void testGetImmediatelyInTask() throws Exception {
final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
- Task<CallInfo> task = app.getExecutionContext().submit(new Callable<CallInfo>() {
+ Task<CallInfo> task = app.getExecutionContext().submit("test task for call stack", new Callable<CallInfo>() {
@Override
public CallInfo call() {
return myUniquelyNamedMethod();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
index e14433d..c799336 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
@@ -129,7 +129,7 @@ public class ServiceReplacer extends AbstractPolicy {
if (isRunning()) {
highlightViolation("Failure detected");
LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")");
- getExecutionContext().submit(() -> onDetectedFailure(event));
+ getExecutionContext().submit("Analyzing detected failure", () -> onDetectedFailure(event));
} else {
LOG.warn("ServiceReplacer not running, so not acting on failure detected at "+entity+" ("+event.getValue()+", child of "+entity+")");
}
@@ -171,9 +171,9 @@ public class ServiceReplacer extends AbstractPolicy {
return;
}
- highlightViolation(violationText+", triggering restart");
+ highlightViolation(violationText+", triggering replacement");
LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
- Task<?> t = getExecutionContext().submit(() -> {
+ Task<?> t = getExecutionContext().submit("Replace member on failure", () -> {
try {
Entities.invokeEffectorWithArgs(entity, entity, MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get();
consecutiveReplacementFailureTimes.clear();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
index a31e3c0..3c0341e 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
@@ -29,14 +29,12 @@ import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.core.sensor.BasicNotificationSensor;
import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
-import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.time.Duration;
@@ -112,7 +110,7 @@ public class ServiceRestarter extends AbstractPolicy {
if (isRunning()) {
LOG.info("ServiceRestarter notified; dispatching job for "+entity+" ("+event.getValue()+")");
- getExecutionContext().submit(() -> onDetectedFailure(event));
+ getExecutionContext().submit("Analyzing detected failure", () -> onDetectedFailure(event));
} else {
LOG.warn("ServiceRestarter not running, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java
index 391f2bb..e5fc5b2 100644
--- a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java
+++ b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java
@@ -325,7 +325,7 @@ public class SensorResourceTest extends BrooklynRestResourceTest {
@Test
public void testGetSensorValueOfTypeCompletedTask() throws Exception {
- Task<String> task = entity.getExecutionContext().submit(Callables.returning("myval"));
+ Task<String> task = entity.getExecutionContext().submit("returning myval", Callables.returning("myval"));
task.get();
entity.sensors().set(Sensors.newSensor(Task.class, "myTask"), task);
doGetSensorTest("myTask", String.class, "\"myval\"");
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
index 54c81d1..d6e3fce 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
@@ -50,32 +50,32 @@ import org.apache.brooklyn.entity.brooklynnode.EntityHttpClient.ResponseCodePred
import org.apache.brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody;
import org.apache.brooklyn.entity.brooklynnode.effector.SetHighAvailabilityModeEffectorBody;
import org.apache.brooklyn.entity.brooklynnode.effector.SetHighAvailabilityPriorityEffectorBody;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
import org.apache.brooklyn.entity.software.base.SoftwareProcess.StopSoftwareParameters.StopMode;
+import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks;
import org.apache.brooklyn.feed.http.HttpFeed;
import org.apache.brooklyn.feed.http.HttpPollConfig;
import org.apache.brooklyn.feed.http.HttpValueFunctions;
import org.apache.brooklyn.feed.http.JsonFunctions;
-import org.apache.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.brooklyn.util.collections.Jsonya;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.http.HttpToolResponse;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.TaskTags;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.PropagatedRuntimeException;
import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.http.HttpToolResponse;
import org.apache.brooklyn.util.javalang.Enums;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.repeat.Repeater;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
+import org.apache.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
@@ -222,7 +222,7 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
// we could wait for BrooklynTaskTags.getTasksInEntityContext(ExecutionManager, this).isEmpty();
Task<?> stopEffectorTask = BrooklynTaskTags.getClosestEffectorTask(Tasks.current(), Startable.STOP);
Task<?> topEntityTask = getTopEntityTask(stopEffectorTask);
- getManagementContext().getExecutionManager().submit(new UnmanageTask(topEntityTask, this));
+ getManagementContext().getExecutionManager().submit("Unmanage Brooklyn entity after stop", new UnmanageTask(topEntityTask, this));
}
}
[22/23] brooklyn-server git commit: address review comments
Posted by he...@apache.org.
address review comments
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/2dcb0a03
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/2dcb0a03
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/2dcb0a03
Branch: refs/heads/master
Commit: 2dcb0a03a9e9b27a81535a13bd48d1958f8984d9
Parents: 9213f0e
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Oct 6 01:04:34 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 6 01:12:43 2017 +0100
----------------------------------------------------------------------
.../apache/brooklyn/api/mgmt/ExecutionContext.java | 2 +-
.../apache/brooklyn/api/mgmt/ManagementContext.java | 3 +++
.../brooklyn/core/config/ConfigConstraints.java | 2 +-
.../mgmt/internal/AbstractManagementContext.java | 16 ++++++++++++++++
.../internal/NonDeploymentManagementContext.java | 9 +++++++++
.../core/mgmt/rebind/RebindManagerImpl.java | 3 ++-
.../brooklyn/core/objs/AbstractEntityAdjunct.java | 6 +-----
.../util/core/task/BasicExecutionManager.java | 5 ++++-
.../brooklyn/core/entity/EntityAssertsTest.java | 3 ++-
.../mgmt/internal/LocalSubscriptionManagerTest.java | 8 +++-----
10 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 2d8fe23..d303ba9 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -62,7 +62,7 @@ public interface ExecutionContext extends Executor {
@Deprecated
<T> Task<T> submit(Callable<T> callable);
- /** {@link ExecutionManager#submit(String Runnable) */
+ /** {@link ExecutionManager#submit(String, Runnable) */
Task<?> submit(String displayName, Runnable runnable);
/** {@link ExecutionManager#submit(String, Callable) */
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index 00dd4d3..c1a2c21 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -166,6 +166,9 @@ public interface ManagementContext {
*/
ExecutionContext getExecutionContext(Entity entity);
+ /** As {@link #getExecutionContext(Entity)} where there is also an adjunct */
+ ExecutionContext getExecutionContext(Entity e, EntityAdjunct a);
+
/**
* Returns a {@link SubscriptionContext} instance representing subscriptions
* (from the {@link SubscriptionManager}) associated with this entity, and capable
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
index c4ce81d..682dedb 100644
--- a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
+++ b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
@@ -117,7 +117,7 @@ public abstract class ConfigConstraints<T extends BrooklynObject> {
public Iterable<ConfigKey<?>> getViolations() {
ExecutionContext exec =
getBrooklynObject() instanceof EntityInternal ? ((EntityInternal)getBrooklynObject()).getExecutionContext() :
- // getBrooklynObject() instanceof AbstractEntityAdjunct ? ((AbstractEntityAdjunct)getBrooklynObject()).getExecutionContext() :
+ getBrooklynObject() instanceof AbstractEntityAdjunct ? ((AbstractEntityAdjunct)getBrooklynObject()).getExecutionContext() :
null;
if (exec!=null) {
return exec.get(
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index 7a0fabd..27645bc 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -71,6 +71,7 @@ import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContex
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl;
import org.apache.brooklyn.core.mgmt.rebind.RebindManagerImpl;
+import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
import org.apache.brooklyn.core.typereg.BasicBrooklynTypeRegistry;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.ResourceUtils;
@@ -245,6 +246,21 @@ public abstract class AbstractManagementContext implements ManagementContextInte
return ((EntityInternal)e).getExecutionContext();
}
}
+
+ @Override
+ public ExecutionContext getExecutionContext(Entity e, EntityAdjunct adjunct) {
+ // BEC is a thin wrapper around EM so fine to create a new one here; but make sure it gets the real entity
+ if (e instanceof AbstractEntityAdjunct) {
+ ImmutableSet<Object> tags = ImmutableSet.<Object>of(
+ BrooklynTaskTags.tagForContextAdjunct(adjunct),
+ BrooklynTaskTags.tagForContextEntity(e),
+ this
+ );
+ return new BasicExecutionContext(getExecutionManager(), tags);
+ } else {
+ return ((EntityInternal)e).getExecutionContext();
+ }
+ }
@Override
public ExecutionContext getServerExecutionContext() {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index bac7aeb..b5fd0b5 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -282,6 +282,15 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
}
@Override
+ public ExecutionContext getExecutionContext(Entity entity, EntityAdjunct adjunct) {
+ if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity);
+ if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED)
+ throw new IllegalStateException("Entity "+entity+" is no longer managed; execution context not available");
+ checkInitialManagementContextReal();
+ return initialManagementContext.getExecutionContext(entity, adjunct);
+ }
+
+ @Override
public ExecutionContext getServerExecutionContext() {
return initialManagementContext.getServerExecutionContext();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index f5a1c0c..0830900 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -490,7 +490,8 @@ public class RebindManagerImpl implements RebindManager {
ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
if (ec == null) {
ec = managementContext.getServerExecutionContext();
- return ec.get(Tasks.<List<Application>>builder().displayName("rebind").dynamic(false).body(() -> rebindImpl(classLoader, exceptionHandler, mode)).build());
+ return ec.get(Tasks.<List<Application>>builder().displayName("rebind").dynamic(false)
+ .body(() -> rebindImpl(classLoader, exceptionHandler, mode)).build());
} else {
return rebindImpl(classLoader, exceptionHandler, mode);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index c6cb74c..7b2f6ad 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -54,15 +54,12 @@ import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.internal.ConfigUtilsInternal;
-import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
-import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.core.task.BasicExecutionContext;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -428,8 +425,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
public void setEntity(EntityLocal entity) {
if (destroyed.get()) throw new IllegalStateException("Cannot set entity on a destroyed entity adjunct");
this.entity = entity;
- this.execution = new BasicExecutionContext( getManagementContext().getExecutionManager(),
- MutableList.of(BrooklynTaskTags.tagForContextAdjunct(this), BrooklynTaskTags.tagForContextEntity(entity)) );
+ this.execution = getManagementContext().getExecutionContext(entity, this);
if (entity!=null && getCatalogItemId() == null) {
setCatalogItemIdAndSearchPath(entity.getCatalogItemId(), entity.getCatalogItemIdSearchPath());
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index c7a9855..c53277d 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -799,7 +799,10 @@ public class BasicExecutionManager implements ExecutionManager {
Task<T> t = Tasks.<T>builder().dynamic(false).displayName(displayName+" (placeholder for "+id+")")
.description("Details of the original task have been forgotten.")
.body(Callables.returning((T)null)).build();
- ((BasicTask<T>)t).ignoreIfNotRun();
+ // don't really want anyone executing the "gone" task...
+ // also if we are GC'ing tasks then cancelled may help with cleanup
+ // of sub-tasks that have lost their submitted-by-task reference ?
+ // also don't want warnings when it's finalized, this means we don't need ignoreIfNotRun()
((BasicTask<T>)t).cancelled = true;
return t;
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
index 3160518..2967f22 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
@@ -83,7 +83,8 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
entity.sensors().set(TestEntity.NAME, "before");
final String after = "after";
- Task<?> assertValue = entity.getExecutionContext().submit("assert attr equals", () -> EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after));
+ Task<?> assertValue = entity.getExecutionContext().submit("assert attr equals",
+ () -> EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after));
entity.sensors().set(TestEntity.NAME, after);
assertValue.get();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
index 1373545..5c04978 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
@@ -193,8 +193,7 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
// delivery should be in subscription order, so 123 then 456
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
// wait for the above delivery - otherwise it might get dropped
- Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
- Asserts.assertSize(listener.getEvents(), 1); });
+ Asserts.succeedsEventually(() -> Asserts.assertSize(listener.getEvents(), 1));
entity.sensors().set(TestEntity.SEQUENCE, 456);
// notifications don't have "initial value" so don't get -1
@@ -207,8 +206,7 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener);
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
- Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() {
- @Override public void run() {
+ Asserts.succeedsEventually(() -> {
Asserts.assertEquals(listener.getEvents(), ImmutableList.of(
new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456),
@@ -216,7 +214,7 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING),
new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
"actually got: "+listener.getEvents());
- }});
+ });
}
@Test
[04/23] brooklyn-server git commit: task visibility: better names for
config retrieval tasks
Posted by he...@apache.org.
task visibility: better names for config retrieval tasks
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/8ecf3950
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/8ecf3950
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/8ecf3950
Branch: refs/heads/master
Commit: 8ecf3950ac1c158e6cac815c596ae0906439363b
Parents: 4430f76
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Sep 15 11:24:43 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:55 2017 +0100
----------------------------------------------------------------------
.../core/objs/AbstractConfigurationSupportInternal.java | 12 ++----------
.../brooklyn/core/objs/BrooklynObjectInternal.java | 2 ++
2 files changed, 4 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8ecf3950/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
index 460c8c4..f67f1f5 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
@@ -106,8 +106,7 @@ public abstract class AbstractConfigurationSupportInternal implements BrooklynOb
}
};
- // TODO can we remove the DST ? this is structured so maybe not
- Task<T> t = Tasks.<T>builder().body(job)
+ Task<T> t = Tasks.<T>builder().dynamic(false).body(job)
.displayName("Resolving config "+key.getName())
.description("Internal non-blocking structured key resolution")
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
@@ -126,20 +125,13 @@ public abstract class AbstractConfigurationSupportInternal implements BrooklynOb
* See {@link #getNonBlockingResolvingStructuredKey(ConfigKey)}.
*/
protected <T> Maybe<T> getNonBlockingResolvingSimple(ConfigKey<T> key) {
- // TODO See AbstractConfigMapImpl.getConfigImpl, for how it looks up the "container" of the
- // key, so that it gets the right context entity etc.
-
- // getRaw returns Maybe(val) if the key was explicitly set (where val can be null)
- // or Absent if the config key was unset.
Object unresolved = getRaw(key).or(key.getDefaultValue());
- // TODO add description that we are evaluating this config key to be used if the code below submits futher tasks
- // and look at other uses of "description" method
- // and make sure it is marked transient
Maybe<Object> resolved = Tasks.resolving(unresolved)
.as(Object.class)
.immediately(true)
.deep(true)
.context(getContext())
+ .description("Resolving raw value of simple config "+key)
.getMaybe();
if (resolved.isAbsent()) return Maybe.Absent.<T>castAbsent(resolved);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8ecf3950/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
index 6ad42f4..972612b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
@@ -96,7 +96,9 @@ public interface BrooklynObjectInternal extends BrooklynObject, Rebindable {
/**
* Returns the uncoerced value for this config key, if available, not taking any default.
* If there is no local value and there is an explicit inherited value, will return the inherited.
+ * May return a {@link Maybe}-wrapped null if the value is explicitly null.
* Returns {@link Maybe#absent()} if the key is not explicitly set on this object or an ancestor.
+ * Often this is used with {@link Maybe#or(Object))} to return default value.
* <p>
* See also {@link #getLocalRaw(ConfigKey).
*/
[05/23] brooklyn-server git commit: fix visibility: entity init runs
in same thread
Posted by he...@apache.org.
fix visibility: entity init runs in same thread
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/84d24d1a
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/84d24d1a
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/84d24d1a
Branch: refs/heads/master
Commit: 84d24d1a949275e6bad3bbabefe6bf422f9fade7
Parents: d4c9fe1
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 13:09:39 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Sep 19 13:12:11 2017 +0100
----------------------------------------------------------------------
.../apache/brooklyn/core/objs/proxy/InternalEntityFactory.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/84d24d1a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
index a14225b..bd6cc1d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
@@ -309,7 +309,7 @@ public class InternalEntityFactory extends InternalFactory {
* which currently show up at the top level once the initializer task completes.
* TODO It would be nice if these schedule tasks were grouped in a bucket!
*/
- ((EntityInternal)entity).getExecutionContext().submit(Tasks.builder().dynamic(false).displayName("Entity initialization")
+ ((EntityInternal)entity).getExecutionContext().get(Tasks.builder().dynamic(false).displayName("Entity initialization")
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.body(new Runnable() {
@Override
@@ -354,7 +354,7 @@ public class InternalEntityFactory extends InternalFactory {
initEntityAndDescendants(child.getId(), entitiesByEntityId, specsByEntityId);
}
}
- }).build()).getUnchecked();
+ }).build());
}
/**