You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/16 15:53:03 UTC

[brooklyn-server] branch master updated (3c48af09f3 -> 86e75d683e)

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git


    from 3c48af09f3 give public constructors for Ssh*{Effector,Sensor} so they can be used as beans
     new cf5937fe30 prevent GC from looping waiting for non-deleteable tasks, and more efficient
     new 19fac6b1eb run container tasks in background and as transient
     new fbd9cb88e7 move AppGroupTraverser to core project for more access
     new 86e75d683e allow common feeds to be triggered from sensors

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camp/brooklyn/spi/dsl/AppGroupTraverser.java   | 109 +----------
 .../brooklyn/spi/dsl/methods/DslComponent.java     |   2 +-
 .../brooklyn/spi/dsl/AppGroupTraverserTest.java    |   1 +
 .../apache/brooklyn/core/feed/AbstractFeed.java    |   7 +-
 .../brooklyn/core/feed/AttributePollHandler.java   |   2 +-
 .../org/apache/brooklyn/core/feed/PollConfig.java  |  19 +-
 .../java/org/apache/brooklyn/core/feed/Poller.java | 118 +++++++++---
 .../core/mgmt/internal}/AppGroupTraverser.java     |   4 +-
 .../mgmt/internal/BrooklynGarbageCollector.java    |  41 +++-
 .../core/sensor/AbstractAddSensorFeed.java         |  17 ++
 .../core/sensor/AbstractAddTriggerableSensor.java  | 213 +++++++++++++++++++++
 .../core/sensor/http/HttpRequestSensor.java        |  11 +-
 .../brooklyn/core/sensor/ssh/SshCommandSensor.java |  16 +-
 .../brooklyn/entity/group/DynamicClusterImpl.java  |   1 +
 .../entity/group/DynamicMultiGroupImpl.java        |   1 +
 .../apache/brooklyn/feed/AbstractCommandFeed.java  |  16 +-
 .../brooklyn/feed/function/FunctionFeed.java       |  18 +-
 .../brooklyn/feed/function/FunctionPollConfig.java |  10 +-
 .../org/apache/brooklyn/feed/http/HttpFeed.java    |   7 +-
 .../util/core/task/BasicExecutionManager.java      |  90 ++++++---
 .../org/apache/brooklyn/util/core/task/Tasks.java  |  15 +-
 .../entity/software/base/SoftwareProcessImpl.java  |   1 +
 .../brooklyn/tasks/kubectl/ContainerCommons.java   |   4 +
 .../brooklyn/tasks/kubectl/ContainerSensor.java    |  62 +++---
 .../tasks/kubectl/ContainerTaskFactory.java        | 105 +++++-----
 .../tasks/kubectl/ContainerSensorTest.java         |  33 ++++
 .../core/sensor/windows/WinRmCommandSensor.java    |  14 +-
 27 files changed, 643 insertions(+), 294 deletions(-)
 copy {camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl => core/src/main/java/org/apache/brooklyn/core/mgmt/internal}/AppGroupTraverser.java (96%)
 create mode 100644 core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java


[brooklyn-server] 01/04: prevent GC from looping waiting for non-deleteable tasks, and more efficient

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit cf5937fe3031ecd0a4d877fc31ea8b94aa3f4f3c
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Aug 16 11:50:51 2022 +0100

    prevent GC from looping waiting for non-deleteable tasks, and more efficient
    
    by looking up tasks that won't be deleteable anyway
---
 .../mgmt/internal/BrooklynGarbageCollector.java    | 41 ++++++++--
 .../util/core/task/BasicExecutionManager.java      | 90 ++++++++++++++--------
 .../org/apache/brooklyn/util/core/task/Tasks.java  | 15 +++-
 3 files changed, 107 insertions(+), 39 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
index f66823b174..5c3b60dc79 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
@@ -308,6 +308,10 @@ public class BrooklynGarbageCollector {
         if (!task.isDone(true)) {
             return false;
         }
+        if (Tasks.isChildOfSubmitter(task, executionManager::getTask)) {
+            // we don't delete children until the parent is deleted; short-circuit the consideration of this
+            return false;
+        }
         
         Set<Object> tags = BrooklynTaskTags.getTagsFast(task);
         if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG))
@@ -418,16 +422,35 @@ public class BrooklynGarbageCollector {
         int deletedHere = 0;
         while ((deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()) > 0) {
             // delete in loop so we don't have descendants sticking around until deleted in later cycles
-            deletedCount += deletedHere; 
+            deletedCount += deletedHere;
+            if (LOG.isTraceEnabled()) LOG.trace("GC history loop deleted "+deletedHere+" this time, count now "+deletedCount);
         }
-        
+
         deletedHere = expireIfOverCapacityGlobally();
+        if (LOG.isTraceEnabled()) LOG.trace("GC history capacity deleted "+deletedHere);
         deletedCount += deletedHere;
         while (deletedHere > 0) {
-            deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()); 
+            deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion());
+            if (LOG.isTraceEnabled()) LOG.trace("GC history post=-capacity-deletion loop deleted "+deletedHere+" this time, count now "+deletedCount);
         }
-        
+
         return deletedCount;
