You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2021/10/25 12:28:02 UTC

[brooklyn-server] 11/15: improve task garbage collection, using task name also

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 84410a8275f60701347fdd3e5dcef44d72d93104
Author: Alex Heneveld <al...@cloudsoftcorp.com>
AuthorDate: Thu Oct 21 21:23:48 2021 +0100

    improve task garbage collection, using task name also
    
    Previously the GC looked only at task tags to determine which tasks to remember after completion;
    now it also looks at the task's _name_ and keeps at most 10 tasks (by default) per entity with a given name.
    This is useful for scheduled tasks, where otherwise we might keep lots if they don't have distinguishing tags.
---
 .../mgmt/internal/BrooklynGarbageCollector.java    | 121 ++++++++++---
 .../mgmt/internal/EntityExecutionManagerTest.java  | 189 ++++++++++++++++++---
 2 files changed, 265 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
index f780b5c..5c6f2b7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.mgmt.HasTaskChildren;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
@@ -133,7 +134,12 @@ public class BrooklynGarbageCollector {
         + "within an execution context (e.g. entity); "
         + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full",
         50);
-    
+
+    public static final ConfigKey<Integer> MAX_TASKS_PER_NAME = ConfigKeys.newIntegerConfigKey(
+            "brooklyn.gc.maxTasksPerName",
+            "the maximum number of tasks with the same name kept within an execution context (e.g. entity)",
+            10);
+
     public static final ConfigKey<Integer> MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey(
         "brooklyn.gc.maxTasksPerEntity", 
         "the maximum number of tasks to be kept for a given entity",
@@ -168,7 +174,7 @@ public class BrooklynGarbageCollector {
     
     private Duration gcPeriod;
     private volatile boolean running = true;
-    
+
     public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) {
         this.executionManager = executionManager;
         this.storage = storage;
@@ -330,8 +336,16 @@ public class BrooklynGarbageCollector {
     }
 
     /**
-     * Deletes old tasks. The age/number of tasks to keep is controlled by fields like 
+     * Deletes old tasks. The age/number of tasks to keep is controlled by fields including
      * {@link #MAX_TASKS_PER_TAG} and {@link #MAX_TASKS_PER_TAG}.
+     *
+     * This works by looking at the "entity" tag(s) [context and target], then at the "non-entity" tags (excluding some, such as sub-task);
+     * any completed task which has one or more tags in the category, and for which all such tags are over capacity, will get GC'd,
+     * oldest first (a task is spared if any one of its tags in the category is under capacity).
+     * So it will keep up to 1000 tasks for an entity, with a limit of up to 50 for each tag; e.g. attaching an
+     * 'entityId:effectorName' tag means we keep up to 50 instances of each effector call, provided we don't exceed the 1000 overall.
+     *
+     * (It might be nicer to score, based on age and name uniqueness and activity within an entity. But above works pretty well.)
      */
     @VisibleForTesting
     public synchronized int gcTasks() {
@@ -356,10 +370,13 @@ public class BrooklynGarbageCollector {
         expireAgedTasks();
         expireTransientTasks();
         
-        // now look at overcapacity tags, non-entity tags first
-        
+        // now look at over-capacity names, then over-capacity tags (non-entity tags first, then entity tags)
+
         Set<Object> taskTags = executionManager.getTaskTags();
-        
+
+        int deletedCount = 0;
+        deletedCount += expireOverCapacityNamesInCategory(taskTags, TagCategory.ENTITY);
+
         int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY);
         int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG);
         
@@ -392,10 +409,9 @@ public class BrooklynGarbageCollector {
             }
         }
         
-        int deletedCount = 0;
         deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false);
         deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true);
-        
+
         // if expensive we could optimize task GC here to avoid repeated lookups by
         // counting all expired above (not just prev two lines) and skipping if none
         // but that seems unlikely
