You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/21 17:23:42 UTC

[brooklyn-server] 06/07: conditions on some sensor feeds, and code tidy

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 907be023cf205c52b2136cf3b8c771aba4502157
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Sat Aug 20 00:42:15 2022 +0100

    conditions on some sensor feeds, and code tidy
---
 .../org/apache/brooklyn/core/feed/PollConfig.java  |  13 +++
 .../java/org/apache/brooklyn/core/feed/Poller.java | 108 ++++++++++++++++-----
 .../core/sensor/AbstractAddTriggerableSensor.java  |  56 +++++------
 .../apache/brooklyn/feed/AbstractCommandFeed.java  |  19 +---
 .../brooklyn/feed/function/FunctionFeed.java       |  18 +---
 .../org/apache/brooklyn/feed/http/HttpFeed.java    |  62 ++++--------
 6 files changed, 142 insertions(+), 134 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
index 7c8144811d..4533a73b7a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
@@ -21,9 +21,11 @@ package org.apache.brooklyn.core.feed;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.predicates.DslPredicates;
 import org.apache.brooklyn.util.time.Duration;
 
 /**
@@ -36,6 +38,7 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<
     private long period = -1;
     private Object otherTriggers;
     private String description;
+    private Supplier<DslPredicates.DslPredicate> condition;
 
     public PollConfig(AttributeSensor<T> sensor) {
         super(sensor);
@@ -45,6 +48,7 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<
         super(other);
         this.period = other.period;
         this.otherTriggers = other.otherTriggers;
+        this.condition = other.condition;
         this.description = other.description;
     }
 
@@ -82,6 +86,15 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<
         return otherTriggers;
     }
 
+    public F condition(Supplier<DslPredicates.DslPredicate> condition) {
+        this.condition = condition;
+        return self();
+    }
+
+    public Supplier<DslPredicates.DslPredicate> getCondition() {
+        return condition;
+    }
+
     public String getDescription() {
         return description;
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 997c34fcf3..2b1a09488e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -19,25 +19,37 @@
 package org.apache.brooklyn.core.feed;
 
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.Sensor;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
+import org.apache.brooklyn.feed.AbstractCommandFeed;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.core.predicates.DslPredicates;
 import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
 import org.apache.brooklyn.util.core.task.ScheduledTask;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,30 +74,70 @@ public class Poller<V> {
     private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>();
     private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>();
     private volatile boolean started = false;
-    
+
+    public <PI,PC extends PollConfig> void scheduleFeed(AbstractFeed feed, SetMultimap<PI,PC> polls, Function<PI,Callable<?>> jobFactory) {
+        for (final PI identifer : polls.keySet()) {
+            Set<PC> configs = polls.get(identifer);
+            long minPeriodMillis = Long.MAX_VALUE;
+            Set<AttributePollHandler<?>> handlers = Sets.newLinkedHashSet();
+
+            for (PC config : configs) {
+                handlers.add(new AttributePollHandler(config, entity, feed));
+                if (config.getPeriod() > 0) minPeriodMillis = Math.min(minPeriodMillis, config.getPeriod());
+            }
+
+            Callable pollJob = jobFactory.apply(identifer);
+            DelegatingPollHandler handlerDelegate = new DelegatingPollHandler(handlers);
+            boolean subscribed = false;
+            for (PollConfig pc: configs) {
+                if (pc.getOtherTriggers()!=null) {
+                    List<Pair<Entity, Sensor>> triggersResolved = AbstractAddTriggerableSensor.resolveTriggers(feed.getEntity(), pc.getOtherTriggers());
+                    for (Pair<Entity, Sensor> pair : triggersResolved) {// TODO initial, condition
+                        subscribe(pollJob, handlerDelegate, pair.getLeft(), pair.getRight(), pc.getCondition());
+                        subscribed = true;
+                    }
+                }
+            }
+            if (minPeriodMillis>0 && (minPeriodMillis < Duration.PRACTICALLY_FOREVER.toMilliseconds() || !subscribed)) {
+                scheduleAtFixedRate(pollJob, handlerDelegate, minPeriodMillis);
+            }
+        }
+    }
+
     private static class PollJob<V> {
         final PollHandler<? super V> handler;
         final Duration pollPeriod;
         final Runnable wrappedJob;
-        final Entity sensorSource;
-        final Sensor<?> sensor;
+        final Entity pollTriggerEntity;
+        final Sensor<?> pollTriggerSensor;
+        final Supplier<DslPredicates.DslPredicate> pollCondition;
         SubscriptionHandle subscription;
         private boolean loggedPreviousException = false;
 
         PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) {
-            this(job, handler, period, null, null);
+            this(job, handler, period, null, null, null);
         }
 
-        PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period, Entity sensorSource, Sensor<?> sensor) {
+        PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period, Entity sensorSource, Sensor<?> sensor, Supplier<DslPredicates.DslPredicate> pollCondition) {
             this.handler = handler;
             this.pollPeriod = period;
-            this.sensorSource = sensorSource;
-            this.sensor = sensor;
-            
+            this.pollTriggerEntity = sensorSource;
+            this.pollTriggerSensor = sensor;
+            this.pollCondition = pollCondition;
+
             wrappedJob = new Runnable() {
                 @Override
                 public void run() {
                     try {
+                        if (pollCondition!=null) {
+                            DslPredicates.DslPredicate pc = pollCondition.get();
+                            if (pc!=null) {
+                                if (!pc.apply(BrooklynTaskTags.getContextEntity(Tasks.current()))) {
+                                    if (log.isTraceEnabled()) log.trace("PollJob for {} skipped because condition does not apply", job);
+                                    return;
+                                }
+                            }
+                        }
                         V val = job.call();
                         if (handler.checkSuccess(val)) {
                             handler.onSuccess(val);
@@ -126,8 +178,8 @@ public class Poller<V> {
         oneOffJobs.add(job);
     }
 
-    public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long period) {
-        scheduleAtFixedRate(job, handler, Duration.millis(period));
+    public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long periodMillis) {
+        scheduleAtFixedRate(job, handler, Duration.millis(periodMillis));
     }
     public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) {
         if (started) {
@@ -137,8 +189,8 @@ public class Poller<V> {
         pollJobs.add(foo);
     }
 
-    public void subscribe(Callable<V> job, PollHandler<? super V> handler, Entity sensorSource, Sensor<?> sensor) {
-        pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor));
+    public void subscribe(Callable<V> job, PollHandler<? super V> handler, Entity sensorSource, Sensor<?> sensor, Supplier<DslPredicates.DslPredicate> condition) {
+        pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor, condition));
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -179,28 +231,32 @@ public class Poller<V> {
                 return task;
             };
 
+            ScheduledTask.Builder tb = ScheduledTask.builder(tf)
+                    .cancelOnException(false)
+                    .tag(feed != null ? BrooklynTaskTags.tagForContextAdjunct(feed) : null);
+
             if (pollJob.pollPeriod!=null && pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
-                added =true;
-                ScheduledTask t = ScheduledTask.builder(tf)
-                        .displayName("Periodic: " + scheduleName)
-                        .period(pollJob.pollPeriod)
-                        .cancelOnException(false)
-                        .tag(feed!=null ? BrooklynTaskTags.tagForContextAdjunct(feed) : null)
-                        .build();
-                tasks.add(Entities.submit(entity, t));
+                added = true;
+                tb.displayName("Periodic: " + scheduleName);
+                tb.period(pollJob.pollPeriod);
+
                 if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) {
                     minPeriod = pollJob.pollPeriod;
                 }
+            } else {
+                // if no period, we simply need to run it initially
+                tb.displayName("Initial: "+scheduleName);
             }
+            tasks.add(Entities.submit(entity, tb.build()));
 
-            if (pollJob.sensor!=null) {
+            if (pollJob.pollTriggerSensor !=null) {
                 added = true;
                 if (pollJob.subscription!=null) {
                     throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already has subscription %s",
                             this, entity, pollJob.subscription));
                 }
-                sensors.add(pollJob.sensor.getName());
-                pollJob.subscription = feed.subscriptions().subscribe(pollJob.sensorSource!=null ? pollJob.sensorSource : feed.getEntity(), pollJob.sensor, event -> {
+                sensors.add(pollJob.pollTriggerSensor.getName());
+                pollJob.subscription = feed.subscriptions().subscribe(pollJob.pollTriggerEntity !=null ? pollJob.pollTriggerEntity : feed.getEntity(), pollJob.pollTriggerSensor, event -> {
                     // submit this on every event
                     try {
                         feed.getExecutionContext().submit(tf.call());
@@ -211,7 +267,7 @@ public class Poller<V> {
             }
 
             if (!added) {
-                if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {} and no subscriptions) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
+                if (log.isDebugEnabled()) log.debug("Activating poll (as one-off, as no period and no subscriptions) for {} (using {})", new Object[] {entity, this});
             }
         }
         
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
index b443747356..0d28f31b7f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
@@ -20,40 +20,29 @@ package org.apache.brooklyn.core.sensor;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
 import com.google.common.base.Predicates;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.sensor.Sensor;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.BasicConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.effector.AddSensorInitializer;
-import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityPredicates;
 import org.apache.brooklyn.core.feed.*;
 import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser;
-import org.apache.brooklyn.feed.http.HttpPollConfig;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.predicates.DslPredicates;
 import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
-import org.apache.brooklyn.util.http.HttpToolResponse;
-import org.apache.brooklyn.util.javalang.AtomicReferences;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.commons.lang3.tuple.Pair;
 
 import java.util.*;
-import java.util.concurrent.Callable;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static com.fasterxml.jackson.databind.type.LogicalType.Collection;
-
 /**
  * Super-class for entity initializers that add feeds.
  */