+
+        // or not to run in a loop
+//        int deletedHere = expireHistoricTasksNowReadyForImmediateDeletion();
+//        if (LOG.isTraceEnabled()) LOG.trace("GC history loop deleted "+deletedHere+" this time, deletion count now "+deletedCount);
+//        deletedCount += deletedHere;
+//
+//        deletedHere = expireIfOverCapacityGlobally();
+//        if (LOG.isTraceEnabled()) LOG.trace("GC history capacity deleted "+deletedHere);
+//        deletedCount += deletedHere;
+//        if (deletedHere > 0) {
+//            deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion());
+//            if (LOG.isTraceEnabled()) LOG.trace("GC history post-capacity deleted "+deletedHere+" this time, count now "+deletedCount);
+//        }
+//
+//        return deletedCount;
+
     }
 
     protected static boolean isTagIgnoredForGc(Object tag) {
@@ -506,6 +529,7 @@ public class BrooklynGarbageCollector {
         Collection<Task<?>> allTasks = executionManager.getAllTasks();
         Collection<Task<?>> tasksToDelete = MutableList.of();
         try {
+            if (LOG.isTraceEnabled()) LOG.trace("GC history scanning "+allTasks.size()+" tasks");
             for (Task<?> task: allTasks) {
                 if (!shouldDeleteTaskImmediately(task)) {
                     // 2017-09 previously we only checked done and submitter expired, and deleted if both were true
@@ -522,11 +546,14 @@ public class BrooklynGarbageCollector {
             // delete what we've found so far
             LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e);
         }
-        
+
+        if (LOG.isTraceEnabled()) LOG.trace("GC history scanned "+allTasks.size()+" tasks, found "+tasksToDelete.size()+" for deletion, now deleting");
+        int deletionCount = 0;
         for (Task<?> task: tasksToDelete) {
-            executionManager.deleteTask(task);
+            if (executionManager.deleteTask(task)) deletionCount++;
         }
-        return tasksToDelete.size();
+        if (LOG.isTraceEnabled()) LOG.trace("GC history scanned "+allTasks.size()+" tasks, "+tasksToDelete.size()+" proposed for deletion, actually deleted "+deletionCount);
+        return deletionCount;
     }
     
     private boolean isAssociatedToActiveEntity(Task<?> task) {
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 504c7f143e..005ba07336 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -418,14 +418,15 @@ public class BasicExecutionManager implements ExecutionManager {
         return false;
     }
 
-    public void deleteTask(Task<?> task) {
-        deleteTask(task, true);
+    public boolean deleteTask(Task<?> task) {
+        return deleteTask(task, true);
     }
     /** removes all exec manager records of a task, except, if second argument is true (usually is) keep the pointer to ID
-     * if its submitter is a parent with an active record to this child */
-    public void deleteTask(Task<?> task, boolean keepByIdIfParentPresentById) {
+     * if its submitter is a parent with an active record to this child.
+     * returns true if completely deleted (false if not deleted, or deleted in byTags map but kept due to parent ID) */
+    public boolean deleteTask(Task<?> task, boolean keepByIdIfParentPresentById) {
         Boolean removed = deleteTaskNonRecursive(task, keepByIdIfParentPresentById);
-        if (Boolean.FALSE.equals(removed)) return;
+        if (!Boolean.TRUE.equals(removed)) return false;
 
         if (task instanceof HasTaskChildren) {
             List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren) task).getChildren());
@@ -433,6 +434,7 @@ public class BasicExecutionManager implements ExecutionManager {
                 deleteTask(child, keepByIdIfParentPresentById);
             }
         }
+        return true;
     }
 
     protected Boolean deleteTaskNonRecursive(Task<?> task) {
@@ -446,57 +448,56 @@ public class BasicExecutionManager implements ExecutionManager {
          in this case it will of course get deleted when its parent/ancestor is deleted. */
     protected Boolean deleteTaskNonRecursive(Task<?> task, boolean keepByIdIfParentPresentById) {
         Set<?> tags = TaskTags.getTagsFast(checkNotNull(task, "task"));
+        int removedByTagCount = 0;
         for (Object tag : tags) {
             synchronized (tasksByTag) {
                 Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag);
                 if (tasks != null) {
-                    tasks.remove(task);
-                    if (tasks.isEmpty()) {
-                        tasksByTag.remove(tag);
+                    if (tasks.remove(task)) {
+                        removedByTagCount++;
+                        if (tasks.isEmpty()) {
+                            tasksByTag.remove(tag);
+                        }
                     }
                 }
             }
         }
+        int removedByTagMissingCount = tags.size() - removedByTagCount;
 
-        boolean removeById = true;
+        boolean removeById;
         if (keepByIdIfParentPresentById) {
-            String submittedById = task.getSubmittedByTaskId();
-            if (submittedById!=null) {
-                Task<?> submittedBy = tasksById.get(submittedById);
-                if (submittedBy != null && submittedBy instanceof HasTaskChildren) {
-                    if (Iterables.contains(((HasTaskChildren) submittedBy).getChildren(), task)) {
-                        removeById = false;
-                    }
-                }
-            }
+            removeById = !Tasks.isChildOfSubmitter(task, tasksById::get);
+        } else {
+            removeById = true;
         }
+
         Boolean result;
-        Task<?> removed;
+        Task<?> removedById;
         if (removeById) {
-            removed = tasksById.remove(task.getId());
-            result = removed != null;
+            removedById = tasksById.remove(task.getId());
+            result = removedById != null;
         } else {
-            removed = null;
+            removedById = null;
             result = null;
         }
 
-        incompleteTaskIds.remove(task.getId());
-        if (removed != null && removed.isSubmitted() && !removed.isDone(true)) {
-            Entity context = BrooklynTaskTags.getContextEntity(removed);
+        boolean removedIncompleteById = incompleteTaskIds.remove(task.getId());
+        if (removedById != null && removedById.isSubmitted() && !removedById.isDone(true)) {
+            Entity context = BrooklynTaskTags.getContextEntity(removedById);
             if (context != null && !Entities.isManaged(context)) {
-                log.debug("Deleting active task on unmanagement of " + context + ": " + removed);
+                log.debug("Deleting active task on unmanagement of " + context + ": " + removedById);
             } else {
-                boolean debugOnly = removed.isDone();
+                boolean debugOnly = removedById.isDone();
 
                 if (debugOnly) {
-                    log.debug("Deleting cancelled task before completion: " + removed + "; this task will continue to run in the background outwith " + this);
+                    log.debug("Deleting cancelled task before completion: " + removedById + "; this task will continue to run in the background outwith " + this);
                 } else {
-                    log.warn("Deleting submitted task before completion: " + removed + " (tags " + removed.getTags() + "); this task will continue to run in the background outwith " + this + ", but perhaps it should have been cancelled?");
+                    log.warn("Deleting submitted task before completion: " + removedById + " (tags " + removedById.getTags() + "); this task will continue to run in the background outwith " + this + ", but perhaps it should have been cancelled?");
                     log.debug("Active task deletion trace", new Throwable("Active task deletion trace"));
                 }
             }
         }
-        if (removed != null) {
+        if (removedById != null) {
             task.getTags().forEach(t -> {
                 // remove tags which might have references to entities etc (help out garbage collector)
                 if (t instanceof TaskInternal) {
@@ -505,6 +506,32 @@ public class BasicExecutionManager implements ExecutionManager {
                 }
             });
         }
+
+        // if tag deletion is problematic we can use this logic to investigate (but currently there is no reason to think it is, i was just checking)
+//        if ((!removeById || Boolean.TRUE.equals(result)) && removedByTagMissingCount==0) {
+//            // cleanly deleted, or not deleted if so requested
+//            return true;
+//        }
+//        if ((removeById && !Boolean.TRUE.equals(result)) && removedByTagCount==0) {
+//            // not deleted at all
+//            return false;
+//        }
+//        // incomplete deletion detected
+//        log.warn("Incomplete deletion for "+task+"; removeById="+removeById+", removedById="+removedById+", removedIncompleteById="+removedIncompleteById+", removedByTagCount="+removedByTagCount+", removedByTagMissingCount="+removedByTagMissingCount);
+//        // if tag deletion is not working, investigate each task (don't rely on hashes)
+//        synchronized (tasksByTag) {
+//            Set<Object> tagsToDelete = MutableSet.of();
+//            tasksByTag.forEach( (tag, tasks) -> {
+//                if (tasks.removeIf(task::equals)) {
+//                    log.warn("Special deletion needed for tag "+tag+" on task "+task);
+//                    if (tasks.isEmpty()) {
+//                        tagsToDelete.add(tag);
+//                    }
+//                }
+//            });
+//            tagsToDelete.forEach(t -> tasksByTag.remove(t));
+//        }
+
         return result;
     }
 
@@ -557,7 +584,8 @@ public class BasicExecutionManager implements ExecutionManager {
      * exposes live view, for internal use only
      */
     @Beta
-    public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) {
+    public Set<Task<?>>
+    tasksWithTagLiveOrNull(Object tag) {
         synchronized (tasksByTag) {
             return tasksByTag.get(tag);
         }
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
index 74fd919a05..ac4b8c60c6 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
@@ -557,7 +557,20 @@ public class Tasks {
             Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD);
         }
     }
-    
+
+    public static boolean isChildOfSubmitter(Task<?> task, Function<String,Task> parentLookupFunction) {
+        String submittedById = task.getSubmittedByTaskId();
+        if (submittedById!=null) {
+            Task<?> submittedBy = parentLookupFunction.apply(submittedById);
+            if (submittedBy != null && submittedBy instanceof HasTaskChildren) {
+                if (Iterables.contains(((HasTaskChildren) submittedBy).getChildren(), task)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     /** returns true if either the current thread or the current task is interrupted/cancelled */
     public static boolean isInterrupted() {
         if (Thread.currentThread().isInterrupted()) return true;


[brooklyn-server] 04/04: allow common feeds to be triggered from sensors

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 86e75d683ec25c505ce63ae270b02f5693d93e59
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Aug 16 12:05:04 2022 +0100

    allow common feeds to be triggered from sensors
    
    also improve names/display/unique-tag of some common sensors
---
 .../apache/brooklyn/core/feed/AbstractFeed.java    |   7 +-
 .../brooklyn/core/feed/AttributePollHandler.java   |   2 +-
 .../org/apache/brooklyn/core/feed/PollConfig.java  |  19 +-
 .../java/org/apache/brooklyn/core/feed/Poller.java | 118 +++++++++---
 .../core/sensor/AbstractAddSensorFeed.java         |  17 ++
 .../core/sensor/AbstractAddTriggerableSensor.java  | 213 +++++++++++++++++++++
 .../core/sensor/http/HttpRequestSensor.java        |  11 +-
 .../brooklyn/core/sensor/ssh/SshCommandSensor.java |  16 +-
 .../brooklyn/entity/group/DynamicClusterImpl.java  |   1 +
 .../entity/group/DynamicMultiGroupImpl.java        |   1 +
 .../apache/brooklyn/feed/AbstractCommandFeed.java  |  16 +-
 .../brooklyn/feed/function/FunctionFeed.java       |  18 +-
 .../brooklyn/feed/function/FunctionPollConfig.java |  10 +-
 .../org/apache/brooklyn/feed/http/HttpFeed.java    |   7 +-
 .../entity/software/base/SoftwareProcessImpl.java  |   1 +
 .../brooklyn/tasks/kubectl/ContainerCommons.java   |   4 +
 .../brooklyn/tasks/kubectl/ContainerSensor.java    |  62 +++---
 .../tasks/kubectl/ContainerTaskFactory.java        |   9 +-
 .../tasks/kubectl/ContainerSensorTest.java         |  33 ++++
 .../core/sensor/windows/WinRmCommandSensor.java    |  14 +-
 20 files changed, 471 insertions(+), 108 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
index 70ef397ac1..7a50fff4db 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
@@ -201,7 +201,6 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed
     
     @Override
     protected void onChanged() {
-        // TODO Auto-generated method stub
     }
 
     /**
@@ -209,7 +208,7 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed
      */
     protected void preStart() {
     }
-    
+
     /**
      * For overriding.
      */
@@ -233,6 +232,10 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed
         highlightTriggers("Running every "+minPeriod);
     }
 
+    public void highlightTriggers(String message) {
+        super.highlightTriggers(message);
+    }
+
     void onRemoveSensor(Sensor<?> sensor) {
         highlightActionPublishSensor("Clear sensor "+sensor.getName());
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
index c59916984d..84e4d852ba 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
@@ -233,7 +233,7 @@ public class AttributePollHandler<V> implements PollHandler<V> {
     
     @Override
     public String getDescription() {
-        return sensor.getName()+" @ "+entity.getId()+" <- "+config;
+        return sensor.getName() + " " /*+entity.getId()*/ +" <- " + config;
     }
     
     protected String getBriefDescription() {
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
index d9990538e5..7c8144811d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.util.time.Duration;
 public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> {
 
     private long period = -1;
+    private Object otherTriggers;
     private String description;
 
     public PollConfig(AttributeSensor<T> sensor) {
@@ -43,6 +44,8 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<
     public PollConfig(PollConfig<V,T,F> other) {
         super(other);
         this.period = other.period;
+        this.otherTriggers = other.otherTriggers;
+        this.description = other.description;
     }
 
     public long getPeriod() {
@@ -69,13 +72,25 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<
         this.description = description;
         return self();
     }
-    
+
+    public F otherTriggers(Object otherTriggers) {
+        this.otherTriggers = otherTriggers;
+        return self();
+    }
+
+    public Object getOtherTriggers() {
+        return otherTriggers;
+    }
+
     public String getDescription() {
         return description;
     }
     
     @Override protected MutableList<Object> toStringOtherFields() {
-        return super.toStringOtherFields().appendIfNotNull(description);
+        MutableList<Object> result = super.toStringOtherFields().appendIfNotNull(description);
+        if (period>0 && period <= Duration.PRACTICALLY_FOREVER.toMilliseconds()) result.append("period: "+Duration.of(period));
+        if (otherTriggers!=null) result.append("triggers: "+otherTriggers);
+        return result;
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 6049ecf0e5..997c34fcf3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -23,14 +23,20 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 
 import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
 import org.apache.brooklyn.util.core.task.ScheduledTask;
 import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,11 +67,20 @@ public class Poller<V> {
         final PollHandler<? super V> handler;
         final Duration pollPeriod;
         final Runnable wrappedJob;
+        final Entity sensorSource;
+        final Sensor<?> sensor;
+        SubscriptionHandle subscription;
         private boolean loggedPreviousException = false;
-        
+
         PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) {
+            this(job, handler, period, null, null);
+        }
+
+        PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period, Entity sensorSource, Sensor<?> sensor) {
             this.handler = handler;
             this.pollPeriod = period;
+            this.sensorSource = sensorSource;
+            this.sensor = sensor;
             
             wrappedJob = new Runnable() {
                 @Override
@@ -122,11 +137,12 @@ public class Poller<V> {
         pollJobs.add(foo);
     }
 
+    public void subscribe(Callable<V> job, PollHandler<? super V> handler, Entity sensorSource, Sensor<?> sensor) {
+        pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor));
+    }
+
     @SuppressWarnings({ "unchecked" })
     public void start() {
-        // TODO Previous incarnation of this logged this logged polledSensors.keySet(), but we don't know that anymore
-        // Is that ok, are can we do better?
-        
         if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this});
         if (started) { 
             throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", 
@@ -141,26 +157,32 @@ public class Poller<V> {
         }
         
         Duration minPeriod = null;
+        Set<String> sensors = MutableSet.of();
         for (final PollJob<V> pollJob : pollJobs) {
-            final String scheduleName = pollJob.handler.getDescription();
-            if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
-                ScheduledTask t = ScheduledTask.builder(() -> {
-                            DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), 
-                                new Callable<Void>() { @Override public Void call() {
-                                    if (!Entities.isManagedActive(entity)) {
-                                        return null;
-                                    }
-                                    if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
-                                        return null;
-                                    }
-                                    pollJob.wrappedJob.run();
-                                    return null; 
-                                } } );
-                            // explicitly make non-transient -- we want to see its execution, even if parent is transient
-                            BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
-                            return task;
-                        })
-                        .displayName("scheduled:" + scheduleName)
+            final String scheduleName = (feed!=null ? feed.getDisplayName()+", " : "") +pollJob.handler.getDescription();
+            boolean added = false;
+
+            Callable<Task<?>> tf = () -> {
+                DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity),
+                        new Callable<Void>() { @Override public Void call() {
+                            if (!Entities.isManagedActive(entity)) {
+                                return null;
+                            }
+                            if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
+                                return null;
+                            }
+                            pollJob.wrappedJob.run();
+                            return null;
+                        } } );
+                // explicitly make non-transient -- we want to see its execution, even if parent is transient
+                BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
+                return task;
+            };
+
+            if (pollJob.pollPeriod!=null && pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
+                added =true;
+                ScheduledTask t = ScheduledTask.builder(tf)
+                        .displayName("Periodic: " + scheduleName)
                         .period(pollJob.pollPeriod)
                         .cancelOnException(false)
                         .tag(feed!=null ? BrooklynTaskTags.tagForContextAdjunct(feed) : null)
@@ -169,13 +191,43 @@ public class Poller<V> {
                 if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) {
                     minPeriod = pollJob.pollPeriod;
                 }
-            } else {
-                if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
+            }
+
+            if (pollJob.sensor!=null) {
+                added = true;
+                if (pollJob.subscription!=null) {
+                    throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already has subscription %s",
+                            this, entity, pollJob.subscription));
+                }
+                sensors.add(pollJob.sensor.getName());
+                pollJob.subscription = feed.subscriptions().subscribe(pollJob.sensorSource!=null ? pollJob.sensorSource : feed.getEntity(), pollJob.sensor, event -> {
+                    // submit this on every event
+                    try {
+                        feed.getExecutionContext().submit(tf.call());
+                    } catch (Exception e) {
+                        throw Exceptions.propagate(e);
+                    }
+                });
+            }
+
+            if (!added) {
+                if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {} and no subscriptions) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
             }
         }
         
-        if (minPeriod!=null && feed!=null) {
-            feed.highlightTriggerPeriod(minPeriod);
+        if (feed!=null) {
+            if (sensors.isEmpty()) {
+                if (minPeriod==null) {
+                    feed.highlightTriggers("Not configured with a period or triggers");
+                } else {
+                    feed.highlightTriggerPeriod(minPeriod);
+                }
+            } else if (minPeriod==null) {
+                feed.highlightTriggers("Triggered by: "+sensors);
+            } else {
+                // both
+                feed.highlightTriggers("Running every "+minPeriod+" and on triggers: "+sensors);
+            }
         }
     }
     
@@ -193,6 +245,12 @@ public class Poller<V> {
         for (ScheduledTask task : tasks) {
             if (task != null) task.cancel();
         }
+        for (PollJob<?> j: pollJobs) {
+            if (j.subscription!=null) {
+                feed.subscriptions().unsubscribe(j.subscription);
+                j.subscription = null;
+            }
+        }
         oneOffTasks.clear();
         tasks.clear();
     }
@@ -205,10 +263,14 @@ public class Poller<V> {
                 break;
             }
         }
+        boolean hasSubscriptions = pollJobs.stream().anyMatch(j -> j.subscription!=null);
         if (!started && hasActiveTasks) {
             log.warn("Poller should not be running, but has active tasks, tasks: "+tasks);
         }
-        return started && hasActiveTasks;
+        if (!started && hasSubscriptions) {
+            log.warn("Poller should not be running, but has subscriptions on jobs: "+pollJobs);
+        }
+        return started && (hasActiveTasks || hasSubscriptions);
     }
     
     protected boolean isEmpty() {
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
index d7cf001235..02dd58e0ff 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
@@ -18,14 +18,30 @@
  */
 package org.apache.brooklyn.core.sensor;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.effector.AddSensorInitializer;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser;
 import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.time.Duration;
 
 import com.google.common.annotations.Beta;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 /**
  * Super-class for entity initializers that add feeds.
@@ -55,4 +71,5 @@ public abstract class AbstractAddSensorFeed<T> extends AddSensorInitializer<T> {
     public AbstractAddSensorFeed(final ConfigBag params) {
         super(params);
     }
+
 }
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
new file mode 100644
index 0000000000..b443747356
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.sensor;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.BasicConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.effector.AddSensorInitializer;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.core.feed.*;
+import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.http.HttpToolResponse;
+import org.apache.brooklyn.util.javalang.AtomicReferences;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static com.fasterxml.jackson.databind.type.LogicalType.Collection;
+
+/**
+ * Super-class for entity initializers that add feeds.
+ */
+@Beta
+public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorFeed<T> {
+
+    public static final ConfigKey<Object> SENSOR_TRIGGERS = ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers",
+            "Sensors which should trigger this feed, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor");
+
+    protected AbstractAddTriggerableSensor() {}
+    public AbstractAddTriggerableSensor(ConfigBag parameters) {
+        super(parameters);
+    }
+
+    public static <V> void scheduleWithTriggers(AbstractFeed feed, Poller<V> poller, Callable<V> pollJob, PollHandler<V> handler, long minPeriod, Set<? extends PollConfig> configs) {
+        // the logic for feeds with pollers is unncessarily convoluted; for now we try to standardize by routing calls that take other triggers
+        // through this method; would be nice to clean up (but a big job)
+
+        if (minPeriod>0 && minPeriod < Duration.PRACTICALLY_FOREVER.toMilliseconds()) {
+            poller.scheduleAtFixedRate(pollJob, handler, minPeriod);
+        }
+        for (PollConfig pc: configs) {
+            if (pc.getOtherTriggers()!=null) {
+                List<Pair<Entity, Sensor>> triggersResolved = resolveTriggers(feed.getEntity(), pc.getOtherTriggers());
+                triggersResolved.forEach(pair -> {
+                    poller.subscribe(pollJob, handler, pair.getLeft(), pair.getRight());
+                });
+            }
+        }
+    }
+
+    @JsonIgnore
+    protected Duration getPeriod(Entity context, ConfigBag config) {
+        if (config.containsKey(SENSOR_PERIOD) || !hasTriggers(config)) {
+            if (context!=null) return Tasks.resolving(config, SENSOR_PERIOD).context(context).immediately(true).get();
+            else return config.get(SENSOR_PERIOD);
+        }
+        return Duration.PRACTICALLY_FOREVER;
+    }
+
+    @JsonIgnore
+    protected Maybe<Object> getTriggersMaybe(Entity context, ConfigBag config) {
+        return Tasks.resolving(config, SENSOR_TRIGGERS).context(context).deep().immediately(true).getMaybe();
+    }
+
+    static List<Pair<Entity,Sensor>> resolveTriggers(Entity context, Object otherTriggers) {
+        Object triggers = Tasks.resolving(otherTriggers, Object.class).context(context).deep().immediately(true).get();
+
+        if (triggers==null || (triggers instanceof Collection && ((Collection)triggers).isEmpty())) return Collections.emptyList();
+        if (triggers instanceof String) {
+            SensorFeedTrigger t = new SensorFeedTrigger();
+            t.sensorName = (String)triggers;
+            triggers = MutableList.of(t);
+        }
+        if (!(triggers instanceof Collection)) {
+            throw new IllegalStateException("Triggers should be a list containing sensors or sensor names");
+        }
+
+        return ((Collection<?>)triggers).stream().map(ti -> {
+            SensorFeedTrigger t;
+
+            if (ti instanceof SensorFeedTrigger) {
+                t = (SensorFeedTrigger) ti;
+            } else {
+                if (ti instanceof Map) {
+                    t = Tasks.resolving(ti, SensorFeedTrigger.class).context(context).deep().get();
+                } else if (ti instanceof String) {
+                    t = new SensorFeedTrigger();
+                    t.sensorName = (String) ti;
+                } else {
+                    throw new IllegalStateException("Trigger should be a map specifyin entity and sensor");
+                }
+            }
+
+            Entity entity = t.entity;
+            if (entity==null && t.entityId!=null) {
+                String desiredComponentId = t.entityId;
+                List<Entity> firstGroupOfMatches = AppGroupTraverser.findFirstGroupOfMatches(context,
+                        Predicates.and(EntityPredicates.configEqualTo(BrooklynConfigKeys.PLAN_ID, desiredComponentId), x->true)::apply);
+                if (firstGroupOfMatches.isEmpty()) {
+                    firstGroupOfMatches = AppGroupTraverser.findFirstGroupOfMatches(context,
+                            Predicates.and(EntityPredicates.idEqualTo(desiredComponentId), x->true)::apply);
+                }
+                if (!firstGroupOfMatches.isEmpty()) {
+                    entity = firstGroupOfMatches.get(0);
+                } else {
+                    throw new IllegalStateException("Cannot find entity with ID '"+desiredComponentId+"'");
+                }
+            } else {
+                entity = context;
+            }
+
+            Sensor sensor = t.sensor;
+            if (sensor==null) {
+                if (t.sensorName!=null) {
+                    sensor = entity.getEntityType().getSensor(t.sensorName);
+                    if (sensor==null) sensor = Sensors.newSensor(Object.class, t.sensorName);
+                } else {
+                    throw new IllegalStateException("Sensor is required for a trigger");
+                }
+            }
+            return Pair.of(entity, sensor);
+        }).collect(Collectors.toList());
+    }
+
+    protected boolean hasTriggers(ConfigBag config) {
+        Maybe<Object> triggers = getTriggersMaybe(null, config);
+        if (triggers==null || triggers.isAbsent()) return false;
+        if (triggers.get() instanceof Collection && ((Collection)triggers.get()).isEmpty()) return false;
+        return true;
+    }
+
+    public static class SensorFeedTrigger {
+        Entity entity;
+        @JsonIgnore
+        String entityId;
+        Sensor<?> sensor;
+        @JsonIgnore
+        String sensorName;
+
+        // TODO could support predicates on the value
+
+        public void setEntity(Entity entity) {
+            this.entity = entity;
+        }
+        public void setEntity(String entityId) {
+            this.entityId = entityId;
+        }
+        public Object getEntity() {
+            return entity!=null ? entity : entityId;
+        }
+
+        public void setSensor(Sensor<?> sensor) {
+            this.sensor = sensor;
+        }
+        public void setSensor(String sensorName) {
+            this.sensorName = sensorName;
+        }
+        public Object getSensor() {
+            return sensor!=null ? sensor : sensorName;
+        }
+    }
+
+
+    protected void standardPollConfig(Entity entity, ConfigBag configBag, PollConfig<?,?,?> poll) {
+        final Boolean suppressDuplicates = EntityInitializers.resolve(configBag, SUPPRESS_DUPLICATES);
+        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME_ON_STARTUP);
+        final Duration logWarningGraceTime = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME);
+
+        poll.suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
+                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
+                .logWarningGraceTime(logWarningGraceTime)
+                .period(getPeriod(entity, initParams()))
+                .otherTriggers(getTriggersMaybe(entity, configBag).orNull());
+    }
+
+}
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
index 646ca82d22..e078eaffbe 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
@@ -29,6 +29,7 @@ import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.MapConfigKey;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor;
 import org.apache.brooklyn.feed.http.HttpFeed;
 import org.apache.brooklyn.feed.http.HttpPollConfig;
@@ -58,7 +59,7 @@ import net.minidev.json.JSONObject;
  * @see SshCommandSensor
  */
 @Beta
-public class HttpRequestSensor<T> extends AbstractAddSensorFeed<T> {
+public class HttpRequestSensor<T> extends AbstractAddTriggerableSensor<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(HttpRequestSensor.class);
 
@@ -126,11 +127,9 @@ public class HttpRequestSensor<T> extends AbstractAddSensorFeed<T> {
         HttpPollConfig<T> pollConfig = new HttpPollConfig<T>(sensor)
                 .checkSuccess(HttpValueFunctions.responseCodeEquals(200))
                 .onFailureOrException(Functions.constant((T) null))
-                .onSuccess(successFunction)
-                .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
-                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
-                .logWarningGraceTime(logWarningGraceTime)
-                .period(initParam(SENSOR_PERIOD));
+                .onSuccess(successFunction);
+
+        standardPollConfig(entity, initParams(), pollConfig);
 
         HttpFeed.Builder httpRequestBuilder = HttpFeed.builder().entity(entity)
                 .baseUri(uri)
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
index cdc9e1d083..cdc77112ad 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
@@ -39,6 +39,7 @@ import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.location.Locations;
 import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
 import org.apache.brooklyn.feed.CommandPollConfig;
 import org.apache.brooklyn.feed.ssh.SshFeed;
@@ -77,7 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * @see HttpRequestSensor
  */
 @Beta
-public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> {
+public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SshCommandSensor.class);
 
@@ -150,24 +151,17 @@ public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> {
             LOG.debug("Adding SSH sensor {} to {}", name, entity);
         }
 
-        final Boolean suppressDuplicates = EntityInitializers.resolve(params, SUPPRESS_DUPLICATES);
-        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME_ON_STARTUP);
-        final Duration logWarningGraceTime = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME);
-
         Supplier<Map<String,String>> envSupplier = new EnvSupplier(entity, params);
-
         Supplier<String> commandSupplier = new CommandSupplier(entity, params);
 
         CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor)
-                .period(initParam(SENSOR_PERIOD))
                 .env(envSupplier)
                 .command(commandSupplier)
-                .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
                 .checkSuccess(SshValueFunctions.exitStatusEquals(0))
                 .onFailureOrException(Functions.constant((T)params.get(VALUE_ON_ERROR)))
-                .onSuccess(Functionals.chain(SshValueFunctions.stdout(), new CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))))
-                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
-                .logWarningGraceTime(logWarningGraceTime);
+                .onSuccess(Functionals.chain(SshValueFunctions.stdout(), new CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))));
+
+        standardPollConfig(entity, initParams(), pollConfig);
 
         SshFeed feed = SshFeed.builder()
                 .entity(entity)
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 47db417054..69b8ca5ade 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -276,6 +276,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
     
     private void connectAllMembersUp() {
         clusterOneAndAllMembersUp = FunctionFeed.builder()
+                .uniqueTag("one-and-all-members-up")
                 .entity(this)
                 .period(Duration.FIVE_SECONDS)
                 .poll(new FunctionPollConfig<Boolean, Boolean>(CLUSTER_ONE_AND_ALL_MEMBERS_UP)
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
index a9d192ef71..7b187df98d 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
@@ -127,6 +127,7 @@ public class DynamicMultiGroupImpl extends DynamicGroupImpl implements DynamicMu
         Long interval = getConfig(RESCAN_INTERVAL);
         if (interval != null && interval > 0L) {
             rescan = FunctionFeed.builder()
+                    .uniqueTag("dynamic-multi-group-scanner")
                     .entity(this)
                     .poll(new FunctionPollConfig<Object, Void>(RESCAN)
                             .period(interval, TimeUnit.SECONDS)
diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
index 95fe7ac82a..79b2b4dd34 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
@@ -37,6 +37,7 @@ import org.apache.brooklyn.core.feed.AttributePollHandler;
 import org.apache.brooklyn.core.feed.DelegatingPollHandler;
 import org.apache.brooklyn.core.feed.Poller;
 import org.apache.brooklyn.core.location.Locations;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.feed.ssh.SshPollValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -237,15 +238,12 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
                 handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
                 if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
             }
-            
-            getPoller().scheduleAtFixedRate(
-                    new Callable<SshPollValue>() {
-                        @Override
-                        public SshPollValue call() throws Exception {
-                            return exec(pollInfo.command.get(), pollInfo.env.get());
-                        }}, 
-                    new DelegatingPollHandler<SshPollValue>(handlers),
-                    minPeriod);
+
+            AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), new Callable<SshPollValue>() {
+                @Override
+                public SshPollValue call() throws Exception {
+                    return exec(pollInfo.command.get(), pollInfo.env.get());
+                }}, new DelegatingPollHandler(handlers), minPeriod, configs);
         }
     }
     
diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
index 962c2c0638..f9f2a1867a 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
@@ -32,6 +32,8 @@ import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.core.feed.AttributePollHandler;
 import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
+import org.apache.brooklyn.util.http.HttpToolResponse;
 import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +104,7 @@ public class FunctionFeed extends AbstractFeed {
         private long period = 500;
         private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
         private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList();
+        private String name;
         private String uniqueTag;
         private volatile boolean built;
 
@@ -129,6 +132,10 @@ public class FunctionFeed extends AbstractFeed {
             polls.add(config);
             return this;
         }
+        public Builder name(String name) {
+            this.name = name;
+            return this;
+        }
         public Builder uniqueTag(String uniqueTag) {
             this.uniqueTag = uniqueTag;
             return this;
@@ -182,6 +189,10 @@ public class FunctionFeed extends AbstractFeed {
             Callable<?> job = config.getCallable();
             polls.put(new FunctionPollIdentifier(job), configCopy);
         }
+
+        if (builder.name!=null) setDisplayName(builder.name);
+        else if (builder.uniqueTag!=null) setDisplayName(builder.uniqueTag);
+
         config().set(POLLS, polls);
         initUniqueTag(builder.uniqueTag, polls.values());
     }
@@ -199,11 +210,8 @@ public class FunctionFeed extends AbstractFeed {
                 handlers.add(new AttributePollHandler(config, entity, this));
                 if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
             }
-            
-            getPoller().scheduleAtFixedRate(
-                    (Callable)pollInfo.job,
-                    new DelegatingPollHandler(handlers), 
-                    minPeriod);
+
+            AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), (Callable)pollInfo.job, new DelegatingPollHandler(handlers), minPeriod, configs);
         }
     }
 }
diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
index ffe690af44..95a11a2a24 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.util.javalang.JavaClassNames;
 
 public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfig<S, T>> {
 
+    private String name;
     private Callable<?> callable;
     
     public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> sensor) {
@@ -51,12 +52,18 @@ public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfi
     public FunctionPollConfig(FunctionPollConfig<S, T> other) {
         super(other);
         callable = other.callable;
+        name = other.name;
     }
     
     public Callable<? extends Object> getCallable() {
         return callable;
     }
-    
+
+    public FunctionPollConfig<S, T> name(String name) {
+        this.name = name;
+        return this;
+    }
+
     /**
      * The {@link Callable} to be invoked on each poll.
      * <p>
@@ -108,6 +115,7 @@ public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfi
 
     @Override protected String toStringBaseName() { return "fn"; }
     @Override protected String toStringPollSource() {
+        if (name!=null) return name;
         if (callable==null) return null;
         String cs = callable.toString();
         if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) {
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
index 2f0247bc8b..4d1a13ffa1 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
@@ -37,6 +37,7 @@ import org.apache.brooklyn.core.feed.Poller;
 import org.apache.brooklyn.core.location.Locations;
 import org.apache.brooklyn.core.location.Machines;
 import org.apache.brooklyn.core.location.internal.LocationInternal;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.util.core.javalang.BrooklynHttpConfig;
 import org.apache.brooklyn.util.executor.HttpExecutorFactory;
 import org.apache.brooklyn.util.guava.Maybe;
@@ -416,8 +417,10 @@ public class HttpFeed extends AbstractFeed {
                             .config(BrooklynHttpConfig.httpConfigBuilder(getEntity()).build())
                             .build());
                     return createHttpToolRespose(response, startTime);
-                }};
-                getPoller().scheduleAtFixedRate(pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod);
+                }
+            };
+
+            AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod, configs);
         }
     }
 
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
index 623e8f4090..faaff23662 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
@@ -291,6 +291,7 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft
     protected void connectServiceUpIsRunning() {
         Duration period = config().get(SERVICE_PROCESS_IS_RUNNING_POLL_PERIOD);
         serviceProcessIsRunning = FunctionFeed.builder()
+                .uniqueTag("check-service-process-is-running")
                 .entity(this)
                 .period(period)
                 .poll(new FunctionPollConfig<Boolean, Boolean>(SERVICE_PROCESS_IS_RUNNING)
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
index 130c25b963..474761c724 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
@@ -22,7 +22,9 @@ import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.config.MapConfigKey;
 import org.apache.brooklyn.core.config.SetConfigKey;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.util.time.Duration;
 
 import java.util.List;
@@ -42,6 +44,8 @@ public interface ContainerCommons {
     ConfigKey<List> COMMAND = ConfigKeys.newConfigKey(List.class,"command", "Single command and optional arguments to execute for the container (overrides image EntryPoint and Cmd)", Lists.newArrayList());
     ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args", "Additional arguments to pass to the command at the container (in addition to the command supplied here or the default in the image)", Lists.newArrayList());
 
+    MapConfigKey<Object> SHELL_ENVIRONMENT = BrooklynConfigKeys.SHELL_ENVIRONMENT;
+
     ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "timeout", "Container execution timeout (default 5 minutes)", Duration.minutes(5));
 
     ConfigKey<Boolean> REQUIRE_EXIT_CODE_ZERO = ConfigKeys.newConfigKey(Boolean.class, "requireExitCodeZero", "Whether task should fail if container returns non-zero exit code (default true)", true);
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
index d35cac56d9..dbcf071622 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
@@ -18,29 +18,29 @@
  */
 package org.apache.brooklyn.tasks.kubectl;
 
+import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor;
 import org.apache.brooklyn.feed.function.FunctionFeed;
 import org.apache.brooklyn.feed.function.FunctionPollConfig;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.brooklyn.core.mgmt.BrooklynTaskTags.SENSOR_TAG;
 
 @SuppressWarnings({"UnstableApiUsage", "deprecation", "unchecked"})
-public class ContainerSensor<T> extends AbstractAddSensorFeed<T> implements ContainerCommons {
+public class ContainerSensor<T> extends AbstractAddTriggerableSensor<T> implements ContainerCommons {
 
     public static final ConfigKey<String> FORMAT = SshCommandSensor.FORMAT;
     public static final ConfigKey<Boolean> LAST_YAML_DOCUMENT = SshCommandSensor.LAST_YAML_DOCUMENT;
@@ -63,34 +63,42 @@ public class ContainerSensor<T> extends AbstractAddSensorFeed<T> implements Cont
 
         ConfigBag configBag = ConfigBag.newInstanceCopying(initParams());
 
-        final Boolean suppressDuplicates = EntityInitializers.resolve(configBag, SUPPRESS_DUPLICATES);
-        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME_ON_STARTUP);
-        final Duration logWarningGraceTime = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME);
+        FunctionPollConfig<Object, String> poll = new FunctionPollConfig<>(sensor)
+                .callable(new ContainerSensorCallable(entity, configBag, sensor));
+        standardPollConfig(entity, configBag, poll);
 
-        ((EntityInternal)entity).feeds().add(FunctionFeed.builder()
+        ((EntityInternal) entity).feeds().add(FunctionFeed.builder()
                 .entity(entity)
-                .period(initParam(SENSOR_PERIOD))
                 .onlyIfServiceUp()
-                .poll(new FunctionPollConfig<>(sensor)
-                        .callable(new Callable<Object>() {
-                            @Override
-                            public Object call() throws Exception {
-                                Task<ContainerTaskResult> containerTask = ContainerTaskFactory.newInstance()
-                                        .summary("Running " + EntityInitializers.resolve(configBag, SENSOR_NAME))
-                                        .jobIdentifier(entity.getApplication()+"-"+entity.getId() + "-" + SENSOR_TAG)
-                                        .configure(configBag.getAllConfig())
-                                        .newTask();
-                                DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
-                                String mainStdout = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES)).getMainStdout();
-                                return (new SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(mainStdout);
-                            }
-                        })
-                        .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
-                        .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
-                        .logWarningGraceTime(logWarningGraceTime))
+                .poll(poll)
                 .build());
     }
 