@@ -416,14 +432,15 @@ public class BrooklynGarbageCollector {
 
     protected static boolean isTagIgnoredForGc(Object tag) {
         if (tag == null) return true;
+
         if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true;
         if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true;
         if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true;
         if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true;
-        if (tag instanceof WrappedStream) {
-            return true;
-        }
-        
+
+        if (tag instanceof ManagementContext) return true;
+        if (tag instanceof WrappedStream) return true;
+
         return false;
     }
     
@@ -550,7 +567,7 @@ public class BrooklynGarbageCollector {
     } 
 
 
-    /** expires tasks which are over-capacity in all their non-entity tag categories, returned count */
+    /** expires tasks which are over-capacity in all their non-entity or entity (target, context) tag categories, returned count */
     protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
         if (emptyFilterNeeded) {
             // previous run may have decremented counts  
@@ -629,25 +646,29 @@ public class BrooklynGarbageCollector {
             LOG.debug("brooklyn-gc detected " + taskTagsInCategoryOverCapacity.size() + " " + category + " "
                     + "tag(s) over capacity, expiring old tasks; "
                     + tasksToConsiderDeleting.size() + " tasks under consideration; categories are: "
-                    + taskTagsInCategoryOverCapacity + "; including " + tasksToConsiderDeleting);
+                    + taskTagsInCategoryOverCapacity + "; including " + tasksToLog);
         }
 
         // now try deleting tasks which are overcapacity for each (non-entity) tag
-        int deleted = 0;
+        Set<Task<?>> deleted = MutableSet.of();
         for (Task<?> task: tasksToConsiderDeleting) {
-            boolean delete = true;
+            boolean delete = false;
             for (Object tag: task.getTags()) {
-                if (!category.acceptsTag(tag))
+                if (!category.acceptsTag(tag)) {
+                    // ignore this tag, not right for the category
                     continue;
+                }
                 if (taskTagsInCategoryOverCapacity.get(tag)==null) {
                     // no longer over capacity in this tag
                     delete = false;
                     break;
                 }
+                // has at least one tag in the category, and all such tags are overcapacity
+                delete = true;
             }
             if (delete) {
                 // delete this and update overcapacity info
-                deleted++;
+                deleted.add(task);
                 executionManager.deleteTask(task);
                 for (Object tag: task.getTags()) {
                     AtomicInteger counter = taskAllTagsOverCapacity.get(tag);
@@ -662,9 +683,67 @@ public class BrooklynGarbageCollector {
         }
 
         if (LOG.isDebugEnabled())
-            LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; "
-                    + "capacities now: " + taskTagsInCategoryOverCapacity);
-        return deleted;
+            LOG.debug("brooklyn-gc deleted "+deleted.size()+" tasks in over-capacity " + category+" tag categories; "
+                    + "capacities now: " + taskTagsInCategoryOverCapacity+"; deleted tasks: "+
+                    deleted.stream().map(Task::getId).collect(Collectors.joining(",")));
+        return deleted.size();
+    }
+
+    protected int expireOverCapacityNamesInCategory(Set<Object> taskTags, TagCategory category) {
+        List<Object> entityTags = taskTags.stream().filter(tag -> category.acceptsTag(tag)).collect(Collectors.toList());
+        Integer maxPerName = brooklynProperties.getConfig(MAX_TASKS_PER_NAME);
+        if (maxPerName==null || maxPerName<=0) return 0;
+        Set<Task<?>> tasksToDelete = MutableSet.of();
+
+        try {
+            for (Object entityTag: entityTags) {
+                Set<Task<?>> tasks = executionManager.getTasksWithTag(entityTag);
+                Map<String,Set<Task<?>>> tasksByName = MutableMap.of();
+                for (Task<?> task: tasks) {
+                    if (!task.isDone(true)) continue;
+                    tasksByName.compute(task.getDisplayName(), (key,set) -> {
+                        if (set==null) set = MutableSet.of();
+                        set.add(task);
+                        return set;
+                    });
+                }
+
+                List<Entry<String,Set<Task<?>>>> overCapacityNames = tasksByName.entrySet().stream().filter(entry -> entry.getValue().size() > maxPerName).collect(Collectors.toList());
+                if (!overCapacityNames.isEmpty()) {
+                    LOG.debug("brooklyn-gc detected tasks exceeding max per-name for entity "+entityTag+"; collecting for deletion: " +
+                            overCapacityNames.stream().map(entry -> entry.getKey()+"("+entry.getValue().size()+")").collect(Collectors.joining(", ")));
+                }
+                overCapacityNames.forEach(entry -> {
+                    List<Task<?>> list = MutableList.copyOf(entry.getValue());
+                    Collections.sort(list, TASKS_NEWEST_FIRST_COMPARATOR);
+                    list.stream().skip(maxPerName).forEach(tasksToDelete::add);
+                });
+            }
+
+        } catch (ConcurrentModificationException e) {
+            // do CME's happen with these data structures?
+            // if so, let's just delete what we've found so far
+            LOG.debug("Got CME inspecting tasks by name to delete; ignoring: "+e);
+        }
+
+
+        if (tasksToDelete.isEmpty()) {
+            return 0;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("brooklyn-gc detected "
+                    + tasksToDelete.size() + " tasks with exceeding max per-name and will be deleted: "+
+                    tasksToDelete.stream().map(Task::getId).collect(Collectors.joining(",")));
+        }
+
+        // now delete the tasks identified above as exceeding the per-name limit
+        for (Task<?> task: tasksToDelete) {
+            // delete this task; no over-capacity counters to update, as per-name counts are recomputed on each run
+            executionManager.deleteTask(task);
+        }
+
+        return tasksToDelete.size();
     }
 
     protected int expireIfOverCapacityGlobally() {
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 6168942..8ab3ead 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -18,6 +18,11 @@
  */
 package org.apache.brooklyn.core.mgmt.internal;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.brooklyn.core.entity.Dumper;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -146,29 +151,38 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
             }});
     }
 