@@ -62,29 +51,13 @@ public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorF
 
     public static final ConfigKey<Object> SENSOR_TRIGGERS = ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers",
             "Sensors which should trigger this feed, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor");
+    public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", "Optional condition required for this sensor feed to run");
 
     protected AbstractAddTriggerableSensor() {}
     public AbstractAddTriggerableSensor(ConfigBag parameters) {
         super(parameters);
     }
 
-    public static <V> void scheduleWithTriggers(AbstractFeed feed, Poller<V> poller, Callable<V> pollJob, PollHandler<V> handler, long minPeriod, Set<? extends PollConfig> configs) {
-        // the logic for feeds with pollers is unncessarily convoluted; for now we try to standardize by routing calls that take other triggers
-        // through this method; would be nice to clean up (but a big job)
-
-        if (minPeriod>0 && minPeriod < Duration.PRACTICALLY_FOREVER.toMilliseconds()) {
-            poller.scheduleAtFixedRate(pollJob, handler, minPeriod);
-        }
-        for (PollConfig pc: configs) {
-            if (pc.getOtherTriggers()!=null) {
-                List<Pair<Entity, Sensor>> triggersResolved = resolveTriggers(feed.getEntity(), pc.getOtherTriggers());
-                triggersResolved.forEach(pair -> {
-                    poller.subscribe(pollJob, handler, pair.getLeft(), pair.getRight());
-                });
-            }
-        }
-    }
-
     @JsonIgnore
     protected Duration getPeriod(Entity context, ConfigBag config) {
         if (config.containsKey(SENSOR_PERIOD) || !hasTriggers(config)) {
@@ -99,7 +72,8 @@ public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorF
         return Tasks.resolving(config, SENSOR_TRIGGERS).context(context).deep().immediately(true).getMaybe();
     }
 
-    static List<Pair<Entity,Sensor>> resolveTriggers(Entity context, Object otherTriggers) {
+    @Beta
+    public static List<Pair<Entity,Sensor>> resolveTriggers(Entity context, Object otherTriggers) {
         Object triggers = Tasks.resolving(otherTriggers, Object.class).context(context).deep().immediately(true).get();
 
         if (triggers==null || (triggers instanceof Collection && ((Collection)triggers).isEmpty())) return Collections.emptyList();
@@ -174,7 +148,7 @@ public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorF
         @JsonIgnore
         String sensorName;
 
-        // TODO could support predicates on the value
+        // could support predicates on the value; but we do it on the entity which is enough
 
         public void setEntity(Entity entity) {
             this.entity = entity;
@@ -207,7 +181,23 @@ public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorF
                 .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
                 .logWarningGraceTime(logWarningGraceTime)
                 .period(getPeriod(entity, initParams()))
-                .otherTriggers(getTriggersMaybe(entity, configBag).orNull());
+                .otherTriggers(getTriggersMaybe(entity, configBag).orNull())
+                .condition(new ConditionSupplier(configBag, entity));
+    }
+
+    static class ConditionSupplier implements Supplier<DslPredicates.DslPredicate> {
+        final ConfigBag configBag;
+        final Entity entity;
+
+        ConditionSupplier(ConfigBag configBag, Entity entity) {
+            this.configBag = configBag;
+            this.entity = entity;
+        }
+
+        @Override
+        public DslPredicates.DslPredicate get() {
+            return Tasks.resolving(configBag, CONDITION).context(entity).deep().immediately(true).get();
+        }
     }
 
 }
diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
index 79b2b4dd34..c4fab5c647 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
@@ -227,24 +227,7 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
     
     @Override
     protected void preStart() {
-        SetMultimap<CommandPollIdentifier, CommandPollConfig<?>> polls = config().get(POLLS);
-        
-        for (final CommandPollIdentifier pollInfo : polls.keySet()) {
-            Set<CommandPollConfig<?>> configs = polls.get(pollInfo);
-            long minPeriod = Integer.MAX_VALUE;
-            Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
-
-            for (CommandPollConfig<?> config : configs) {
-                handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
-                if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
-            }
-
-            AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), new Callable<SshPollValue>() {
-                @Override
-                public SshPollValue call() throws Exception {
-                    return exec(pollInfo.command.get(), pollInfo.env.get());
-                }}, new DelegatingPollHandler(handlers), minPeriod, configs);
-        }
+        getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> exec(pollInfo.command.get(), pollInfo.env.get()));
     }
     
     @Override
diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
index f9f2a1867a..378bfe6b56 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
@@ -33,7 +33,11 @@ import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.core.feed.AttributePollHandler;
 import org.apache.brooklyn.core.feed.DelegatingPollHandler;
 import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
+import org.apache.brooklyn.util.core.javalang.BrooklynHttpConfig;
 import org.apache.brooklyn.util.http.HttpToolResponse;
+import org.apache.brooklyn.util.http.auth.UsernamePassword;
+import org.apache.brooklyn.util.http.executor.HttpRequest;
+import org.apache.brooklyn.util.http.executor.HttpResponse;
 import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -200,18 +204,6 @@ public class FunctionFeed extends AbstractFeed {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     protected void preStart() {
-        SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?, ?>> polls = getConfig(POLLS);
-        for (final FunctionPollIdentifier pollInfo : polls.keySet()) {
-            Set<FunctionPollConfig<?,?>> configs = polls.get(pollInfo);
-            long minPeriod = Integer.MAX_VALUE;
-            Set<AttributePollHandler<?>> handlers = Sets.newLinkedHashSet();
-
-            for (FunctionPollConfig<?,?> config : configs) {
-                handlers.add(new AttributePollHandler(config, entity, this));
-                if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
-            }
-
-            AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), (Callable)pollInfo.job, new DelegatingPollHandler(handlers), minPeriod, configs);
-        }
+        getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> pollInfo.job);
     }
 }
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
index 4d1a13ffa1..4a17f04b84 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
@@ -375,53 +375,27 @@ public class HttpFeed extends AbstractFeed {
 
     @Override
     protected void preStart() {
-        SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> polls = getConfig(POLLS);
+        getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> {
+                if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo);
 
-        for (final HttpPollIdentifier pollInfo : polls.keySet()) {
-            // Though HttpClients are thread safe and can take advantage of connection pooling
-            // and authentication caching, the httpcomponents documentation says:
-            //    "While HttpClient instances are thread safe and can be shared between multiple
-            //     threads of execution, it is highly recommended that each thread maintains its
-            //     own dedicated instance of HttpContext.
-            //  http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
-
-            Set<HttpPollConfig<?>> configs = polls.get(pollInfo);
-            long minPeriod = Integer.MAX_VALUE;
-            Set<AttributePollHandler<? super HttpToolResponse>> handlers = Sets.newLinkedHashSet();
-
-            for (HttpPollConfig<?> config : configs) {
-                handlers.add(new AttributePollHandler<HttpToolResponse>(config, entity, this));
-                if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
-            }
-
-            Callable<HttpToolResponse> pollJob;
-            pollJob = new Callable<HttpToolResponse>() {
-                @Override
-                public HttpToolResponse call() throws Exception {
-                    if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo);
-
-                    UsernamePassword creds = null;
-                    if (pollInfo.credentials.isPresent()) {
-                        creds =  new UsernamePassword(
-                                pollInfo.credentials.get().getUserPrincipal().getName(),
-                                pollInfo.credentials.get().getPassword());
-                    }
-
-                    final long startTime = System.currentTimeMillis();
-                    HttpResponse response =  pollInfo.httpExecutor.execute(new HttpRequest.Builder()
-                            .headers(pollInfo.headers)
-                            .uri(pollInfo.uriProvider.get())
-                            .credentials(creds)
-                            .method(pollInfo.method)
-                            .body(pollInfo.body)
-                            .config(BrooklynHttpConfig.httpConfigBuilder(getEntity()).build())
-                            .build());
-                    return createHttpToolRespose(response, startTime);
+                UsernamePassword creds = null;
+                if (pollInfo.credentials.isPresent()) {
+                    creds = new UsernamePassword(
+                            pollInfo.credentials.get().getUserPrincipal().getName(),
+                            pollInfo.credentials.get().getPassword());
                 }
-            };
 
-            AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod, configs);
-        }
+                final long startTime = System.currentTimeMillis();
+                HttpResponse response = pollInfo.httpExecutor.execute(new HttpRequest.Builder()
+                        .headers(pollInfo.headers)
+                        .uri(pollInfo.uriProvider.get())
+                        .credentials(creds)
+                        .method(pollInfo.method)
+                        .body(pollInfo.body)
+                        .config(BrooklynHttpConfig.httpConfigBuilder(getEntity()).build())
+                        .build());
+                return createHttpToolRespose(response, startTime);
+        });
     }
 
     @Override