+    public static class ContainerSensorCallable implements Callable<Object> {
+        private final Entity entity;
+        private final ConfigBag configBag;
+        private final Sensor<?> sensor;
 
-}
+        public ContainerSensorCallable(Entity entity, ConfigBag configBag, Sensor<?> sensor) {
+            this.entity = entity;
+            this.configBag = configBag;
+            this.sensor = sensor;
+        }
+        public Object call() throws Exception {
+            Task<ContainerTaskResult> containerTask = ContainerTaskFactory.newInstance()
+                    .summary("Running " + EntityInitializers.resolve(configBag, SENSOR_NAME))
+                    .jobIdentifier(entity.getApplication() + "-" + entity.getId() + "-" + SENSOR_TAG)
+                    .configure(configBag.getAllConfig())
+                    .newTask();
+            DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+            String mainStdout = containerTask.getUnchecked(configBag.get(TIMEOUT)).getMainStdout();
+            return (new SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), configBag.get(FORMAT), configBag.get(LAST_YAML_DOCUMENT))).apply(mainStdout);
+        }
 
+        @Override
+        public String toString() {
+            return "container-sensor[" + configBag.get(CONTAINER_IMAGE) + "]";
+        }
+    }
+
+}
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index 84cb436b4b..310888cf08 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -23,7 +23,6 @@ import com.google.gson.Gson;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.TaskAdaptable;
-import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
@@ -124,7 +123,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
 
                     LOG.debug("Submitting container job in namespace "+namespace+", name "+kubeJobName);
 