+    static Set<String> SYSTEM_TASK_WORDS = ImmutableSet.of("initialize model", "entity init", "management start");
+
     static Set<Task<?>> removeSystemTasks(Iterable<Task<?>> tasks) {
         Set<Task<?>> result = MutableSet.of();
         for (Task<?> t: tasks) {
             if (t instanceof ScheduledTask) continue;
             if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue;
-            if (t.getDisplayName().contains("Validating")) continue;
+            if (SYSTEM_TASK_WORDS.stream().anyMatch(t.getDisplayName().toLowerCase()::contains)) continue;
             result.add(t);
         }
         return result;
     }
 
     // Needed because of https://issues.apache.org/jira/browse/BROOKLYN-401
-    protected void assertTaskMaxCountForEntityEventually(final Entity entity, final int expectedMaxCount) {
+    protected void assertNonSystemTaskCountForEntityEventuallyEquals(final Entity entity, final int expectedCount) {
+        assertNonSystemTaskCountForEntityEventuallyIsInRange(entity, expectedCount, expectedCount);
+    }
+
+    protected void assertNonSystemTaskCountForEntityEventuallyIsInRange(final Entity entity, final int expectedMinCount, final int expectedMaxCount) {
         // Dead task (and initialization task) should have been GC'd on completion.
         // However, the GC'ing happens in a listener, executed in a different thread - the task.get()
         // doesn't block for it. Therefore can't always guarantee it will be GC'ed by now.
-        Asserts.succeedsEventually(new Runnable() {
+        Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.seconds(3)), new Runnable() {
             @Override public void run() {
                 forceGc();
                 Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) );
-                Assert.assertTrue(tasks.size() <= expectedMaxCount,
-                        "Expected tasks count max of " + expectedMaxCount + ". Tasks were "+tasks);
+                Assert.assertTrue(tasks.size() >= expectedMinCount && tasks.size() <= expectedMaxCount,
+                        "Expected tasks count [" + expectedMinCount+","+expectedMaxCount + "]. Tasks were:\n"+tasks.stream().map(t -> ""+t+": "+t.getTags()+"\n").collect(Collectors.joining()));
             }});
+
+        Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) );
+        LOG.info("Expected tasks count [" + expectedMinCount+","+expectedMaxCount + "] satisfied; tasks were:\n"+tasks.stream().map(t -> ""+t+": "+t.getTags()+"\n").collect(Collectors.joining()));
     }
 
     public void testGetTasksAndGcBoringTags() throws Exception {
@@ -200,7 +214,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
 
         stopCondition.set(true);
 
-        assertTaskMaxCountForEntityEventually(e, 2);
+        assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2);
     }
 
     public void testGcTaskAtEntityLimit() throws Exception {
@@ -224,15 +238,24 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         forceGc();
         stopCondition.set(true);
 
-        assertTaskMaxCountForEntityEventually(app, 2);
-        assertTaskMaxCountForEntityEventually(e, 2);
+        assertNonSystemTaskCountForEntityEventuallyIsInRange(app, 0, 2);
+        assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2);
+
+        for (int count=0; count<5; count++)
+            runEmptyTaskWithNameAndTags(e, "task-e-"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag");
+        for (int count=0; count<5; count++)
+            runEmptyTaskWithNameAndTags(app, "task-app-"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag");
+
+        forceGc();
+        assertNonSystemTaskCountForEntityEventuallyEquals(app, 2);
+        assertNonSystemTaskCountForEntityEventuallyEquals(e, 2);
     }
 
     public void testGcTaskWithTagAndEntityLimit() throws Exception {
         TestEntity e = app.createAndManageChild(EntitySpec.create(TestEntity.class));
         
         ((BrooklynProperties)app.getManagementContext().getConfig()).put(
-            BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 6);
+            BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 8);
         ((BrooklynProperties)app.getManagementContext().getConfig()).put(
             BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2);
 
@@ -254,13 +277,17 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag", "another-tag-e");
         runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag", "another-tag-e");
         // should keep both the above
-        
+
         runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag");
         runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag");
+        runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-e");
+        runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-e");
         Time.sleep(Duration.ONE_MILLISECOND);
         runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag");
-        // should keep the below since they have unique tags, but remove one of the e tasks above 
+
+        // should keep the below since they have unique tags, plus 4 to 6 of the above, depending which of boring-tags are kept, but might remove 1 of the above
         runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag", "and-another-tag");
+
         runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-app", "another-tag");
         runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-app", "another-tag");
         
@@ -268,36 +295,54 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         forceGc();
         stopCondition.set(true);
 
-        assertTaskMaxCountForEntityEventually(e, 6);
-        assertTaskMaxCountForEntityEventually(app, 3);
-        
+        assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 4, 7);
+        assertNonSystemTaskCountForEntityEventuallyIsInRange(app, 2, 3);
+
         // now with a lowered limit, we should remove one more e
         ((BrooklynProperties)app.getManagementContext().getConfig()).put(
             BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 5);
