You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:15 UTC

[27/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to o.a.b.feed and o.a.b.core.feed

Rename o.a.b.sensor.feed to o.a.b.feed and o.a.b.core.feed

Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/daf40919
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/daf40919
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/daf40919

Branch: refs/heads/master
Commit: daf40919b6fa20f16b6cd7d9eb1ad9f79baaa8f4
Parents: 2a78e27
Author: Aled Sage <al...@gmail.com>
Authored: Wed Aug 19 22:56:13 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Wed Aug 19 22:56:13 2015 +0100

----------------------------------------------------------------------
 .../brooklyn/core/entity/AbstractEntity.java    |   4 +-
 .../apache/brooklyn/core/feed/AbstractFeed.java | 240 ++++++
 .../core/feed/AttributePollHandler.java         | 248 +++++++
 .../brooklyn/core/feed/ConfigToAttributes.java  |  59 ++
 .../core/feed/DelegatingPollHandler.java        |  96 +++
 .../apache/brooklyn/core/feed/FeedConfig.java   | 297 ++++++++
 .../apache/brooklyn/core/feed/PollConfig.java   |  85 +++
 .../apache/brooklyn/core/feed/PollHandler.java  |  38 +
 .../org/apache/brooklyn/core/feed/Poller.java   | 205 ++++++
 .../mgmt/rebind/BasicEntityRebindSupport.java   |   2 +-
 .../mgmt/rebind/BasicFeedRebindSupport.java     |   2 +-
 .../core/mgmt/rebind/RebindIteration.java       |   2 +-
 .../mgmt/rebind/dto/MementosGenerators.java     |   2 +-
 .../sensor/AttributeSensorAndConfigKey.java     |   2 +-
 .../brooklyn/core/sensor/HttpRequestSensor.java |   6 +-
 .../entity/group/DynamicMultiGroupImpl.java     |   4 +-
 .../brooklyn/entity/stock/DataEntityImpl.java   |   4 +-
 .../brooklyn/feed/function/FunctionFeed.java    | 208 ++++++
 .../feed/function/FunctionPollConfig.java       | 111 +++
 .../org/apache/brooklyn/feed/http/HttpFeed.java | 382 ++++++++++
 .../brooklyn/feed/http/HttpPollConfig.java      | 160 ++++
 .../brooklyn/feed/http/HttpPollValue.java       |  40 +
 .../apache/brooklyn/feed/http/HttpPolls.java    |  39 +
 .../brooklyn/feed/http/HttpValueFunctions.java  | 154 ++++
 .../brooklyn/feed/http/JsonFunctions.java       | 235 ++++++
 .../apache/brooklyn/feed/shell/ShellFeed.java   | 273 +++++++
 .../brooklyn/feed/shell/ShellPollConfig.java    | 125 ++++
 .../org/apache/brooklyn/feed/ssh/SshFeed.java   | 290 ++++++++
 .../apache/brooklyn/feed/ssh/SshPollConfig.java | 142 ++++
 .../apache/brooklyn/feed/ssh/SshPollValue.java  |  60 ++
 .../brooklyn/feed/ssh/SshValueFunctions.java    |  73 ++
 .../windows/WindowsPerformanceCounterFeed.java  | 412 +++++++++++
 .../WindowsPerformanceCounterPollConfig.java    |  53 ++
 .../brooklyn/sensor/feed/AbstractFeed.java      | 240 ------
 .../sensor/feed/AttributePollHandler.java       | 248 -------
 .../sensor/feed/ConfigToAttributes.java         |  59 --
 .../sensor/feed/DelegatingPollHandler.java      |  96 ---
 .../apache/brooklyn/sensor/feed/FeedConfig.java | 297 --------
 .../apache/brooklyn/sensor/feed/PollConfig.java |  85 ---
 .../brooklyn/sensor/feed/PollHandler.java       |  38 -
 .../org/apache/brooklyn/sensor/feed/Poller.java | 205 ------
 .../sensor/feed/function/FunctionFeed.java      | 208 ------
 .../feed/function/FunctionPollConfig.java       | 111 ---
 .../brooklyn/sensor/feed/http/HttpFeed.java     | 382 ----------
 .../sensor/feed/http/HttpPollConfig.java        | 160 ----
 .../sensor/feed/http/HttpPollValue.java         |  40 -
 .../brooklyn/sensor/feed/http/HttpPolls.java    |  39 -
 .../sensor/feed/http/HttpValueFunctions.java    | 154 ----
 .../sensor/feed/http/JsonFunctions.java         | 235 ------
 .../brooklyn/sensor/feed/shell/ShellFeed.java   | 273 -------
 .../sensor/feed/shell/ShellPollConfig.java      | 125 ----
 .../brooklyn/sensor/feed/ssh/SshFeed.java       | 290 --------
 .../brooklyn/sensor/feed/ssh/SshPollConfig.java | 142 ----
 .../brooklyn/sensor/feed/ssh/SshPollValue.java  |  60 --
 .../sensor/feed/ssh/SshValueFunctions.java      |  73 --
 .../windows/WindowsPerformanceCounterFeed.java  | 412 -----------
 .../WindowsPerformanceCounterPollConfig.java    |  53 --
 .../util/core/http/HttpToolResponse.java        |   2 +-
 .../core/feed/ConfigToAttributesTest.java       |  70 ++
 .../apache/brooklyn/core/feed/PollerTest.java   | 108 +++
 .../core/location/TestPortSupplierLocation.java |   2 +-
 .../core/mgmt/rebind/RebindFeedTest.java        |  16 +-
 .../feed/function/FunctionFeedTest.java         | 315 ++++++++
 .../feed/http/HttpFeedIntegrationTest.java      | 160 ++++
 .../apache/brooklyn/feed/http/HttpFeedTest.java | 392 ++++++++++
 .../feed/http/HttpValueFunctionsTest.java       |  94 +++
 .../brooklyn/feed/http/JsonFunctionsTest.java   | 130 ++++
 .../feed/shell/ShellFeedIntegrationTest.java    | 226 ++++++
 .../feed/ssh/SshFeedIntegrationTest.java        | 264 +++++++
 .../WindowsPerformanceCounterFeedLiveTest.java  | 104 +++
 .../WindowsPerformanceCounterFeedTest.java      | 132 ++++
 .../sensor/feed/ConfigToAttributesTest.java     |  70 --
 .../apache/brooklyn/sensor/feed/PollerTest.java | 108 ---
 .../sensor/feed/function/FunctionFeedTest.java  | 315 --------
 .../feed/http/HttpFeedIntegrationTest.java      | 160 ----
 .../brooklyn/sensor/feed/http/HttpFeedTest.java | 392 ----------
 .../feed/http/HttpValueFunctionsTest.java       |  94 ---
 .../sensor/feed/http/JsonFunctionsTest.java     | 130 ----
 .../feed/shell/ShellFeedIntegrationTest.java    | 226 ------
 .../sensor/feed/ssh/SshFeedIntegrationTest.java | 264 -------
 .../WindowsPerformanceCounterFeedLiveTest.java  | 104 ---
 .../WindowsPerformanceCounterFeedTest.java      | 132 ----
 .../policy/enricher/HttpLatencyDetector.java    |   6 +-
 .../entity/database/derby/DerbyDatabase.java    |   2 +-
 .../entity/database/derby/DerbySchema.java      |   6 +-
 .../postgresql/PostgreSqlNodeSaltImpl.java      |   4 +-
 .../entity/salt/SaltStackMasterImpl.java        |   3 +-
 .../entity/monitoring/zabbix/ZabbixFeed.java    |  10 +-
 .../monitoring/zabbix/ZabbixPollConfig.java     |   6 +-
 .../monitoring/zabbix/ZabbixServerImpl.java     |   6 +-
 .../nosql/hazelcast/HazelcastNodeImpl.java      |   6 +-
 .../brooklynnode/BrooklynClusterImpl.java       |   4 +-
 .../brooklynnode/BrooklynEntityMirrorImpl.java  |   4 +-
 .../entity/brooklynnode/BrooklynNodeImpl.java   |  10 +-
 .../SetHighAvailabilityModeEffectorBody.java    |   4 +-
 .../brooklyn/entity/chef/ChefAttributeFeed.java |   8 +-
 .../entity/chef/ChefAttributePollConfig.java    |   2 +-
 .../brooklyn/entity/java/JavaAppUtils.java      |   6 +-
 .../entity/java/JmxAttributeSensor.java         |   6 +-
 .../apache/brooklyn/entity/java/JmxSupport.java |   2 +-
 .../entity/java/VanillaJavaAppImpl.java         |   2 +-
 .../entity/machine/MachineEntityImpl.java       |   6 +-
 .../base/AbstractSoftwareProcessSshDriver.java  |   2 +-
 .../software/base/SoftwareProcessImpl.java      |   4 +-
 .../MachineLifecycleEffectorTasks.java          |   2 +-
 .../feed/jmx/JmxAttributePollConfig.java        |  74 ++
 .../org/apache/brooklyn/feed/jmx/JmxFeed.java   | 423 +++++++++++
 .../org/apache/brooklyn/feed/jmx/JmxHelper.java | 724 +++++++++++++++++++
 .../feed/jmx/JmxNotificationFilters.java        |  64 ++
 .../jmx/JmxNotificationSubscriptionConfig.java  |  95 +++
 .../feed/jmx/JmxOperationPollConfig.java        | 121 ++++
 .../brooklyn/feed/jmx/JmxValueFunctions.java    |  95 +++
 .../sensor/feed/jmx/JmxAttributePollConfig.java |  74 --
 .../brooklyn/sensor/feed/jmx/JmxFeed.java       | 423 -----------
 .../brooklyn/sensor/feed/jmx/JmxHelper.java     | 724 -------------------
 .../sensor/feed/jmx/JmxNotificationFilters.java |  64 --
 .../jmx/JmxNotificationSubscriptionConfig.java  |  95 ---
 .../sensor/feed/jmx/JmxOperationPollConfig.java | 121 ----
 .../sensor/feed/jmx/JmxValueFunctions.java      |  95 ---
 .../brooklyn/sensor/ssh/SshCommandSensor.java   |   6 +-
 .../winrm/WindowsPerformanceCounterSensors.java |   2 +-
 .../BrooklynNodeIntegrationTest.java            |   2 +-
 .../entity/brooklynnode/BrooklynNodeTest.java   |   2 +-
 .../brooklynnode/SameBrooklynNodeImpl.java      |   6 +-
 .../brooklynnode/SelectMasterEffectorTest.java  |   6 +-
 .../mysql/ChefSoloDriverToyMySqlEntity.java     |   4 +-
 .../brooklyn/entity/java/EntityPollingTest.java |   4 +-
 .../entity/java/VanillaJavaAppTest.java         |   2 +-
 .../base/lifecycle/ScriptHelperTest.java        |   4 +-
 .../software/base/test/jmx/JmxService.java      |   2 +-
 .../apache/brooklyn/feed/jmx/JmxFeedTest.java   | 422 +++++++++++
 .../apache/brooklyn/feed/jmx/JmxHelperTest.java | 311 ++++++++
 .../brooklyn/feed/jmx/RebindJmxFeedTest.java    | 148 ++++
 .../brooklyn/sensor/feed/jmx/JmxFeedTest.java   | 422 -----------
 .../brooklyn/sensor/feed/jmx/JmxHelperTest.java | 311 --------
 .../sensor/feed/jmx/RebindJmxFeedTest.java      | 148 ----
 .../entity/database/crate/CrateNodeImpl.java    |   8 +-
 .../database/mariadb/MariaDbNodeImpl.java       |   6 +-
 .../entity/database/mysql/MySqlClusterImpl.java |   4 +-
 .../entity/database/mysql/MySqlNodeImpl.java    |   6 +-
 .../PostgreSqlNodeChefImplFromScratch.java      |   4 +-
 .../messaging/activemq/ActiveMQBrokerImpl.java  |   4 +-
 .../activemq/ActiveMQDestinationImpl.java       |   4 +-
 .../messaging/activemq/ActiveMQQueueImpl.java   |   4 +-
 .../entity/messaging/kafka/KafkaBrokerImpl.java |   6 +-
 .../messaging/kafka/KafkaClusterImpl.java       |   2 +-
 .../entity/messaging/qpid/QpidBrokerImpl.java   |   6 +-
 .../messaging/qpid/QpidDestinationImpl.java     |   4 +-
 .../entity/messaging/qpid/QpidQueueImpl.java    |   4 +-
 .../entity/messaging/rabbit/RabbitQueue.java    |   6 +-
 .../entity/messaging/storm/StormImpl.java       |   4 +-
 .../entity/zookeeper/AbstractZooKeeperImpl.java |   6 +-
 .../entity/monitoring/monit/MonitNodeImpl.java  |   6 +-
 .../nosql/cassandra/CassandraNodeImpl.java      |  12 +-
 .../nosql/couchbase/CouchbaseClusterImpl.java   |   8 +-
 .../nosql/couchbase/CouchbaseNodeImpl.java      |   8 +-
 .../nosql/couchbase/CouchbaseNodeSshDriver.java |   2 +-
 .../couchbase/CouchbaseSyncGatewayImpl.java     |   6 +-
 .../entity/nosql/couchdb/CouchDBNodeImpl.java   |   6 +-
 .../elasticsearch/ElasticSearchNodeImpl.java    |   8 +-
 .../entity/nosql/mongodb/MongoDBServerImpl.java |   4 +-
 .../mongodb/sharding/MongoDBRouterImpl.java     |   4 +-
 .../entity/nosql/redis/RedisStoreImpl.java      |   8 +-
 .../entity/nosql/riak/RiakNodeImpl.java         |   6 +-
 .../entity/nosql/solr/SolrServerImpl.java       |   6 +-
 .../ElasticSearchClusterIntegrationTest.java    |   2 +-
 .../ElasticSearchNodeIntegrationTest.java       |   2 +-
 .../entity/osgi/karaf/KarafContainerImpl.java   |  10 +-
 .../entity/proxy/AbstractControllerImpl.java    |   2 +-
 .../AbstractNonProvisionedControllerImpl.java   |   2 +-
 .../entity/proxy/nginx/NginxControllerImpl.java |   8 +-
 .../ControlledDynamicWebAppClusterImpl.java     |   2 +-
 .../entity/webapp/jboss/JBoss6ServerImpl.java   |   4 +-
 .../entity/webapp/jboss/JBoss7ServerImpl.java   |   6 +-
 .../entity/webapp/jetty/Jetty6ServerImpl.java   |   4 +-
 .../webapp/nodejs/NodeJsWebAppServiceImpl.java  |   8 +-
 .../entity/webapp/tomcat/TomcatServerImpl.java  |   4 +-
 .../qa/load/SimulatedJBoss7ServerImpl.java      |  10 +-
 .../qa/load/SimulatedMySqlNodeImpl.java         |   4 +-
 .../qa/load/SimulatedNginxControllerImpl.java   |  10 +-
 .../brooklynnode/DeployBlueprintTest.java       |   2 +-
 181 files changed, 8717 insertions(+), 8718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index 994961c..0ec5903 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -63,6 +63,8 @@ import org.apache.brooklyn.core.entity.internal.EntityConfigMap;
 import org.apache.brooklyn.core.entity.lifecycle.PolicyDescriptor;
 import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
 import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.ConfigToAttributes;
 import org.apache.brooklyn.core.internal.BrooklynInitialization;
 import org.apache.brooklyn.core.internal.storage.BrooklynStorage;
 import org.apache.brooklyn.core.internal.storage.Reference;
@@ -82,8 +84,6 @@ import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
 import org.apache.brooklyn.core.sensor.BasicNotificationSensor;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
-import org.apache.brooklyn.sensor.feed.ConfigToAttributes;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
new file mode 100644
index 0000000..a31b73e
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento;
+import org.apache.brooklyn.api.sensor.Feed;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.BrooklynFeatureEnablement;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.rebind.BasicFeedRebindSupport;
+import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** 
+ * Base class capturing the common fields and lifecycle (start, suspend, resume, stop) of sensor feeds.
+ * Feeds generally poll or subscribe to get sensor values for an entity; this class owns the {@link Poller}.
+ * They make it easy to poll over http, jmx, etc.
+ */
+public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractFeed.class);
+
+    public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("feed.onlyIfServiceUp", "", false); // read in start() and passed to the Poller constructor
+    
+    private final Object pollerStateMutex = new Object(); // guards poller start/stop decisions together with the 'suspended' flag
+    private transient volatile Poller<?> poller; // created in start() and never reset, so non-null means start() has already run
+    private transient volatile boolean activated; // set true in start(), false in stop()
+    private transient volatile boolean suspended; // set by suspend(), cleared by resume()
+
+    public AbstractFeed() {
+    }
+    
+    /**
+     * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)}
+     */
+    @Deprecated
+    public AbstractFeed(EntityLocal entity) {
+        this(entity, false); // delegates with onlyIfServiceUp=false
+    }
+    
+    /**
+     * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)} and {@code setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp)}
+     */
+    @Deprecated
+    public AbstractFeed(EntityLocal entity, boolean onlyIfServiceUp) {
+        this.entity = checkNotNull(entity, "entity"); // assigns the field directly; setEntity(..) is not called on this path
+        setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp);
+    }
+
+    // Must be idempotent: it is called in builders (in case the feed is not registered with the entity),
+    // and called again when registering with the entity. Registration below is skipped if the feature is disabled.
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        if (BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_FEED_REGISTRATION_PROPERTY)) {
+            ((EntityInternal)entity).feeds().addFeed(this);
+        }
+    }
+
+    protected void initUniqueTag(String uniqueTag, Object ...valsForDefault) {
+        if (Strings.isNonBlank(uniqueTag)) this.uniqueTag = uniqueTag; // an explicit non-blank tag wins
+        else this.uniqueTag = getDefaultUniqueTag(valsForDefault); // otherwise derive one from the supplied values
+    }
+
+    protected String getDefaultUniqueTag(Object ...valsForDefault) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(JavaClassNames.simpleClassName(this)); // the tag always starts with the class name
+        if (valsForDefault.length==0) { // no values supplied: append "@" and this instance's hash code
+            sb.append("@");
+            sb.append(hashCode());
+        } else if (valsForDefault.length==1 && valsForDefault[0] instanceof Collection){ // a single collection is appended without the [..;..] wrapper
+            sb.append(Strings.toUniqueString(valsForDefault[0], 80));
+        } else { // otherwise append the values as "[v1;v2;...]"
+            sb.append("[");
+            boolean first = true;
+            for (Object x: valsForDefault) {
+                if (!first) sb.append(";"); // separator before every value except the first
+                else first = false;
+                sb.append(Strings.toUniqueString(x, 80));
+            }
+            sb.append("]");
+        }
+        return sb.toString(); 
+    }
+
+    @Override
+    public void start() {
+        if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity);
+        if (activated) { 
+            throw new IllegalStateException(String.format("Attempt to start feed %s of entity %s when already running", 
+                    this, entity));
+        }
+        if (poller != null) { // a poller left over from an earlier start(): a stopped feed cannot be started again
+            throw new IllegalStateException(String.format("Attempt to re-start feed %s of entity %s", this, entity));
+        }
+        
+        poller = new Poller<Object>(entity, getConfig(ONLY_IF_SERVICE_UP));
+        activated = true; // set before preStart(), so the hook sees the feed as activated
+        preStart();
+        synchronized (pollerStateMutex) {
+            // don't start poller if we are suspended
+            if (!suspended) {
+                poller.start();
+            }
+        }
+    }
+
+    @Override
+    public void suspend() {
+        synchronized (pollerStateMutex) {
+            if (activated && !suspended) { // stop the poller only for an activated feed that is not already suspended
+                poller.stop();
+            }
+            suspended = true; // recorded even when not activated, so a later start() will not start the poller
+        }
+    }
+    
+    @Override
+    public void resume() {
+        synchronized (pollerStateMutex) {
+            if (activated && suspended) { // restart the poller only for an activated feed that is currently suspended
+                poller.start();
+            }
+            suspended = false;
+        }
+    }
+    
+    @Override
+    public void destroy() {
+        stop(); // stop() itself ends by calling super.destroy()
+    }
+
+    @Override
+    public void stop() {
+        if (!activated) { 
+            log.debug("Ignoring attempt to stop feed {} of entity {} when not running", this, entity);
+            return;
+        }
+        if (log.isDebugEnabled()) log.debug("stopping feed {} for {}", this, entity);
+        
+        activated = false; // cleared before preStop(), so the hook sees the feed as no longer activated
+        preStop();
+        synchronized (pollerStateMutex) {
+            if (!suspended) { // when suspended, suspend() either stopped the poller already or start() never started it
+                poller.stop();
+            }
+        }
+        postStop();
+        super.destroy();
+    }
+
+    @Override
+    public boolean isActivated() {
+        return activated;
+    }
+    
+    public EntityLocal getEntity() {
+        return entity;
+    }
+    
+    protected boolean isConnected() {
+        // TODO Default impl will result in multiple logs for same error if becomes unreachable
+        // (e.g. if ssh gets NoRouteToHostException, then every AttributePollHandler for that
+        // feed will log.warn - so if polling for 10 sensors/attributes will get 10 log messages).
+        // Would be nice if reduced this logging duplication.
+        // (You can reduce it by providing a better 'isConnected' implementation of course.)
+        return isRunning() && entity!=null && !((EntityInternal)entity).getManagementSupport().isNoLongerManaged();
+    }
+
+    @Override
+    public boolean isSuspended() {
+        return suspended;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return isActivated() && !isSuspended() && !isDestroyed() && getPoller()!=null && getPoller().isRunning(); // uses getPoller() so that sub-class overrides are honoured
+    }
+
+    @Override
+    public RebindSupport<FeedMemento> getRebindSupport() {
+        return new BasicFeedRebindSupport(this); // a new support object is created on every call
+    }
+
+    @Override
+    protected void onChanged() {
+        // Intentionally empty in this base class (was an auto-generated stub); sub-classes may override.
+    }
+
+    /**
+     * For overriding; called by {@link #start()} after the poller is created and before it is started.
+     */
+    protected void preStart() {
+    }
+    
+    /**
+     * For overriding; called by {@link #stop()} after the feed is marked not-activated and before the poller is stopped.
+     */
+    protected void preStop() {
+    }
+    
+    /**
+     * For overriding; called by {@link #stop()} after the poller-stop step and before {@code super.destroy()}.
+     */
+    protected void postStop() {
+    }
+    
+    /**
+     * For overriding, where sub-class can change return-type generics! Returns null until {@link #start()} has created the poller.
+     */
+    protected Poller<?> getPoller() {
+        return poller;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
new file mode 100644
index 0000000..c266836
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+
+/**
+ * Handler for when polling an entity's attribute. On each poll result the entity's attribute is set.
+ * 
+ * Calls to onSuccess and onError (presumably meaning onFailure/onException here) will happen sequentially, but may 
+ * be called from different threads each time. Note that no guarantees of a synchronized block exist, so additional 
+ * synchronization would be required for the Java memory model's "happens before" relationship.
+ * 
+ * @author aled
+ */
+public class AttributePollHandler<V> implements PollHandler<V> {
+
+    public static final Logger log = LoggerFactory.getLogger(AttributePollHandler.class);
+
+    private final FeedConfig<V,?,?> config;
+    private final EntityLocal entity;
+    @SuppressWarnings("rawtypes")
+    private final AttributeSensor sensor; // taken from config.getSensor(); may be FeedConfig.NO_SENSOR (see setSensor)
+    private final AbstractFeed feed; // consulted via isConnected() to choose between warn and debug/trace logging
+    private final boolean suppressDuplicates; // when true, setSensor skips writes whose coerced value equals the current attribute
+    
+    // allow 30 seconds before logging at WARN, if there has been no success yet;
+    // after success WARN immediately
+    // TODO these should both be configurable
+    private Duration logWarningGraceTimeOnStartup = Duration.THIRTY_SECONDS;
+    private Duration logWarningGraceTime = Duration.millis(0);
+    
+    // internal state to look after whether to log warnings
+    private volatile Long lastSuccessTime = null; // millis of the latest onSuccess call; null until the first success
+    private volatile Long currentProblemStartTime = null; // millis when the current run of problems began; reset to null on success
+    private volatile boolean currentProblemLoggedAsWarning = false; // whether the current run of problems has been logged at WARN
+    private volatile boolean lastWasProblem = false; // whether the most recent logged outcome was a problem
+
+    
+    public AttributePollHandler(FeedConfig<V,?,?> config, EntityLocal entity, AbstractFeed feed) {
+        this.config = checkNotNull(config, "config");
+        this.entity = checkNotNull(entity, "entity");
+        this.sensor = checkNotNull(config.getSensor(), "sensor");
+        this.feed = checkNotNull(feed, "feed");
+        this.suppressDuplicates = config.getSupressDuplicates();
+    }
+
+    @Override
+    public boolean checkSuccess(V val) {
+        // Always true if no checkSuccess predicate was configured.
+        return !config.hasCheckSuccessHandler() || config.getCheckSuccess().apply(val);
+    }
+
+    @Override
+    public void onSuccess(V val) {
+        if (lastWasProblem) { // recovering from a problem: log the recovery and reset the problem-tracking state
+            if (currentProblemLoggedAsWarning) { 
+                log.info("Success (following previous problem) reading "+getBriefDescription()); // info, because the problem was visible at WARN
+            } else {
+                log.debug("Success (following previous problem) reading "+getBriefDescription());
+            }
+            lastWasProblem = false;
+            currentProblemStartTime = null;
+            currentProblemLoggedAsWarning = false;
+        }
+        lastSuccessTime = System.currentTimeMillis();
+        if (log.isTraceEnabled()) log.trace("poll for {} got: {}", new Object[] {getBriefDescription(), val});
+        
+        try {
+            setSensor(transformValueOnSuccess(val));
+        } catch (Exception e) { // a failing handler or sensor write is logged here, not propagated to the caller
+            if (feed.isConnected()) {
+                log.warn("unable to compute "+getBriefDescription()+"; on val="+val, e);
+            } else {
+                if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; val="+val+" (when inactive)", e);
+            }
+        }
+    }
+
+    /** allows post-processing, such as applying a success handler; 
+     * default applies the onSuccess handler if one is configured (which is recommended), else returns the value as-is */
+    protected Object transformValueOnSuccess(V val) {
+        return config.hasSuccessHandler() ? config.getOnSuccess().apply(val) : val;
+    }
+
+    @Override
+    public void onFailure(V val) {
+        if (!config.hasFailureHandler()) { // no failure handler configured: report through the exception path instead
+            onException(new Exception("checkSuccess of "+this+" for "+getBriefDescription()+" was false but poller has no failure handler"));
+        } else {
+            logProblem("failure", val);
+
+            try {
+                setSensor(config.hasFailureHandler() ? config.getOnFailure().apply((V)val) : val); // NOTE(review): hasFailureHandler() was already true above, so the ': val' arm looks unreachable unless config changes concurrently
+            } catch (Exception e) {
+                if (feed.isConnected()) {
+                    log.warn("Error computing " + getBriefDescription() + "; val=" + val+": "+ e, e);
+                } else {
+                    if (log.isDebugEnabled())
+                        log.debug("Error computing " + getBriefDescription() + "; val=" + val + " (when inactive)", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onException(Exception exception) {
+        if (!feed.isConnected()) { // while not connected, the exception is only traced and does not update the problem-tracking state
+            if (log.isTraceEnabled()) log.trace("Read of {} in {} gave exception (while not connected or not yet connected): {}", new Object[] {this, getBriefDescription(), exception});
+        } else {
+            logProblem("exception", exception);
+        }
+
+        if (config.hasExceptionHandler()) { // the handler runs whether or not the feed is connected
+            try {
+                setSensor( config.getOnException().apply(exception) );
+            } catch (Exception e) {
+                if (feed.isConnected()) {
+                    log.warn("unable to compute "+getBriefDescription()+"; on exception="+exception, e);
+                } else {
+                    if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; exception="+exception+" (when inactive)", e);
+                }
+            }
+        }
+    }
+
+    protected void logProblem(String type, Object val) {
+        if (lastWasProblem && currentProblemLoggedAsWarning) { // this run of problems was already warned about: only trace repeats
+            if (log.isTraceEnabled())
+                log.trace("Recurring {} reading {} in {}: {}", new Object[] {type, this, getBriefDescription(), val});
+        } else {
+            long nowTime = System.currentTimeMillis();
+            // get a non-volatile value
+            Long currentProblemStartTimeCache = currentProblemStartTime;
+            long expiryTime = 
+                    (lastSuccessTime!=null && !isTransitioningOrStopped()) ? lastSuccessTime+logWarningGraceTime.toMilliseconds() : // had a success and not starting/stopping/stopped: grace runs from the last success
+                    currentProblemStartTimeCache!=null ? currentProblemStartTimeCache+logWarningGraceTimeOnStartup.toMilliseconds() : // otherwise the startup grace runs from when this problem began
+                    nowTime+logWarningGraceTimeOnStartup.toMilliseconds(); // nothing recorded yet: the startup grace starts now
+            if (!lastWasProblem) { // first problem since the last success (or since construction)
+                if (expiryTime <= nowTime) { // grace period already over
+                    currentProblemLoggedAsWarning = true;
+                    if (entity==null || !Entities.isNoLongerManaged(entity)) {
+                        log.warn("Read of " + getBriefDescription() + " gave " + type + ": " + val);
+                    } else { // entity is no longer managed: same message, but at debug only
+                        log.debug("Read of " + getBriefDescription() + " gave " + type + ": " + val);
+                    }
+                    if (log.isDebugEnabled() && val instanceof Throwable)
+                        log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
+                } else {
+                    if (log.isDebugEnabled())
+                        log.debug("Read of " + getBriefDescription() + " gave " + type + " (in grace period): " + val);
+                }
+                lastWasProblem = true;
+                currentProblemStartTime = nowTime; // remember when this run of problems began
+            } else { // problem is recurring and has not yet been logged at WARN
+                if (expiryTime <= nowTime) {
+                    currentProblemLoggedAsWarning = true;
+                    log.warn("Read of " + getBriefDescription() + " gave " + type + 
+                            " (grace period expired, occurring for "+Duration.millis(nowTime - currentProblemStartTimeCache)+ // NOTE(review): unboxes the cached start time; assumes it is non-null whenever lastWasProblem is true
+                            (config.hasExceptionHandler() ? "" : ", no exception handler set for sensor")+
+                            ")"+
+                            ": " + val);
+                    if (log.isDebugEnabled() && val instanceof Throwable)
+                        log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
+                } else {
+                    if (log.isDebugEnabled()) 
+                        log.debug("Recurring {} reading {} in {} (still in grace period): {}", new Object[] {type, this, getBriefDescription(), val});
+                }
+            }
+        }
+    }
+
+    protected boolean isTransitioningOrStopped() {
+        if (entity==null) return false;
+        Transition expected = entity.getAttribute(Attributes.SERVICE_STATE_EXPECTED);
+        if (expected==null) return false; // no expected state recorded: treated as not transitioning
+        return (expected.getState()==Lifecycle.STARTING || expected.getState()==Lifecycle.STOPPING || expected.getState()==Lifecycle.STOPPED);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void setSensor(Object v) {
+        if (Entities.isNoLongerManaged(entity)) {
+            if (Tasks.isInterrupted()) return; // interrupted: give up without logging
+            log.warn(""+entity+" is not managed; feed "+this+" setting "+sensor+" to "+v+" at this time is not supported ("+Tasks.current()+")");
+        } // note: no return after the warning, so the value handling below still runs
+        
+        if (v == FeedConfig.UNCHANGED) {
+            // nothing
+        } else if (v == FeedConfig.REMOVE) {
+            ((EntityInternal)entity).removeAttribute(sensor);
+        } else if (sensor == FeedConfig.NO_SENSOR) {
+            // nothing
+        } else {
+            Object coercedV = TypeCoercions.coerce(v, sensor.getType()); // coerce to the sensor's declared type before comparing or setting
+            if (suppressDuplicates && Objects.equal(coercedV, entity.getAttribute(sensor))) {
+                // no change; nothing
+            } else {
+                entity.setAttribute(sensor, coercedV);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return super.toString()+"["+getDescription()+"]";
+    }
+    
+    @Override
+    public String getDescription() {
+        return sensor.getName()+" @ "+entity.getId()+" <- "+config;
+    }
+    
+    protected String getBriefDescription() {
+        return ""+entity+"->"+(sensor==FeedConfig.NO_SENSOR ? "(dynamic sensors)" : ""+sensor); // shorter form used in the log messages above
+    }
+        
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java b/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java
new file mode 100644
index 0000000..d455e80
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.sensor.TemplatedStringAttributeSensorAndConfigKey;
+
+
+/**
+ * Simple config adapter for setting {@link AttributeSensorAndConfigKey} sensor values
+ * from the config value or config default.
+ */
+public class ConfigToAttributes {
+
+    /**
+     * Applies {@link #apply(EntityLocal, AttributeSensorAndConfigKey)} to every sensor declared on the
+     * entity's type that is an {@link AttributeSensorAndConfigKey}.
+     * Normally just applied once, statically, not registered.
+     */
+    public static void apply(EntityLocal entity) {
+        for (Sensor<?> sensor : entity.getEntityType().getSensors()) {
+            if (!(sensor instanceof AttributeSensorAndConfigKey)) {
+                continue;
+            }
+            apply(entity, (AttributeSensorAndConfigKey<?,?>) sensor);
+        }
+    }
+
+    /**
+     * Convenience for ensuring an individual sensor is set from its config key
+     * (e.g. sub-classes of DynamicWebAppCluster that don't want to set HTTP_PORT etc!).
+     * An attribute value that is already present is left untouched and returned as-is.
+     *
+     * @return the existing attribute value if non-null, otherwise the value derived from the key
+     *         (which is only set on the entity when it is non-null)
+     */
+    public static <T> T apply(EntityLocal entity, AttributeSensorAndConfigKey<?,T> key) {
+        T existing = entity.getAttribute(key);
+        if (existing != null) {
+            return existing;
+        }
+        T derived = key.getAsSensorValue(entity);
+        if (derived != null) {
+            entity.setAttribute(key, derived);
+        }
+        return derived;
+    }
+
+    /**
+     * Convenience for transforming a config value (e.g. processing a {@link TemplatedStringAttributeSensorAndConfigKey}),
+     * outside of the context of an entity.
+     */
+    public static <T> T transform(ManagementContext managementContext, AttributeSensorAndConfigKey<?,T> key) {
+        T sensorValue = key.getAsSensorValue(managementContext);
+        return sensorValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java
new file mode 100644
index 0000000..fae7dd6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A poll handler that delegates each call to a set of poll handlers.
+ * <p>
+ * The delegate list is copied at construction time, in iteration order.
+ * 
+ * @param <V> type of the polled value passed on to the delegates
+ * 
+ * @author aled
+ */
+public class DelegatingPollHandler<V> implements PollHandler<V> {
+
+    /** Upper bound on how many delegate descriptions {@link #getDescription()} spells out. */
+    private static final int MAX_DESCRIBED_DELEGATES = 4;
+
+    private final List<AttributePollHandler<? super V>> delegates;
+
+    public DelegatingPollHandler(Iterable<AttributePollHandler<? super V>> delegates) {
+        super();
+        this.delegates = ImmutableList.copyOf(delegates);
+    }
+
+    /**
+     * @return true only if every delegate reports success for {@code val}
+     *         (so also true when there are no delegates); stops at the first delegate reporting failure
+     */
+    @Override
+    public boolean checkSuccess(V val) {
+        for (AttributePollHandler<? super V> delegate : delegates) {
+            if (!delegate.checkSuccess(val))
+                return false;
+        }
+        return true;
+    }
+
+    /** Forwards the successful value to every delegate, in order. */
+    @Override
+    public void onSuccess(V val) {
+        for (AttributePollHandler<? super V> delegate : delegates) {
+            delegate.onSuccess(val);
+        }
+    }
+
+    /** Forwards the failed value to every delegate, in order. */
+    @Override
+    public void onFailure(V val) {
+        for (AttributePollHandler<? super V> delegate : delegates) {
+            delegate.onFailure(val);
+        }
+    }
+
+    /** Forwards the exception to every delegate, in order. */
+    @Override
+    public void onException(Exception exception) {
+        for (AttributePollHandler<? super V> delegate : delegates) {
+            delegate.onException(exception);
+        }
+    }
+    
+    @Override
+    public String toString() {
+        return super.toString()+"["+getDescription()+"]";
+    }
+    
+    /**
+     * Describes the delegates: a fixed marker when there are none, the delegate's own description
+     * when there is exactly one, otherwise a bracketed "; "-separated list of at most
+     * {@value #MAX_DESCRIBED_DELEGATES} descriptions, ending in "; ..." only if some were omitted.
+     */
+    @Override
+    public String getDescription() {
+        if (delegates.isEmpty())
+            return "(empty delegate list)";
+        if (delegates.size()==1) 
+            return delegates.get(0).getDescription();
+        int shown = Math.min(delegates.size(), MAX_DESCRIBED_DELEGATES);
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = 0; i < shown; i++) {
+            if (i>0) sb.append("; ");
+            sb.append(delegates.get(i).getDescription());
+        }
+        // the ellipsis must only appear when the list was really truncated
+        if (delegates.size() > shown) sb.append("; ...");
+        sb.append("]");
+        return sb.toString();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
new file mode 100644
index 0000000..4d06680
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+import org.apache.brooklyn.util.text.Strings;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+
+/**
+ * Configuration for a poll, or a subscription etc, that is being added to a feed.
+ * <p>
+ * Setters are fluent: they return {@link #self()} so that calls can be chained on the concrete subtype.
+ * 
+ * @param <V> type of the raw value handed to the success/failure handlers and the success check
+ * @param <T> type of the sensor, and of the value the handlers produce
+ * @param <F> the concrete config type, returned by the fluent setters
+ * 
+ * @author aled
+ */
+public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
+
+    /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. 
+     * @deprecated since 0.7.0 use UNCHANGED */
+    public static final Object UNSET = Entities.UNCHANGED;
+    /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. */ 
+    public static final Object UNCHANGED = Entities.UNCHANGED;
+    /** The onSuccess or onError functions can return this value to indicate that the sensor value should be removed
+     * (cf 'null', but useful in dynamic situations) */ 
+    public static final Object REMOVE = Entities.REMOVE;
+    
+    /** Indicates that no sensor is being used here. This sensor is suppressed,
+     * but is useful where you want to use the feeds with custom success/exception/failure functions
+     * which directly set multiple sensors, e.g. dynamically based on the poll response.
+     * <p>
+     * See {@link HttpPollConfig#forMultiple()} and its usages.
+     * (It can work for any poll config, but conveniences have not been supplied for others.)  */
+    public static final AttributeSensor<Void> NO_SENSOR = Sensors.newSensor(Void.class, "brooklyn.no.sensor");
+    
+    private final AttributeSensor<T> sensor;
+    private Function<? super V, T> onsuccess;
+    private Function<? super V, T> onfailure;
+    private Function<? super Exception, T> onexception;
+    private Predicate<? super V> checkSuccess;
+    private boolean suppressDuplicates;
+    private boolean enabled = true;
+    
+    /** @param sensor the sensor this config feeds; must not be null (use {@link #NO_SENSOR} for none) */
+    public FeedConfig(AttributeSensor<T> sensor) {
+        this.sensor = checkNotNull(sensor, "sensor");
+    }
+
+    /** Copy constructor: takes the sensor, handlers, success check and flags from {@code other}. */
+    public FeedConfig(FeedConfig<V, T, F> other) {
+        this.sensor = other.sensor;
+        this.onsuccess = other.onsuccess;
+        this.onfailure = other.onfailure;
+        this.onexception = other.onexception;
+        this.checkSuccess = other.checkSuccess;
+        this.suppressDuplicates = other.suppressDuplicates;
+        this.enabled = other.enabled;
+    }
+
+    /** @return this instance, typed as the concrete config type for fluent chaining */
+    @SuppressWarnings("unchecked")
+    protected F self() {
+        return (F) this;
+    }
+    
+    public AttributeSensor<T> getSensor() {
+        return sensor;
+    }
+
+    /** @return the success predicate, or null if none has been set */
+    public Predicate<? super V> getCheckSuccess() {
+        return checkSuccess;
+    }
+    
+    /** @return the success handler, or null if none has been set */
+    public Function<? super V, T> getOnSuccess() {
+        return onsuccess;
+    }
+
+    /** @return the failure handler, or null if none has been set */
+    public Function<? super V, T> getOnFailure() {
+        return onfailure;
+    }
+    
+    /** @return the exception handler, or null if none has been set */
+    public Function<? super Exception, T> getOnException() {
+        return onexception;
+    }
+
+    /** @return the flag set by {@link #suppressDuplicates(boolean)} (false unless set) */
+    public boolean getSupressDuplicates() {
+        return suppressDuplicates;
+    }
+    
+    public boolean isEnabled() {
+        return enabled;
+    }
+    
+    /** sets the predicate used to check whether a feed run is successful */
+    public F checkSuccess(Predicate<? super V> val) {
+        this.checkSuccess = checkNotNull(val, "checkSuccess");
+        return self();
+    }
+    /** as {@link #checkSuccess(Predicate)} */
+    public F checkSuccess(final Function<? super V,Boolean> val) {
+        return checkSuccess(Functionals.predicate(val));
+    }
+    /** @deprecated since 0.7.0, kept for rebind */
+    @Deprecated
+    @SuppressWarnings("unused")
+    private F checkSuccessLegacy(final Function<? super V,Boolean> val) {
+        return checkSuccess(new Predicate<V>() {
+            @Override
+            public boolean apply(V input) {
+                return val.apply(input);
+            }
+        });
+    }
+
+    /** sets the function computing the sensor value from a result that passed the success check */
+    public F onSuccess(Function<? super V,T> val) {
+        this.onsuccess = checkNotNull(val, "onSuccess");
+        return self();
+    }
+    
+    /** as {@link #onSuccess(Function)}, always producing the given constant */
+    public F setOnSuccess(T val) {
+        return onSuccess(Functions.constant(val));
+    }
+    
+    /** a failure is when the connection is fine (no exception) but the other end returns a result object V 
+     * which the feed can tell indicates a failure (e.g. HTTP code 404) */
+    public F onFailure(Function<? super V,T> val) {
+        this.onfailure = checkNotNull(val, "onFailure");
+        return self();
+    }
+
+    /** as {@link #onFailure(Function)}, always producing the given constant */
+    public F setOnFailure(T val) {
+        return onFailure(Functions.constant(val));
+    }
+
+    /** registers a callback to be used for both {@link #onSuccess(Function)} and {@link #onFailure(Function)}, 
+     * i.e. whenever a result comes back, but not in case of exceptions being thrown (ie problems communicating) */
+    public F onResult(Function<? super V, T> val) {
+        onSuccess(val);
+        return onFailure(val);
+    }
+
+    /** as {@link #onResult(Function)}, always producing the given constant */
+    public F setOnResult(T val) {
+        return onResult(Functions.constant(val));
+    }
+
+    /** an exception is when there is an error in the communication */
+    public F onException(Function<? super Exception,T> val) {
+        this.onexception = checkNotNull(val, "onException");
+        return self();
+    }
+    
+    /** as {@link #onException(Function)}, always producing the given constant */
+    public F setOnException(T val) {
+        return onException(Functions.constant(val));
+    }
+
+    /** convenience for indicating a behaviour to occur for both
+     * {@link #onException(Function)}
+     * (error connecting) and 
+     * {@link #onFailure(Function)} 
+     * (successful communication but failure report from remote end) */
+    public F onFailureOrException(Function<Object,T> val) {
+        onFailure(val);
+        return onException(val);
+    }
+    
+    /** as {@link #onFailureOrException(Function)}, always producing the given constant */
+    public F setOnFailureOrException(T val) {
+        return onFailureOrException(Functions.constant(val));
+    }
+
+    public F suppressDuplicates(boolean val) {
+        suppressDuplicates = val;
+        return self();
+    }
+    
+    /**
+     * Whether this feed is enabled (defaulting to true).
+     */
+    public F enabled(boolean val) {
+        enabled = val;
+        return self();
+    }
+    
+    public boolean hasSuccessHandler() {
+        return this.onsuccess != null;
+    }
+
+    public boolean hasFailureHandler() {
+        return this.onfailure != null;
+    }
+
+    public boolean hasExceptionHandler() {
+        return this.onexception != null;
+    }
+
+    public boolean hasCheckSuccessHandler() {
+        return this.checkSuccess != null;
+    }
+
+    
+    /**
+     * Renders as {@code BaseName[source->sensor;field;field...]}: the poll source (if non-blank) and
+     * sensor name first, then every non-blank entry of {@link #toStringOtherFields()}.
+     */
+    @Override
+    public String toString() {
+        StringBuilder result = new StringBuilder();
+        result.append(toStringBaseName());
+        result.append("[");
+        boolean contents = false;
+        Object source = toStringPollSource();
+        AttributeSensor<T> s = getSensor();
+        if (Strings.isNonBlank(Strings.toString(source))) {
+            result.append(Strings.toUniqueString(source, 40));
+            if (s!=null) {
+                result.append("->");
+                result.append(s.getName());
+            }
+            contents = true;
+        } else if (s!=null) {
+            result.append(s.getName());
+            contents = true;
+        }
+        MutableList<Object> fields = toStringOtherFields();
+        if (fields!=null) {
+            for (Object field: fields) {
+                if (Strings.isNonBlank(Strings.toString(field))) {
+                    if (contents) result.append(";");
+                    contents = true;
+                    result.append(field);
+                }
+            }
+        }
+        result.append("]");
+        return result.toString();
+    }
+
+    /** can be overridden to supply a simpler base name than the class name */
+    protected String toStringBaseName() {
+        return JavaClassNames.simpleClassName(this);
+    }
+    /** can be overridden to supply add'l info for the {@link #toString()}; subclasses can add to the returned value */
+    protected MutableList<Object> toStringOtherFields() {
+        return MutableList.<Object>of();
+    }
+    /** can be overridden to supply add'l info for the {@link #toString()}, placed before the sensor with -> */
+    protected Object toStringPollSource() {
+        return null;
+    }
+    /** all configs should supply a unique tag element, inserted into the feed */
+    protected String getUniqueTag() {
+        return toString();
+    }
+
+    /** returns fields which should be used for equality, including by default {@link #toStringOtherFields()} and {@link #toStringPollSource()};
+     * subclasses can add to the returned value */
+    protected MutableList<Object> equalsFields() {
+        MutableList<Object> result = MutableList.of().appendIfNotNull(getSensor()).appendIfNotNull(toStringPollSource());
+        for (Object field: toStringOtherFields()) result.appendIfNotNull(field);
+        return result;
+    }
+
+    /**
+     * Hash derived solely from {@link #equalsFields()}, so that configs which are
+     * {@link #equals(Object) equal} always hash alike.
+     */
+    @Override
+    public int hashCode() { 
+        int hc = 17;
+        for (Object f: equalsFields())
+            hc = Objects.hashCode(hc, f);
+        return hc;
+    }
+
+    /**
+     * Two configs are equal when they are of the same class and agree on both
+     * {@link #getUniqueTag()} and {@link #equalsFields()}.
+     * The handler functions are not compared unless a subclass adds them to {@link #equalsFields()}.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        // Object.equals is identity, so delegating to super would reject every distinct instance
+        if (obj == null || getClass() != obj.getClass()) return false;
+        FeedConfig<?,?,?> other = (FeedConfig<?,?,?>) obj;
+        if (!Objects.equal(getUniqueTag(), other.getUniqueTag())) return false;
+        if (!Objects.equal(equalsFields(), other.equalsFields())) return false;
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
new file mode 100644
index 0000000..133431b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.time.Duration;
+
+/**
+ * Configuration for polling, which is being added to a feed (e.g. to poll a given URL over http).
+ * 
+ * @author aled
+ */
+public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> {
+
+    /** Poll period in milliseconds; stays at -1 until one of the {@code period} setters is called. */
+    private long period = -1;
+    /** Optional free-text description; contributes to {@link #toStringOtherFields()} when non-null. */
+    private String description;
+
+    public PollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+    }
+
+    /** Copy constructor: copies everything {@link FeedConfig} copies, plus the period and description. */
+    public PollConfig(PollConfig<V,T,F> other) {
+        super(other);
+        this.period = other.period;
+        // the description feeds into toString(), so a copy must carry it to render like its original
+        this.description = other.description;
+    }
+
+    /** @return the poll period in milliseconds, or -1 if it has not been set */
+    public long getPeriod() {
+        return period;
+    }
+    
+    /** @throws IllegalArgumentException if the duration is negative */
+    public F period(Duration val) {
+        checkArgument(val.toMilliseconds() >= 0, "period must be greater than or equal to zero");
+        this.period = val.toMilliseconds();
+        return self();
+    }
+    
+    /**
+     * @param val the period in milliseconds
+     * @throws IllegalArgumentException if {@code val} is negative
+     */
+    public F period(long val) {
+        checkArgument(val >= 0, "period must be greater than or equal to zero");
+        this.period = val; return self();
+    }
+    
+    /** @throws IllegalArgumentException if {@code val} is negative */
+    public F period(long val, TimeUnit units) {
+        checkArgument(val >= 0, "period must be greater than or equal to zero");
+        return period(units.toMillis(val));
+    }
+    
+    public F description(String description) {
+        this.description = description;
+        return self();
+    }
+    
+    /** @return the description, or null if none has been set */
+    public String getDescription() {
+        return description;
+    }
+    
+    /** Adds the description (when non-null) to the fields supplied by the superclass. */
+    @Override protected MutableList<Object> toStringOtherFields() {
+        return super.toStringOtherFields().appendIfNotNull(description);
+    }
+
+    /** Adds the period to the equality fields supplied by the superclass. */
+    @Override
+    protected MutableList<Object> equalsFields() {
+        return super.equalsFields().appendIfNotNull(period);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java
new file mode 100644
index 0000000..a63ebde
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+/**
+ * Notified by the Poller of the result for each job, on each poll.
+ * 
+ * @param <V> type of the value produced by the polled job
+ * 
+ * @author aled
+ */
+public interface PollHandler<V> {
+
+    /** Decides whether the polled value counts as a success; {@link Poller} uses the answer to pick the callback. */
+    boolean checkSuccess(V val);
+
+    /** Called by {@link Poller} with the polled value when {@link #checkSuccess(Object)} returned true. */
+    void onSuccess(V val);
+
+    /** Called by {@link Poller} with the polled value when {@link #checkSuccess(Object)} returned false. */
+    void onFailure(V val);
+
+    /** Called by {@link Poller} when the job, or one of the callbacks above, threw an exception. */
+    void onException(Exception exception);
+
+    /** A description of this handler; {@link Poller} uses it to name the scheduled poll tasks. */
+    String getDescription();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
new file mode 100644
index 0000000..fd50ebd
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
+import org.apache.brooklyn.util.core.task.ScheduledTask;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+
+
+/** 
+ * For executing periodic polls.
+ * Jobs are added to the schedule, and then the poller is started.
+ * The jobs will then be executed periodically, and the handler called for the result/failure.
+ * 
+ * Assumes the schedule+start will be done single threaded, and that stop will not be done concurrently.
+ * 
+ * @param <V> type of the value produced by the scheduled poll jobs
+ */
+public class Poller<V> {
+    public static final Logger log = LoggerFactory.getLogger(Poller.class);
+
+    private final EntityLocal entity;
+    /** When true, each periodic run is skipped unless the entity's SERVICE_UP attribute is TRUE. */
+    private final boolean onlyIfServiceUp;
+    // jobs registered before start() ...
+    private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>();
+    private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>();
+    // ... and the tasks created for them by start(), kept so that stop() can cancel them
+    private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>();
+    private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>();
+    private volatile boolean started = false;
+    
+    /**
+     * A periodic job together with its handler and period; {@link #wrappedJob} runs the job once
+     * and routes the outcome to the handler.
+     */
+    private static class PollJob<V> {
+        final PollHandler<? super V> handler;
+        final Duration pollPeriod;
+        final Runnable wrappedJob;
+        // used to demote logging of repeated consecutive exceptions from debug to trace
+        private boolean loggedPreviousException = false;
+        
+        PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) {
+            this.handler = handler;
+            this.pollPeriod = period;
+            
+            wrappedJob = new Runnable() {
+                public void run() {
+                    try {
+                        V val = job.call();
+                        loggedPreviousException = false;
+                        if (handler.checkSuccess(val)) {
+                            handler.onSuccess(val);
+                        } else {
+                            handler.onFailure(val);
+                        }
+                    } catch (Exception e) {
+                        // reached for exceptions from the job itself and from the handler callbacks above
+                        if (loggedPreviousException) {
+                            if (log.isTraceEnabled()) log.trace("PollJob for {}, repeated consecutive failures, handling {} using {}", new Object[] {job, e, handler});
+                        } else {
+                            if (log.isDebugEnabled()) log.debug("PollJob for {} handling {} using {}", new Object[] {job, e, handler});
+                            loggedPreviousException = true;
+                        }
+                        handler.onException(e);
+                    }
+                }
+            };
+        }
+    }
+    
+    /** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */
+    @Deprecated
+    public Poller(EntityLocal entity) {
+        this(entity, false);
+    }
+    public Poller(EntityLocal entity, boolean onlyIfServiceUp) {
+        this.entity = entity;
+        this.onlyIfServiceUp = onlyIfServiceUp;
+    }
+    
+    /**
+     * Submits a one-off poll job; recommended that callers supply to-String so that task has a decent description
+     * 
+     * @throws IllegalStateException if the poller has already been started
+     */
+    public void submit(Callable<?> job) {
+        if (started) {
+            throw new IllegalStateException("Cannot submit additional tasks after poller has started");
+        }
+        oneOffJobs.add(job);
+    }
+
+    /** As {@link #scheduleAtFixedRate(Callable, PollHandler, Duration)}, with the period given in milliseconds. */
+    public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long period) {
+        scheduleAtFixedRate(job, handler, Duration.millis(period));
+    }
+    /**
+     * Registers a job to be run periodically once {@link #start()} is called, with its outcome passed to the handler.
+     * 
+     * @throws IllegalStateException if the poller has already been started
+     */
+    public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) {
+        if (started) {
+            throw new IllegalStateException("Cannot schedule additional tasks after poller has started");
+        }
+        PollJob<V> foo = new PollJob<V>(job, handler, period);
+        pollJobs.add(foo);
+    }
+
+    /**
+     * Submits every one-off job as a task, and a scheduled task for every poll job whose period is
+     * greater than zero (poll jobs with any other period are only logged, not scheduled).
+     * 
+     * @throws IllegalStateException if the poller is already started
+     */
+    @SuppressWarnings({ "unchecked" })
+    public void start() {
+        // TODO A previous incarnation of this logged polledSensors.keySet(), but we don't know that anymore.
+        // Is that ok, or can we do better?
+        
+        if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this});
+        if (started) { 
+            throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", 
+                    this, entity));
+        }
+        
+        started = true;
+        
+        for (final Callable<?> oneOffJob : oneOffJobs) {
+            Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).name("Poll").description("One-time poll job "+oneOffJob).build();
+            oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
+        }
+        
+        for (final PollJob<V> pollJob : pollJobs) {
+            final String scheduleName = pollJob.handler.getDescription();
+            if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
+                // factory handed to the ScheduledTask below; each call builds a fresh task for one poll run
+                Callable<Task<?>> pollingTaskFactory = new Callable<Task<?>>() {
+                    public Task<?> call() {
+                        DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), 
+                            new Callable<Void>() { public Void call() {
+                                // skip this run (without notifying the handler) while the service is not up
+                                if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
+                                        return null;
+                                }
+                                pollJob.wrappedJob.run();
+                                return null; 
+                            } } );
+                        BrooklynTaskTags.setTransient(task);
+                        return task;
+                    }
+                };
+                ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollJob.pollPeriod, "displayName", "scheduled:"+scheduleName), pollingTaskFactory);
+                tasks.add((ScheduledTask)Entities.submit(entity, task));
+            } else {
+                if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
+            }
+        }
+    }
+    
+    /**
+     * Cancels the one-off and scheduled tasks created by {@link #start()} and forgets them.
+     * The registered jobs themselves are kept.
+     * 
+     * @throws IllegalStateException if the poller is not started
+     */
+    public void stop() {
+        if (log.isDebugEnabled()) log.debug("Stopping poll for {} (using {})", new Object[] {entity, this});
+        if (!started) { 
+            throw new IllegalStateException(String.format("Attempt to stop poller %s of entity %s when not running", 
+                    this, entity));
+        }
+        
+        started = false;
+        for (Task<?> task : oneOffTasks) {
+            if (task != null) task.cancel(true);
+        }
+        for (ScheduledTask task : tasks) {
+            if (task != null) task.cancel();
+        }
+        oneOffTasks.clear();
+        tasks.clear();
+    }
+
+    /**
+     * @return true if the poller is started and at least one scheduled task has begun and is not done;
+     *         one-off tasks are not considered
+     */
+    public boolean isRunning() {
+        boolean hasActiveTasks = false;
+        for (Task<?> task: tasks) {
+            if (task.isBegun() && !task.isDone()) {
+                hasActiveTasks = true;
+                break;
+            }
+        }
+        if (!started && hasActiveTasks) {
+            log.warn("Poller should not be running, but has active tasks, tasks: "+tasks);
+        }
+        return started && hasActiveTasks;
+    }
+    
+    /** @return true if no periodic poll jobs have been registered (one-off jobs are not counted) */
+    protected boolean isEmpty() {
+        return pollJobs.isEmpty();
+    }
+    
+    public String toString() {
+        return Objects.toStringHelper(this).add("entity", entity).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
index 2e4a971..2a5e92e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
@@ -34,11 +34,11 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.entity.AbstractEntity;
 import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.core.mgmt.rebind.dto.MementosGenerators;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.entity.group.AbstractGroupImpl;
 import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java
index 479fbbf..4630be1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java
@@ -20,7 +20,7 @@ package org.apache.brooklyn.core.mgmt.rebind;
 
 import org.apache.brooklyn.api.mgmt.rebind.RebindContext;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.FlagUtils;
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index c3e8030..e9478ef 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -67,6 +67,7 @@ import org.apache.brooklyn.core.catalog.internal.CatalogUtils;
 import org.apache.brooklyn.core.entity.AbstractApplication;
 import org.apache.brooklyn.core.entity.AbstractEntity;
 import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.core.location.AbstractLocation;
 import org.apache.brooklyn.core.location.internal.LocationInternal;
 import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContext;
@@ -86,7 +87,6 @@ import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory;
 import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.flags.FlagUtils;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
index 761341b..929b63c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
@@ -51,6 +51,7 @@ import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.catalog.internal.CatalogItemDo;
 import org.apache.brooklyn.core.entity.EntityDynamicType;
 import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.core.location.internal.LocationInternal;
 import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
 import org.apache.brooklyn.core.mgmt.rebind.AbstractBrooklynObjectRebindSupport;
@@ -58,7 +59,6 @@ import org.apache.brooklyn.core.mgmt.rebind.TreeUtils;
 import org.apache.brooklyn.core.objs.BrooklynTypes;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.FlagUtils;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java
index 940d949..f76baaa 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java
@@ -27,7 +27,7 @@ import org.apache.brooklyn.core.config.BasicConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.AbstractEntity;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
-import org.apache.brooklyn.sensor.feed.ConfigToAttributes;
+import org.apache.brooklyn.core.feed.ConfigToAttributes;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java
index 79660ce..542fc01 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java
@@ -26,9 +26,9 @@ import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.effector.AddSensor;
-import org.apache.brooklyn.sensor.feed.http.HttpFeed;
-import org.apache.brooklyn.sensor.feed.http.HttpPollConfig;
-import org.apache.brooklyn.sensor.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
index a6880b7..e914bd2 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
@@ -30,8 +30,8 @@ import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.entity.Group;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.sensor.feed.function.FunctionFeed;
-import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
 import org.apache.brooklyn.util.collections.MutableMap;
 
 import com.google.common.base.Function;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java b/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java
index af34e0d..7f10d5b 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.core.entity.AbstractEntity;
-import org.apache.brooklyn.sensor.feed.function.FunctionFeed;
-import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
 
 import com.google.common.base.Functions;
 import com.google.common.base.Supplier;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
new file mode 100644
index 0000000..55db890
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.function;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by periodically invoking functions.
+ * 
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private FunctionFeed feed;
+ * 
+ * //@Override
+ * protected void connectSensors() {
+ *   super.connectSensors();
+ *   
+ *   feed = FunctionFeed.builder()
+ *     .entity(this)
+ *     .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
+ *         .period(500, TimeUnit.MILLISECONDS)
+ *         .callable(new Callable<Boolean>() {
+ *             public Boolean call() throws Exception {
+ *               return getDriver().isRunning();
+ *             }
+ *         })
+ *         .onExceptionOrFailure(Functions.constant(Boolean.FALSE)))
+ *     .build();
+ * }
+ * 
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ *   super.disconnectSensors();
+ *   if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ * 
+ * @author aled
+ */
+public class FunctionFeed extends AbstractFeed {
+
+    private static final Logger log = LoggerFactory.getLogger(FunctionFeed.class);
+
+    // Treat as immutable once built
+    @SuppressWarnings("serial")
+    public static final ConfigKey<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>> POLLS = ConfigKeys.newConfigKey(
+            new TypeToken<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>>() {},
+            "polls");
+
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    public static Builder builder(String uniqueTag) {
+        return new Builder().uniqueTag(uniqueTag);
+    }
+    
+    public static class Builder {
+        private EntityLocal entity;
+        private boolean onlyIfServiceUp = false;
+        private long period = 500;
+        private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
+        private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList();
+        private String uniqueTag;
+        private volatile boolean built;
+
+        public Builder entity(EntityLocal val) {
+            this.entity = val;
+            return this;
+        }
+        public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
+        public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { 
+            this.onlyIfServiceUp = onlyIfServiceUp; 
+            return this; 
+        }
+        public Builder period(Duration d) {
+            return period(d.toMilliseconds(), TimeUnit.MILLISECONDS);
+        }
+        public Builder period(long millis) {
+            return period(millis, TimeUnit.MILLISECONDS);
+        }
+        public Builder period(long val, TimeUnit units) {
+            this.period = val;
+            this.periodUnits = units;
+            return this;
+        }
+        public Builder poll(FunctionPollConfig<?,?> config) {
+            polls.add(config);
+            return this;
+        }
+        public Builder uniqueTag(String uniqueTag) {
+            this.uniqueTag = uniqueTag;
+            return this;
+        }
+        public FunctionFeed build() {
+            built = true;
+            FunctionFeed result = new FunctionFeed(this);
+            result.setEntity(checkNotNull(entity, "entity"));
+            result.start();
+            return result;
+        }
+        @Override
+        protected void finalize() {
+            if (!built) log.warn("FunctionFeed.Builder created, but build() never called");
+        }
+    }
+    
+    private static class FunctionPollIdentifier {
+        final Callable<?> job;
+
+        private FunctionPollIdentifier(Callable<?> job) {
+            this.job = checkNotNull(job, "job");
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(job);
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            return (other instanceof FunctionPollIdentifier) && Objects.equal(job, ((FunctionPollIdentifier)other).job);
+        }
+    }
+
+    /**
+     * For rebind; do not call directly; use builder
+     */
+    public FunctionFeed() {
+    }
+    
+    protected FunctionFeed(Builder builder) {
+        setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
+        
+        SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>> polls = HashMultimap.<FunctionPollIdentifier,FunctionPollConfig<?,?>>create();
+        for (FunctionPollConfig<?,?> config : builder.polls) {
+            if (!config.isEnabled()) continue;
+            @SuppressWarnings({ "rawtypes", "unchecked" })
+            FunctionPollConfig<?,?> configCopy = new FunctionPollConfig(config);
+            if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits);
+            Callable<?> job = config.getCallable();
+            polls.put(new FunctionPollIdentifier(job), configCopy);
+        }
+        setConfig(POLLS, polls);
+        initUniqueTag(builder.uniqueTag, polls.values());
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    protected void preStart() {
+        SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?, ?>> polls = getConfig(POLLS);
+        for (final FunctionPollIdentifier pollInfo : polls.keySet()) {
+            Set<FunctionPollConfig<?,?>> configs = polls.get(pollInfo);
+            long minPeriod = Integer.MAX_VALUE;
+            Set<AttributePollHandler<?>> handlers = Sets.newLinkedHashSet();
+
+            for (FunctionPollConfig<?,?> config : configs) {
+                handlers.add(new AttributePollHandler(config, entity, this));
+                if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+            }
+            
+            getPoller().scheduleAtFixedRate(
+                    (Callable)pollInfo.job,
+                    new DelegatingPollHandler(handlers), 
+                    minPeriod);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
new file mode 100644
index 0000000..4951868
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.function;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import groovy.lang.Closure;
+
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.FeedConfig;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+
+import com.google.common.base.Supplier;
+
+public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfig<S, T>> {
+
+    private Callable<?> callable;
+    
+    public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> sensor) {
+        return new FunctionPollConfig<Object, T>(sensor);
+    }
+    
+    public FunctionPollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+    }
+
+    public FunctionPollConfig(FunctionPollConfig<S, T> other) {
+        super(other);
+        callable = other.callable;
+    }
+    
+    public Callable<? extends Object> getCallable() {
+        return callable;
+    }
+    
+    /**
+     * The {@link Callable} to be invoked on each poll.
+     * <p>
+     * Note this <em>must</em> use generics, otherwise subsequent chained calls
+     * (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)}) will
+     * return the wrong type.
+     */
+    @SuppressWarnings("unchecked")
+    public <newS> FunctionPollConfig<newS, T> callable(Callable<? extends newS> val) {
+        this.callable = checkNotNull(val, "callable");
+        return (FunctionPollConfig<newS, T>) this;
+    }
+    
+    /**
+     * Supplies the value to be returned by each poll.
+     * <p>
+     * Note this <em>must</em> use generics, otherwise subsequent chained calls
+     * (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)}) will
+     * return the wrong type.
+     */
+    @SuppressWarnings("unchecked")
+    public <newS> FunctionPollConfig<newS, T> supplier(final Supplier<? extends newS> val) {
+        this.callable = Functionals.callable( checkNotNull(val, "supplier") );
+        return (FunctionPollConfig<newS, T>) this;
+    }
+    
+    /** @deprecated since 0.7.0, kept for legacy compatibility when deserializing */
+    @SuppressWarnings({ "unchecked", "unused" })
+    private <newS> FunctionPollConfig<newS, T> supplierLegacy(final Supplier<? extends newS> val) {
+        checkNotNull(val, "supplier");
+        this.callable = new Callable<newS>() {
+            @Override
+            public newS call() throws Exception {
+                return val.get();
+            }
+        };
+        return (FunctionPollConfig<newS, T>) this;
+    }
+
+    public FunctionPollConfig<S, T> closure(Closure<?> val) {
+        this.callable = GroovyJavaMethods.callableFromClosure(checkNotNull(val, "closure"));
+        return this;
+    }
+
+    @Override protected String toStringBaseName() { return "fn"; }
+    @Override protected String toStringPollSource() {
+        if (callable==null) return null;
+        String cs = callable.toString();
+        if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) {
+            return cs;
+        }
+        // if the callable's toString() contains its hex hashCode it is probably a custom internal class without a meaningful toString; return its class name
+        return JavaClassNames.simpleClassName(callable);
+    }
+
+}