-                    Map<String, String> env = new ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config, BrooklynConfigKeys.SHELL_ENVIRONMENT));
+                    Map<String, String> env = new ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config, SHELL_ENVIRONMENT));
                     final BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> jobYaml =  new KubeJobFileCreator()
                             .withImage(containerImage)
                             .withImagePullPolicy(containerImagePullPolicy)
@@ -578,7 +577,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
             return null;
         }
 
-        LOG.info("Deleting namespace " + namespace);
+        LOG.debug("Deleting namespace " + namespace);
         // do this not as a subtask so we can run even if the main queue fails
         ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").allowingNonZeroExitCode();
         if (!requireSuccess) tf = tf.allowingNonZeroExitCode();
@@ -611,7 +610,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return environmentVariablesRaw(map);
     }
     public T environmentVariablesRaw(Map<String,?> map) {
-        config.put(BrooklynConfigKeys.SHELL_ENVIRONMENT, MutableMap.copyOf( map ) );
+        config.put(SHELL_ENVIRONMENT, MutableMap.copyOf( map ) );
         return self();
     }
 
@@ -620,7 +619,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return this.environmentVariableRaw(key, (Object)val);
     }
     public T environmentVariableRaw(String key, Object val) {
-        return environmentVariablesRaw(MutableMap.copyOf( config.get(BrooklynConfigKeys.SHELL_ENVIRONMENT) ).add(key, val));
+        return environmentVariablesRaw(MutableMap.copyOf( config.get(SHELL_ENVIRONMENT) ).add(key, val));
     }
 
     @Override
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
index 5de54807d3..679162d49e 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
@@ -21,13 +21,20 @@ package org.apache.brooklyn.tasks.kubectl;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Dumper;
 import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.testng.annotations.Test;
 
 @SuppressWarnings( "UnstableApiUsage")
@@ -123,4 +130,30 @@ public class ContainerSensorTest extends BrooklynAppUnitTestSupport {
         EntityAsserts.assertAttributeEqualsEventually(parentEntity, Attributes.SERVICE_UP, true);
         EntityAsserts.assertAttributeEventually(parentEntity, Sensors.newStringSensor("tf-version-sensor"), s -> s.contains("Terraform"));
     }
+
+    @Test
+    public void testTriggeredContainerSensor() {
+        AttributeSensor<Object> trigger = Sensors.newSensor(Object.class, "the-trigger");
+        AttributeSensor<Object> triggered = Sensors.newSensor(Object.class, "triggered");
+        ConfigBag parameters = ConfigBag.newInstance(MutableMap.of(
+                ContainerCommons.CONTAINER_IMAGE, "stedolan/jq",
+                ContainerCommons.CONTAINER_IMAGE_PULL_POLICY, PullPolicy.IF_NOT_PRESENT,
+                ContainerCommons.SHELL_ENVIRONMENT, MutableMap.of("LAST_TRIGGER", DependentConfiguration.attributeWhenReady(app, trigger)),
+                ContainerCommons.BASH_SCRIPT, ImmutableList.of("echo " + "$LAST_TRIGGER" + " | jq .value"),
+                ContainerSensor.SENSOR_TRIGGERS, MutableList.of(MutableMap.of("entity", app.getId(), "sensor", "the-trigger")),
+                ContainerSensor.SENSOR_NAME, "triggered"));
+
+        ContainerSensor<String> initializer = new ContainerSensor<>(parameters);
+        TestEntity child = app.createAndManageChild(EntitySpec.create(TestEntity.class).addInitializer(initializer));
+        app.start(ImmutableList.of());
+
+        EntityAsserts.assertAttributeEquals(child, triggered, null);
+        app.sensors().set(trigger, "{ \"name\": \"bob\", \"value\": 3 }");
+
+        Time.sleep(Duration.ONE_SECOND);
+        Dumper.dumpInfo(app);
+
+        EntityAsserts.assertAttributeEventuallyNonNull(child, triggered);
+        EntityAsserts.assertAttributeEquals(child, triggered, "3");
+    }
 }
diff --git a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
index ca18b25048..08d9a2ec03 100644
--- a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
+++ b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
@@ -30,6 +30,7 @@ import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
 import org.apache.brooklyn.feed.CommandPollConfig;
 import org.apache.brooklyn.feed.ssh.SshValueFunctions;
@@ -63,7 +64,7 @@ import com.google.common.reflect.TypeToken;
  * @see HttpRequestSensor
  */
 @Beta
-public final class WinRmCommandSensor<T> extends AbstractAddSensorFeed<T> {
+public final class WinRmCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(WinRmCommandSensor.class);
 
@@ -90,10 +91,6 @@ public final class WinRmCommandSensor<T> extends AbstractAddSensorFeed<T> {
             LOG.debug("Adding WinRM sensor {} to {}", name, entity);
         }
 
-        final Boolean suppressDuplicates = EntityInitializers.resolve(initParams(), SUPPRESS_DUPLICATES);
-        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(initParams(), LOG_WARNING_GRACE_TIME_ON_STARTUP);
-        final Duration logWarningGraceTime = EntityInitializers.resolve(initParams(), LOG_WARNING_GRACE_TIME);
-
         Supplier<Map<String,String>> envSupplier = new Supplier<Map<String,String>>() {
             @SuppressWarnings("serial")
             @Override
@@ -127,16 +124,15 @@ public final class WinRmCommandSensor<T> extends AbstractAddSensorFeed<T> {
                 .period(initParam(SENSOR_PERIOD))
                 .env(envSupplier)
                 .command(commandSupplier)
-                .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
                 .checkSuccess(SshValueFunctions.exitStatusEquals(0))
                 .onFailureOrException(Functions.constant((T) null))
                 .onSuccess(Functions.compose(new Function<String, T>() {
                         @Override
                         public T apply(String input) {
                             return TypeCoercions.coerce(Strings.trimEnd(input), (Class<T>) sensor.getType());
-                        }}, SshValueFunctions.stdout()))
-                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
-                .logWarningGraceTime(logWarningGraceTime);
+                        }}, SshValueFunctions.stdout()));
+
+        standardPollConfig(entity, initParams(), pollConfig);
 
         CmdFeed feed = CmdFeed.builder()
                 .entity(entity)