-        assertTaskMaxCountForEntityEventually(e, 5);
+        forceGc();
+        assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 4, 5);
     }
 
     public void testGcDynamicTaskAtNormalTagLimit() throws Exception {
+        TestEntity e = doTestGcDynamicTaskAtNormalTagLimit(false);
+        // can go to zero if just one tag, shared by the transient flooding tasks
+        assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2);
+    }
+
+    public void testGcDynamicTaskAtNormalTagLimitWithExtraTag() throws Exception {
+        TestEntity e = doTestGcDynamicTaskAtNormalTagLimit(true);
+        // should keep two of our task-N tasks if that has a unique tag
+        assertNonSystemTaskCountForEntityEventuallyEquals(e, 2);
+    }
+
+    public TestEntity doTestGcDynamicTaskAtNormalTagLimit(boolean addExtraTag) throws Exception {
         TestEntity e = app.createAndManageChild(EntitySpec.create(TestEntity.class));
-        
-        ((BrooklynProperties)app.getManagementContext().getConfig()).put(
-            BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2);
+
+        ((BrooklynProperties) app.getManagementContext().getConfig()).put(
+                BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2);
 
         AtomicBoolean stopCondition = new AtomicBoolean();
         scheduleRecursiveTemporaryTask(stopCondition, e, "foo");
         scheduleRecursiveTemporaryTask(stopCondition, e, "foo");
 
-        for (int count=0; count<5; count++) {
-            TaskBuilder<Object> tb = Tasks.builder().displayName("task-"+count).dynamic(true).body(new Runnable() { @Override public void run() {}})
-                .tag(ManagementContextInternal.NON_TRANSIENT_TASK_TAG).tag("foo");
-            ((EntityInternal)e).getExecutionContext().submit(tb.build()).getUnchecked();
+        for (int count = 0; count < 5; count++) {
+            TaskBuilder<Object> tb = Tasks.builder().displayName("task-" + count).dynamic(true).body(new Runnable() {
+                        @Override
+                        public void run() {
+                        }
+                    })
+                    .tag(ManagementContextInternal.NON_TRANSIENT_TASK_TAG).tag("foo");
+            if (addExtraTag) tb.tag("bar");
+            ((EntityInternal) e).getExecutionContext().submit(tb.build()).getUnchecked();
         }
 
         // Makes sure there's a GC while the transient tasks are running
         forceGc();
         stopCondition.set(true);
 
-        assertTaskMaxCountForEntityEventually(e, 2);
+        return e;
     }
 
     public void testUnmanagedEntityCanBeGcedEvenIfPreviouslyTagged() throws Exception {
@@ -426,7 +471,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         int maxNumTasks = 2;
         BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty();
         brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.ONE_SECOND);
-        brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2);
+        brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, maxNumTasks);
 
         replaceManagementContext(LocalManagementContextForTests.newInstance(brooklynProperties));
         setUpApp();
@@ -467,6 +512,102 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
             }});
     }
 
