You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2021/10/25 12:28:02 UTC
[brooklyn-server] 11/15: improve task garbage collection,
using task name also
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 84410a8275f60701347fdd3e5dcef44d72d93104
Author: Alex Heneveld <al...@cloudsoftcorp.com>
AuthorDate: Thu Oct 21 21:23:48 2021 +0100
improve task garbage collection, using task name also
previously it looked only in task tags to determine which tasks to remember after completion;
now it also looks at the task's _name_ and only keeps 10 tasks per entity with a given name.
useful for scheduled tasks where otherwise we might keep lots, if they don't have distinguishing tags.
---
.../mgmt/internal/BrooklynGarbageCollector.java | 121 ++++++++++---
.../mgmt/internal/EntityExecutionManagerTest.java | 189 ++++++++++++++++++---
2 files changed, 265 insertions(+), 45 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
index f780b5c..5c6f2b7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.mgmt.HasTaskChildren;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
@@ -133,7 +134,12 @@ public class BrooklynGarbageCollector {
+ "within an execution context (e.g. entity); "
+ "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full",
50);
-
+
+ public static final ConfigKey<Integer> MAX_TASKS_PER_NAME = ConfigKeys.newIntegerConfigKey(
+ "brooklyn.gc.maxTasksPerName",
+ "the maximum number of tasks with the same name kept within an execution context (e.g. entity)",
+ 10);
+
public static final ConfigKey<Integer> MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey(
"brooklyn.gc.maxTasksPerEntity",
"the maximum number of tasks to be kept for a given entity",
@@ -168,7 +174,7 @@ public class BrooklynGarbageCollector {
private Duration gcPeriod;
private volatile boolean running = true;
-
+
public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) {
this.executionManager = executionManager;
this.storage = storage;
@@ -330,8 +336,16 @@ public class BrooklynGarbageCollector {
}
/**
- * Deletes old tasks. The age/number of tasks to keep is controlled by fields like
+ * Deletes old tasks. The age/number of tasks to keep is controlled by fields including
* {@link #MAX_TASKS_PER_TAG} and {@link #MAX_TASKS_PER_TAG}.
+ *
+ * This works by looking at the "entity" tag(s) [context and target], then at the "non-entity" tags (excluding some such as sub-task etc);
+ * any (completed) task which has one or more tag in the category, and where all such tags are over capacity
+ * (with some grace to ignore tasks for which one tag in category is under-capacity), these will get GC'd;
+ * with oldest first. So it will keep up to 1000 tasks for an entity, and limit of up to 50 for each tag,
+ * so eg attaching an 'entityId:effectorName' tag means we keep up to 50 instances of each effector call, provided we don't exceed the 1000 global.
+ *
+ * (It might be nicer to score, based on age and name uniqueness and activity within an entity. But above works pretty well.)
*/
@VisibleForTesting
public synchronized int gcTasks() {
@@ -356,10 +370,13 @@ public class BrooklynGarbageCollector {
expireAgedTasks();
expireTransientTasks();
- // now look at overcapacity tags, non-entity tags first
-
+ // now look at overcapacity tags, names, then non-entity tags first
+
Set<Object> taskTags = executionManager.getTaskTags();
-
+
+ int deletedCount = 0;
+ deletedCount += expireOverCapacityNamesInCategory(taskTags, TagCategory.ENTITY);
+
int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY);
int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG);
@@ -392,10 +409,9 @@ public class BrooklynGarbageCollector {
}
}
- int deletedCount = 0;
deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false);
deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true);
-
+
// if expensive we could optimize task GC here to avoid repeated lookups by
// counting all expired above (not just prev two lines) and skipping if none
// but that seems unlikely
@@ -416,14 +432,15 @@ public class BrooklynGarbageCollector {
protected static boolean isTagIgnoredForGc(Object tag) {
if (tag == null) return true;
+
if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true;
if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true;
if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true;
if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true;
- if (tag instanceof WrappedStream) {
- return true;
- }
-
+
+ if (tag instanceof ManagementContext) return true;
+ if (tag instanceof WrappedStream) return true;
+
return false;
}
@@ -550,7 +567,7 @@ public class BrooklynGarbageCollector {
}
- /** expires tasks which are over-capacity in all their non-entity tag categories, returned count */
+ /** expires tasks which are over-capacity in all their non-entity or entity (target, context) tag categories, returned count */
protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
if (emptyFilterNeeded) {
// previous run may have decremented counts
@@ -629,25 +646,29 @@ public class BrooklynGarbageCollector {
LOG.debug("brooklyn-gc detected " + taskTagsInCategoryOverCapacity.size() + " " + category + " "
+ "tag(s) over capacity, expiring old tasks; "
+ tasksToConsiderDeleting.size() + " tasks under consideration; categories are: "
- + taskTagsInCategoryOverCapacity + "; including " + tasksToConsiderDeleting);
+ + taskTagsInCategoryOverCapacity + "; including " + tasksToLog);
}
// now try deleting tasks which are overcapacity for each (non-entity) tag
- int deleted = 0;
+ Set<Task<?>> deleted = MutableSet.of();
for (Task<?> task: tasksToConsiderDeleting) {
- boolean delete = true;
+ boolean delete = false;
for (Object tag: task.getTags()) {
- if (!category.acceptsTag(tag))
+ if (!category.acceptsTag(tag)) {
+ // ignore this tag, not right for the category
continue;
+ }
if (taskTagsInCategoryOverCapacity.get(tag)==null) {
// no longer over capacity in this tag
delete = false;
break;
}
+ // has at least one tag in the category, and all such tags are overcapacity
+ delete = true;
}
if (delete) {
// delete this and update overcapacity info
- deleted++;
+ deleted.add(task);
executionManager.deleteTask(task);
for (Object tag: task.getTags()) {
AtomicInteger counter = taskAllTagsOverCapacity.get(tag);
@@ -662,9 +683,67 @@ public class BrooklynGarbageCollector {
}
if (LOG.isDebugEnabled())
- LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; "
- + "capacities now: " + taskTagsInCategoryOverCapacity);
- return deleted;
+ LOG.debug("brooklyn-gc deleted "+deleted.size()+" tasks in over-capacity " + category+" tag categories; "
+ + "capacities now: " + taskTagsInCategoryOverCapacity+"; deleted tasks: "+
+ deleted.stream().map(Task::getId).collect(Collectors.joining(",")));
+ return deleted.size();
+ }
+
+ protected int expireOverCapacityNamesInCategory(Set<Object> taskTags, TagCategory category) {
+ List<Object> entityTags = taskTags.stream().filter(tag -> category.acceptsTag(tag)).collect(Collectors.toList());
+ Integer maxPerName = brooklynProperties.getConfig(MAX_TASKS_PER_NAME);
+ if (maxPerName==null || maxPerName<=0) return 0;
+ Set<Task<?>> tasksToDelete = MutableSet.of();
+
+ try {
+ for (Object entityTag: entityTags) {
+ Set<Task<?>> tasks = executionManager.getTasksWithTag(entityTag);
+ Map<String,Set<Task<?>>> tasksByName = MutableMap.of();
+ for (Task<?> task: tasks) {
+ if (!task.isDone(true)) continue;
+ tasksByName.compute(task.getDisplayName(), (key,set) -> {
+ if (set==null) set = MutableSet.of();
+ set.add(task);
+ return set;
+ });
+ }
+
+ List<Entry<String,Set<Task<?>>>> overCapacityNames = tasksByName.entrySet().stream().filter(entry -> entry.getValue().size() > maxPerName).collect(Collectors.toList());
+ if (!overCapacityNames.isEmpty()) {
+ LOG.debug("brooklyn-gc detected tasks exceeding max per-name for entity "+entityTag+"; collecting for deletion: " +
+ overCapacityNames.stream().map(entry -> entry.getKey()+"("+entry.getValue().size()+")").collect(Collectors.joining(", ")));
+ }
+ overCapacityNames.forEach(entry -> {
+ List<Task<?>> list = MutableList.copyOf(entry.getValue());
+ Collections.sort(list, TASKS_NEWEST_FIRST_COMPARATOR);
+ list.stream().skip(maxPerName).forEach(tasksToDelete::add);
+ });
+ }
+
+ } catch (ConcurrentModificationException e) {
+ // do CME's happen with these data structures?
+ // if so, let's just delete what we've found so far
+ LOG.debug("Got CME inspecting tasks by name to delete; ignoring: "+e);
+ }
+
+
+ if (tasksToDelete.isEmpty()) {
+ return 0;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("brooklyn-gc detected "
+ + tasksToDelete.size() + " tasks with exceeding max per-name and will be deleted: "+
+ tasksToDelete.stream().map(Task::getId).collect(Collectors.joining(",")));
+ }
+
+ // now try deleting tasks which are overcapacity for each (non-entity) tag
+ for (Task<?> task: tasksToDelete) {
+ // delete this and update overcapacity info
+ executionManager.deleteTask(task);
+ }
+
+ return tasksToDelete.size();
}
protected int expireIfOverCapacityGlobally() {
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 6168942..8ab3ead 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -18,6 +18,11 @@
*/
package org.apache.brooklyn.core.mgmt.internal;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.brooklyn.core.entity.Dumper;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -146,29 +151,38 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
}});
}
+ static Set<String> SYSTEM_TASK_WORDS = ImmutableSet.of("initialize model", "entity init", "management start");
+
static Set<Task<?>> removeSystemTasks(Iterable<Task<?>> tasks) {
Set<Task<?>> result = MutableSet.of();
for (Task<?> t: tasks) {
if (t instanceof ScheduledTask) continue;
if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue;
- if (t.getDisplayName().contains("Validating")) continue;
+ if (SYSTEM_TASK_WORDS.stream().anyMatch(t.getDisplayName().toLowerCase()::contains)) continue;
result.add(t);
}
return result;
}
// Needed because of https://issues.apache.org/jira/browse/BROOKLYN-401
- protected void assertTaskMaxCountForEntityEventually(final Entity entity, final int expectedMaxCount) {
+ protected void assertNonSystemTaskCountForEntityEventuallyEquals(final Entity entity, final int expectedCount) {
+ assertNonSystemTaskCountForEntityEventuallyIsInRange(entity, expectedCount, expectedCount);
+ }
+
+ protected void assertNonSystemTaskCountForEntityEventuallyIsInRange(final Entity entity, final int expectedMinCount, final int expectedMaxCount) {
// Dead task (and initialization task) should have been GC'd on completion.
// However, the GC'ing happens in a listener, executed in a different thread - the task.get()
// doesn't block for it. Therefore can't always guarantee it will be GC'ed by now.
- Asserts.succeedsEventually(new Runnable() {
+ Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.seconds(3)), new Runnable() {
@Override public void run() {
forceGc();
Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) );
- Assert.assertTrue(tasks.size() <= expectedMaxCount,
- "Expected tasks count max of " + expectedMaxCount + ". Tasks were "+tasks);
+ Assert.assertTrue(tasks.size() >= expectedMinCount && tasks.size() <= expectedMaxCount,
+ "Expected tasks count [" + expectedMinCount+","+expectedMaxCount + "]. Tasks were:\n"+tasks.stream().map(t -> ""+t+": "+t.getTags()+"\n").collect(Collectors.joining()));
}});
+
+ Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) );
+ LOG.info("Expected tasks count [" + expectedMinCount+","+expectedMaxCount + "] satisfied; tasks were:\n"+tasks.stream().map(t -> ""+t+": "+t.getTags()+"\n").collect(Collectors.joining()));
}
public void testGetTasksAndGcBoringTags() throws Exception {
@@ -200,7 +214,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
stopCondition.set(true);
- assertTaskMaxCountForEntityEventually(e, 2);
+ assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2);
}
public void testGcTaskAtEntityLimit() throws Exception {
@@ -224,15 +238,24 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
forceGc();
stopCondition.set(true);
- assertTaskMaxCountForEntityEventually(app, 2);
- assertTaskMaxCountForEntityEventually(e, 2);
+ assertNonSystemTaskCountForEntityEventuallyIsInRange(app, 0, 2);
+ assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2);
+
+ for (int count=0; count<5; count++)
+ runEmptyTaskWithNameAndTags(e, "task-e-"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag");
+ for (int count=0; count<5; count++)
+ runEmptyTaskWithNameAndTags(app, "task-app-"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag");
+
+ forceGc();
+ assertNonSystemTaskCountForEntityEventuallyEquals(app, 2);
+ assertNonSystemTaskCountForEntityEventuallyEquals(e, 2);
}
public void testGcTaskWithTagAndEntityLimit() throws Exception {
TestEntity e = app.createAndManageChild(EntitySpec.create(TestEntity.class));
((BrooklynProperties)app.getManagementContext().getConfig()).put(
- BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 6);
+ BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 8);
((BrooklynProperties)app.getManagementContext().getConfig()).put(
BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2);
@@ -254,13 +277,17 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag", "another-tag-e");
runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag", "another-tag-e");
// should keep both the above
-
+
runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag");
runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag");
+ runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-e");
+ runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-e");
Time.sleep(Duration.ONE_MILLISECOND);
runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag");
- // should keep the below since they have unique tags, but remove one of the e tasks above
+
+ // should keep the below since they have unique tags, plus 4 to 6 of the above, depending which of boring-tags are kept, but might remove 1 of the above
runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag", "and-another-tag");
+
runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-app", "another-tag");
runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-app", "another-tag");
@@ -268,36 +295,54 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
forceGc();
stopCondition.set(true);
- assertTaskMaxCountForEntityEventually(e, 6);
- assertTaskMaxCountForEntityEventually(app, 3);
-
+ assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 4, 7);
+ assertNonSystemTaskCountForEntityEventuallyIsInRange(app, 2, 3);
+
// now with a lowered limit, we should remove one more e
((BrooklynProperties)app.getManagementContext().getConfig()).put(
BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 5);
- assertTaskMaxCountForEntityEventually(e, 5);
+ forceGc();
+ assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 4, 5);
}
public void testGcDynamicTaskAtNormalTagLimit() throws Exception {
+ TestEntity e = doTestGcDynamicTaskAtNormalTagLimit(false);
+ // can go to zero if just one tag, shared by the transient flooding tasks
+ assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2);
+ }
+
+ public void testGcDynamicTaskAtNormalTagLimitWithExtraTag() throws Exception {
+ TestEntity e = doTestGcDynamicTaskAtNormalTagLimit(true);
+ // should keep two of our task-N tasks if that has a unique tag
+ assertNonSystemTaskCountForEntityEventuallyEquals(e, 2);
+ }
+
+ public TestEntity doTestGcDynamicTaskAtNormalTagLimit(boolean addExtraTag) throws Exception {
TestEntity e = app.createAndManageChild(EntitySpec.create(TestEntity.class));
-
- ((BrooklynProperties)app.getManagementContext().getConfig()).put(
- BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2);
+
+ ((BrooklynProperties) app.getManagementContext().getConfig()).put(
+ BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2);
AtomicBoolean stopCondition = new AtomicBoolean();
scheduleRecursiveTemporaryTask(stopCondition, e, "foo");
scheduleRecursiveTemporaryTask(stopCondition, e, "foo");
- for (int count=0; count<5; count++) {
- TaskBuilder<Object> tb = Tasks.builder().displayName("task-"+count).dynamic(true).body(new Runnable() { @Override public void run() {}})
- .tag(ManagementContextInternal.NON_TRANSIENT_TASK_TAG).tag("foo");
- ((EntityInternal)e).getExecutionContext().submit(tb.build()).getUnchecked();
+ for (int count = 0; count < 5; count++) {
+ TaskBuilder<Object> tb = Tasks.builder().displayName("task-" + count).dynamic(true).body(new Runnable() {
+ @Override
+ public void run() {
+ }
+ })
+ .tag(ManagementContextInternal.NON_TRANSIENT_TASK_TAG).tag("foo");
+ if (addExtraTag) tb.tag("bar");
+ ((EntityInternal) e).getExecutionContext().submit(tb.build()).getUnchecked();
}
// Makes sure there's a GC while the transient tasks are running
forceGc();
stopCondition.set(true);
- assertTaskMaxCountForEntityEventually(e, 2);
+ return e;
}
public void testUnmanagedEntityCanBeGcedEvenIfPreviouslyTagged() throws Exception {
@@ -426,7 +471,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
int maxNumTasks = 2;
BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty();
brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.ONE_SECOND);
- brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2);
+ brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, maxNumTasks);
replaceManagementContext(LocalManagementContextForTests.newInstance(brooklynProperties));
setUpApp();
@@ -467,6 +512,102 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
}});
}
+ static class IncrementingCallable implements Callable<Integer> {
+ private final AtomicInteger next = new AtomicInteger(0);
+
+ @Override public Integer call() {
+ return next.getAndIncrement();
+ }
+ }
+
+ @Test(groups={"Integration"})
+ public void testEffectorTasksTwoEntitiesPreferByName() throws Exception {
+ int maxNumTasksPerName = 4;
+ int maxNumTasksPerTag = 5;
+ int maxNumTasksPerEntity = 15; // no more than 5 of each
+ BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty();
+ brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.seconds(3));
+ brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, maxNumTasksPerTag);
+ brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, maxNumTasksPerEntity);
+ brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_NAME, maxNumTasksPerName);
+
+ replaceManagementContext(LocalManagementContextForTests.newInstance(brooklynProperties));
+ setUpApp();
+ final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ final TestEntity entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ List<Task<?>> tasks = Lists.newArrayList();
+
+ Set<Task<?>> storedTasks1 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() );
+ String storedTasks1Str = storedTasks1.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n"));
+ LOG.info("TASKS BEFORE RUN:\n"+storedTasks1Str);
+
+ FunctionFeed feed = FunctionFeed.builder()
+ .entity(entity)
+ .poll(new FunctionPollConfig<Integer, Integer>(TestEntity.SEQUENCE)
+ .period(Duration.millis(20))
+ .callable(new IncrementingCallable())
+ //.onSuccess((Function<Object,Integer>)(Function)Functions.identity()))
+ )
+ .build();
+
+ for (int i = 0; i < (2*maxNumTasksPerEntity+1); i++) {
+ Task<?> task = entity.invoke(TestEntity.MY_EFFECTOR, ImmutableMap.<String,Object>of());
+ task.get();
+ tasks.add(task);
+ // see testEffectorTasksGcedForMaxPerTag
+ Thread.sleep(10);
+ }
+
+ for (int i = 0; i < (4*maxNumTasksPerEntity+1); i++) {
+ Task<?> task = entity.invoke(TestEntity.IDENTITY_EFFECTOR, ImmutableMap.<String,Object>of("arg", "id-"+i));
+ task.get();
+ tasks.add(task);
+ entity2.invoke(TestEntity.IDENTITY_EFFECTOR, ImmutableMap.<String,Object>of("arg", "id-"+i));
+ Thread.sleep(10);
+ }
+
+ // and add some context-only (the above have a target entity, so don't interfere with the below):
+
+ for (int i = 0; i < (2*maxNumTasksPerEntity+1); i++) {
+ entity.getExecutionContext().submit(
+ Tasks.fail("failure-flood-1", null)).blockUntilEnded();
+ }
+ // normally flood-2 will remove the flood 1
+ for (int i = 0; i < (2*maxNumTasksPerEntity+1); i++) {
+ entity.getExecutionContext().submit(
+ Tasks.fail("failure-flood-2", null)).blockUntilEnded();
+ }
+
+ Dumper.dumpInfo(app);
+
+ // Should initially have all tasks
+ feed.stop();
+
+
+ // oldest should be GC'ed to leave only maxNumTasks
+ Set<Task<?>> storedTasks2 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() );
+ String storedTasks2Str = storedTasks2.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n"));
+ LOG.info("TASKS AFTER RUN:\n"+storedTasks2Str);
+
+ ((LocalManagementContext)mgmt).getGarbageCollector().gcIteration();
+
+ Set<Task<?>> storedTasks3 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() );
+ String storedTasks3Str = storedTasks3.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n"));
+ LOG.info("TASKS AFTER GC:\n"+storedTasks3Str);
+
+ assertTrue(!storedTasks3.containsAll(storedTasks2), "some tasks should have been GC'd");
+ assertTrue(storedTasks3.size() <= maxNumTasksPerEntity*2 /* number of TestEntity instances */ *2 /* target and context */ + 10 /* grace for tasks on the app root node */, "too many tasks: "+storedTasks3.size());
+ // and should keep some in each category
+ Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity.getId()+":myEffector")).count(), n -> n==maxNumTasksPerName);
+ Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity.getId()+":identityEffector")).count(), n -> n==maxNumTasksPerName);
+ Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity2.getId()+":identityEffector")).count(), n -> n==maxNumTasksPerName);
+ Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("test.sequence")).count(), n -> n>0 && n<=maxNumTasksPerName + 3); // might be still running
+ Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("failure-flood-1")).count(), n -> n==maxNumTasksPerName);
+ Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("failure-flood-2")).count(), n -> n==maxNumTasksPerName);
+ }
+
+
private String taskToVerboseString(Task<?> t) {
return MoreObjects.toStringHelper(t)
.add("id", t.getId())