[brooklyn-server] 02/04: run container tasks in background and as transient

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 19fac6b1eb8ebcab4d319235158888fbb16f6f24
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Aug 16 11:51:53 2022 +0100

    run container tasks in background and as transient
    
    avoid polluting tasks view with boring details of how container tasks need to be run
---
 .../tasks/kubectl/ContainerTaskFactory.java        | 96 +++++++++++++---------
 1 file changed, 58 insertions(+), 38 deletions(-)

diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index f1b8caf255..84cb436b4b 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInitializers;
@@ -79,6 +80,17 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
     private Boolean deleteNamespace;
     Function<ContainerTaskResult,RET> returnConversion;
 
+    private <T extends TaskAdaptable<?>> T runTask(Entity entity, T t, boolean block, boolean markTransient) {
+        // previously we queued all the callers of this as sub-tasks, but that bloats the kilt diagram, so use entity.submit instead, optionally with blocking.
+        // most will be transient, apart from the main flow, so that they get GC'd quicker and don't clutter the kilt
+        //DynamicTasks.queue(t);
+
+        if (markTransient) BrooklynTaskTags.setTransient(t.asTask());
+        Entities.submit(entity, t);
+        if (block) { t.asTask().blockUntilEnded(Duration.PRACTICALLY_FOREVER); }
+        return t;
+    }
+
     @Override
     public Task<RET> newTask() {
         final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
@@ -152,7 +164,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                             if (createNamespace==null) {
                                 createNsJobF.allowingNonZeroExitCode();
                             }
-                            createNsJob = DynamicTasks.queue(createNsJobF.newTask());
+                            createNsJob = runTask(entity, createNsJobF.newTask(), true, true);
                         }
 
                         // only delete if told to always, unless we successfully create it
@@ -177,13 +189,13 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                                 }
                             }
 
-                            DynamicTasks.queue(
-                                    newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask());
+                            runTask(entity,
+                                    newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask(), true, true);
 
                             final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
 
                             // wait for it to be running (or failed / succeeded) -
-                            PodPhases phaseOnceActive = waitForContainerAvailable(kubeJobName, result, timer);
+                            PodPhases phaseOnceActive = waitForContainerAvailable(entity, kubeJobName, result, timer);
 //                            waitForContainerPodContainerState(kubeJobName, result, timer);
 
                             // notify once pod is available
@@ -198,14 +210,14 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
 
                             LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded);
 
-                            ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask());
-                            ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask());
+                            ProcessTaskWrapper<String> retrieveOutput = runTask(entity, newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask(), false, true);
+                            ProcessTaskWrapper<String> retrieveExitCode = runTask(entity, newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask(), false, true);
 
-                            DynamicTasks.waitForLast();
                             result.mainStdout = retrieveOutput.get();
 
                             updateStdoutWithNewData(stdout, result.mainStdout);
 
+                            retrieveExitCode.get();
                             String exitCodeS = retrieveExitCode.getStdout();
                             if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim());
                             else result.mainExitCode = -1;
@@ -223,10 +235,12 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                             } else {
                                 Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
                                 if (!Boolean.TRUE.equals(devMode)) {
-                                    Entities.submit(entity, newDeleteJobTask(kubeJobName)
-                                                    // namespace might have been deleted in parallel so okay if we don't delete the job
-                                                    .allowingNonZeroExitCode()
-                                                    .newTask()).get();
+                                    Task<String> deletion = Entities.submit(entity, BrooklynTaskTags.setTransient(newDeleteJobTask(kubeJobName)
+                                            // namespace might have been deleted in parallel so okay if we don't delete the job;
+                                            .allowingNonZeroExitCode()
+                                            .newTask().asTask()));
+                                    // no big deal if not deleted, job ID will always be unique, so allow to delete in background and not block subsequent tasks
+                                    //deletion.get();
                                 }
                             }
                             DynamicTasks.waitForLast();
@@ -242,7 +256,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
     }
 
     private Boolean waitForContainerCompletedUsingK8sWaitFor(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
-        return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+        return runTask(entity, Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
             while (true) {
                 LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
 
@@ -255,22 +269,22 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 // other one-off checks for job error, we could do here
                 // e.g. if image can't be pulled, for instance
 
-                refreshStdout(stdout, kubeJobName, timer);
+                refreshStdout(entity, stdout, kubeJobName, timer);
 
                 // probably timed out or job not yet available; short wait then retry
                 Time.sleep(Duration.millis(50));
             }
 
-        }).build()).getUnchecked();
+        }).build(), false, true).getUnchecked();
     }
 
     private Boolean waitForContainerCompletedUsingPodState(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
-        return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+        return runTask(entity, Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
             long retryDelay = 10;
             while (true) {
                 LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
 
-                PodPhases phase = checkPodPhase(kubeJobName);
+                PodPhases phase = checkPodPhase(entity, kubeJobName);
                 if (phase.equals(PodPhases.Succeeded)) return true;
                 if (phase.equals(PodPhases.Failed)) return false;
 
@@ -279,7 +293,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 // other one-off checks for job error, we could do here
                 // e.g. if image can't be pulled, for instance
 
-                refreshStdout(stdout, kubeJobName, timer);
+                refreshStdout(entity, stdout, kubeJobName, timer);
 
                 // probably timed out or job not yet available; short wait then retry
                 Time.sleep(Duration.millis(retryDelay));
@@ -290,14 +304,13 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 }
             }
 
-        }).build()).getUnchecked();
+        }).build(), false, true).getUnchecked();
     }
 
-    private void refreshStdout(ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
+    private void refreshStdout(Entity entity, ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
         // finally get the partial log for reporting
-        ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
-        BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
-        outputSoFarCmd.block();
+        ProcessTaskWrapper<String> outputSoFarCmd = runTask(entity,
+                newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask(), true, true);
         if (outputSoFarCmd.getExitCode() != 0) {
             throw new IllegalStateException("Error detected with container job while reading logs (exit code " + outputSoFarCmd.getExitCode() + "): " + outputSoFarCmd.getStdout() + " / " + outputSoFarCmd.getStderr());
         }
@@ -415,15 +428,15 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return null;
     }
 
-    private PodPhases waitForContainerAvailable(String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
-        return DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
+    private PodPhases waitForContainerAvailable(Entity entity, String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
+        return runTask(entity, Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
             long first = System.currentTimeMillis();
             long last = first;
             long backoffMillis = 10;
             PodPhases phase = PodPhases.Unknown;
             long startupReportDelay = 1000;  // report any start longer than 1s
             while (timer.isNotExpired()) {
-                phase = checkPodPhase(kubeJobName);
+                phase = checkPodPhase(entity, kubeJobName);
                 if (phase == PodPhases.Failed || phase == PodPhases.Succeeded || phase == PodPhases.Running) {
                     if (startupReportDelay>5000) LOG.info("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
                     else LOG.debug("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
@@ -431,13 +444,15 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 }
 
                 if (phase!=PodPhases.Unknown && Strings.isBlank(result.kubePodName)) {
-                    result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
+                    result.kubePodName = runTask(entity, newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask(), false, true).get().trim();
                 }
                 if (phase == PodPhases.Pending && Strings.isNonBlank(result.kubePodName)) {
                     // if pending, need to look for errors
-                    String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
+                    String failedEvents = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask(),
+                            false, true).get().trim();
                     if (!"[]".equals(failedEvents)) {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask()).get().trim();
+                        String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask(),
+                                false, false).get().trim();
                         throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
                     }
                 }
@@ -450,15 +465,18 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                     Consumer<String> log = startupReportDelay<3*1000 ? LOG::debug : LOG::info;
 
                     log.accept("Container taking a while to start ("+Duration.millis(last-first)+"): "+namespace+" "+ kubeJobName +" "+ result.kubePodName+" / phase '"+phase+"'");
-                    String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+                    String stateJsonS = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask(),
+                            false, true).get().trim();
                     if (Strings.isNonBlank(stateJsonS)) {
                         log.accept("Pod state: "+stateJsonS);
                     }
                     if (Strings.isNonBlank(result.kubePodName)) {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
+                        String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask(),
+                                false, true).get().trim();
                         log.accept("Pod events: \n"+events);
                     } else {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
+                        String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask(),
+                                false, true).get().trim();
                         log.accept("Job events: \n"+events);
                     }
 
@@ -474,24 +492,25 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 if (backoffMillis<80) backoffMillis*=2;
             }
             throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
-        }).build()).getUnchecked();
+        }).build(), false, true).getUnchecked();
     }
 
-    private PodPhases checkPodPhase(String kubeJobName) {
-        PodPhases succeeded = getPodPhaseFromContainerState(kubeJobName);
+    private PodPhases checkPodPhase(Entity entity, String kubeJobName) {
+        PodPhases succeeded = getPodPhaseFromContainerState(entity, kubeJobName);
         if (succeeded != null) return succeeded;
 
         // this is the more official way, fall back to it if above is not recognised (eg waiting)
-        String phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
+        String phase = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask(), false, true).get().trim();
         for (PodPhases candidate: PodPhases.values()) {
             if (candidate.name().equalsIgnoreCase(phase)) return candidate;
         }
         return PodPhases.Unknown;
     }
 