+    static class IncrementingCallable implements Callable<Integer> {
+        private final AtomicInteger next = new AtomicInteger(0);
+
+        @Override public Integer call() {
+            return next.getAndIncrement();
+        }
+    }
+
+    @Test(groups={"Integration"})
+    public void testEffectorTasksTwoEntitiesPreferByName() throws Exception {
+        int maxNumTasksPerName = 4;
+        int maxNumTasksPerTag = 5;
+        int maxNumTasksPerEntity = 15;  // no more than 5 of each
+        BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty();
+        brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.seconds(3));
+        brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, maxNumTasksPerTag);
+        brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, maxNumTasksPerEntity);
+        brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_NAME, maxNumTasksPerName);
+
+        replaceManagementContext(LocalManagementContextForTests.newInstance(brooklynProperties));
+        setUpApp();
+        final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        final TestEntity entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+        List<Task<?>> tasks = Lists.newArrayList();
+
+        Set<Task<?>> storedTasks1 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() );
+        String storedTasks1Str = storedTasks1.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n"));
+        LOG.info("TASKS BEFORE RUN:\n"+storedTasks1Str);
+
+        FunctionFeed feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(TestEntity.SEQUENCE)
+                                .period(Duration.millis(20))
+                                .callable(new IncrementingCallable())
+                        //.onSuccess((Function<Object,Integer>)(Function)Functions.identity()))
+                )
+                .build();
+
+        for (int i = 0; i < (2*maxNumTasksPerEntity+1); i++) {
+            Task<?> task = entity.invoke(TestEntity.MY_EFFECTOR, ImmutableMap.<String,Object>of());
+            task.get();
+            tasks.add(task);
+            // see testEffectorTasksGcedForMaxPerTag
+            Thread.sleep(10);
+        }
+
+        for (int i = 0; i < (4*maxNumTasksPerEntity+1); i++) {
+            Task<?> task = entity.invoke(TestEntity.IDENTITY_EFFECTOR, ImmutableMap.<String,Object>of("arg", "id-"+i));
+            task.get();
+            tasks.add(task);
+            entity2.invoke(TestEntity.IDENTITY_EFFECTOR, ImmutableMap.<String,Object>of("arg", "id-"+i));
+            Thread.sleep(10);
+        }
+
+        // and add some context-only (the above have a target entity, so don't interfere with the below):
+
+        for (int i = 0; i < (2*maxNumTasksPerEntity+1); i++) {
+            entity.getExecutionContext().submit(
+                    Tasks.fail("failure-flood-1", null)).blockUntilEnded();
+        }
+        // normally flood-2 will remove the flood 1
+        for (int i = 0; i < (2*maxNumTasksPerEntity+1); i++) {
+            entity.getExecutionContext().submit(
+                    Tasks.fail("failure-flood-2", null)).blockUntilEnded();
+        }
+
+        Dumper.dumpInfo(app);
+
+        // Should initially have all tasks
+        feed.stop();
+
+
+        // oldest should be GC'ed to leave only maxNumTasks
+        Set<Task<?>> storedTasks2 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() );
+        String storedTasks2Str = storedTasks2.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n"));
+        LOG.info("TASKS AFTER RUN:\n"+storedTasks2Str);
+
+        ((LocalManagementContext)mgmt).getGarbageCollector().gcIteration();
+
+        Set<Task<?>> storedTasks3 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() );
+        String storedTasks3Str = storedTasks3.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n"));
+        LOG.info("TASKS AFTER GC:\n"+storedTasks3Str);
+
+        assertTrue(!storedTasks3.containsAll(storedTasks2), "some tasks should have been GC'd");
+        assertTrue(storedTasks3.size() <= maxNumTasksPerEntity*2 /* number of TestEntity instances */ *2 /* target and context */ + 10 /* grace for tasks on the app root node */, "too many tasks: "+storedTasks3.size());
+        // and should keep some in each category
+        Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity.getId()+":myEffector")).count(), n -> n==maxNumTasksPerName);
+        Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity.getId()+":identityEffector")).count(), n -> n==maxNumTasksPerName);
+        Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity2.getId()+":identityEffector")).count(), n -> n==maxNumTasksPerName);
+        Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("test.sequence")).count(), n -> n>0 && n<=maxNumTasksPerName + 3);  // might be still running
+        Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("failure-flood-1")).count(), n -> n==maxNumTasksPerName);
+        Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("failure-flood-2")).count(), n -> n==maxNumTasksPerName);
+    }
+
+
     private String taskToVerboseString(Task<?> t) {
         return MoreObjects.toStringHelper(t)
                 .add("id", t.getId())