-    private PodPhases getPodPhaseFromContainerState(String kubeJobName) {
+    private PodPhases getPodPhaseFromContainerState(Entity entity, String kubeJobName) {
         // pod container state is populated much sooner than the pod status and job fields and wait, so prefer it
-        String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+        String stateJsonS = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask(),
+                false, true).get().trim();
         if (Strings.isNonBlank(stateJsonS)) {
             Object stateO = new Gson().fromJson(stateJsonS, Object.class);
             if (stateO instanceof Map) {
@@ -564,7 +583,8 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").allowingNonZeroExitCode();
         if (!requireSuccess) tf = tf.allowingNonZeroExitCode();
         else tf = tf.requiringExitCodeZero();
-        ProcessTaskWrapper<String> task = Entities.submit(entity, tf.newTask());
+        ProcessTaskWrapper<String> task = tf.newTask();
+        Entities.submit(entity, BrooklynTaskTags.setTransient(task.asTask()));
         if (wait) {
             task.get();
             LOG.info("Deleted namespace " + namespace);


[brooklyn-server] 03/04: move AppGroupTraverser to core project for more access

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit fbd9cb88e7b25c42fdc7cd88dfa7b54964dd255a
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Aug 16 12:15:26 2022 +0100

    move AppGroupTraverser to core project for more access
---
 .../camp/brooklyn/spi/dsl/AppGroupTraverser.java   | 109 +--------------------
 .../brooklyn/spi/dsl/methods/DslComponent.java     |   2 +-
 .../brooklyn/spi/dsl/AppGroupTraverserTest.java    |   1 +
 .../core/mgmt/internal}/AppGroupTraverser.java     |   4 +-
 4 files changed, 7 insertions(+), 109 deletions(-)

diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverser.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverser.java
index 6f1bdebf53..545a95d481 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverser.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverser.java
@@ -18,122 +18,19 @@
  */
 package org.apache.brooklyn.camp.brooklyn.spi.dsl;
 
-import org.apache.brooklyn.api.entity.Application;
 import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.util.collections.MutableSet;
 
 import java.util.List;
-import java.util.Set;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 
-// progressively look up and down to application (topology) boundaries
-
-// ie first look subject to application boundaries, ie up to ancestor which is an application, and through descendants which are applications
-// then look past all of those to next app boundary
-
-// TODO would be nice to have tests which look at parent, children, grandchildren, older and younger nephews
-
-// TODO ideally would be enhanced to look at the "depth in ancestor" tag, to work based on entity definition boundaries rather than app boundaries
-
-/**
- *
- * Progressively expands groups based on "application" boundaries, returning each distal group.
- * Useful e.g. to find nodes matching an ID included in a particular application definition and not children.
- <code>
- - APP1
-   - ENT1
-   - APP2
-     - ENT2
-     - ENT3
-   - APP3
-     - ENT4
-       - ENT5
-       - APP4
-          - ENT6
-       - ENT7
-     - APP5
-       - ENT8
- </code>
-
- if this is initialized with ENT4, it will start with that in {@link #visitedThisTime} and {@link #visited}.
- one invocation of {@link #next()} will return an instance where {@link #visitedThisTime} is {APP3,ENT5,APP4,ENT7,APP5}, and {@link #visited} contains that and ENT4;
- it will not go below APP4 or APP5 because those are {@link Application} boundaries, nor will it go above APP3, on that pass.
- an invocation of {@link #next()} on that instance will then return {@link #visitedThisTime} containing {APP1,ENT1,APP2,ENT6,ENT8},
- i.e. the items above APP3 (but not above the next highest ancestor implementing {@link Application},
- and its {@link #visited} will (as it always does) contain those items plus the items previously visited.
- an invocation of {@link #next()} on that instance will then return {@link #visitedThisTime} empty.
- <p>
- see {@link #findFirstGroupOfMatches(Entity, Predicate)}.
-*/
+@Deprecated
+/** @deprecated since 1.1 use {@link org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser */
 public class AppGroupTraverser {
 
-    int depth = -1;
-    Set<Entity> visited = MutableSet.of();
-    Set<Entity> visitedThisTime = MutableSet.of();
-    // _parent_ of the last node we have visited;
-    // after first iteration this should have at least (exactly?) one visited child which is a topology template (not a node)
-    Entity ancestorBound = null;
-    // children whom we have not yet visited, because they are part of a new topology template
-    Set<Entity> descendantBounds = MutableSet.of();
-
-    protected AppGroupTraverser() {
-    }
-
-    AppGroupTraverser(Entity source) {
-        this.visitedThisTime.add(source);
-        this.visited.add(source);
-        this.ancestorBound = source.getParent();
-        this.descendantBounds.addAll(source.getChildren());
-    }
-
-    AppGroupTraverser next() {
-        AppGroupTraverser result = new AppGroupTraverser();
-        result.visited.addAll(visited);
-        result.depth = depth + 1;
-        descendantBounds.forEach(c -> result.visitDescendants(c, true));
-        if (ancestorBound != null) result.visitAncestorsAndTheirDescendants(ancestorBound);
-        return result;
-    }
-
-    protected void visitAncestorsAndTheirDescendants(Entity ancestor) {
-        // go to the top of the containing topology template / application boundary
-        Entity appAncestor = ancestor;
-        while (!(appAncestor instanceof Application) && appAncestor.getParent() != null)
-            appAncestor = appAncestor.getParent();
-        visitDescendants(appAncestor, true);
-        ancestorBound = appAncestor.getParent() != null ? appAncestor.getParent() : null;
-    }
-
-    protected void visitDescendants(Entity node, boolean isFirst) {
-        if (!isFirst && !visited.add(node)) {
-            // already visited
-            return;
-        }
-        // normal entity
-        visitedThisTime.add(node);
-
-        if (!isFirst && node instanceof Application) {
-            descendantBounds.add(node);
-        } else {
-            node.getChildren().forEach(c -> this.visitDescendants(c, false));
-        }
-    }
-
-    boolean hasNext() {
-        return ancestorBound != null || !descendantBounds.isEmpty();
-    }
-
-    AppGroupTraverser expandUntilMatchesFound(Predicate<Entity> test) {
-        if (visitedThisTime.stream().anyMatch(test) || visitedThisTime.isEmpty()) return this;
-        return next().expandUntilMatchesFound(test);
-    }
-
     /** Progressively expands across {@link org.apache.brooklyn.api.entity.Application} boundaries until one or more matching entities are found. */
     public static List<Entity> findFirstGroupOfMatches(Entity source, Predicate<Entity> test) {
-        AppGroupTraverser traversed = new AppGroupTraverser(source).expandUntilMatchesFound(test);
-        return traversed.visitedThisTime.stream().filter(test).collect(Collectors.toList());
+        return org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser.findFirstGroupOfMatches(source, test);
     }
 
 }
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index f9e3b5b460..10e1802c13 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -46,7 +46,7 @@ import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.camp.brooklyn.BrooklynCampConstants;
-import org.apache.brooklyn.camp.brooklyn.spi.dsl.AppGroupTraverser;
+import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.DslAccessible;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.DslFunctionSource;
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverserTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverserTest.java
index b1d01f47d9..f2f1c56798 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverserTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverserTest.java
@@ -26,6 +26,7 @@ import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.camp.brooklyn.AbstractYamlTest;
 import org.apache.brooklyn.camp.brooklyn.BrooklynCampConstants;
 import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import org.testng.annotations.Test;
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverser.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AppGroupTraverser.java
similarity index 96%
copy from camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverser.java
copy to core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AppGroupTraverser.java
index 6f1bdebf53..77c96bfcd9 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/AppGroupTraverser.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AppGroupTraverser.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.brooklyn.camp.brooklyn.spi.dsl;
+package org.apache.brooklyn.core.mgmt.internal;
 
 import org.apache.brooklyn.api.entity.Application;
 import org.apache.brooklyn.api.entity.Entity;
@@ -130,7 +130,7 @@ public class AppGroupTraverser {
         return next().expandUntilMatchesFound(test);
     }
 
-    /** Progressively expands across {@link org.apache.brooklyn.api.entity.Application} boundaries until one or more matching entities are found. */
+    /** Progressively expands across {@link Application} boundaries until one or more matching entities are found. */
     public static List<Entity> findFirstGroupOfMatches(Entity source, Predicate<Entity> test) {
         AppGroupTraverser traversed = new AppGroupTraverser(source).expandUntilMatchesFound(test);
         return traversed.visitedThisTime.stream().filter(test).collect(Collectors.toList());