You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:13 UTC

[25/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to o.a.b.feed and o.a.b.core.feed

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
new file mode 100644
index 0000000..d34d0a5
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.windows;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import io.cloudsoft.winrm4j.winrm.WinRmToolResponse;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.effector.EffectorTasks;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.PollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.location.winrm.WinRmMachineLocation;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A sensor feed that retrieves performance counters from a Windows host and posts the values to sensors.
+ *
+ * <p>To use this feed, you must provide the entity, and a collection of mappings between Windows performance counter
+ * names and Brooklyn attribute sensors.</p>
+ *
+ * <p>This feed uses WinRM to run the PowerShell cmdlet <tt>Get-Counter</tt> to query for a specific set of performance
+ * counters, by name. The values are extracted from the response, and published to the entity's sensors.</p>
+ *
+ * <p>Example:</p>
+ *
+ * {@code
+ * @Override
+ * protected void connectSensors() {
+ *     WindowsPerformanceCounterFeed feed = WindowsPerformanceCounterFeed.builder()
+ *         .entity(entity)
+ *         .addSensor("\\Processor(_total)\\% Idle Time", CPU_IDLE_TIME)
+ *         .addSensor("\\Memory\\Available MBytes", AVAILABLE_MEMORY)
+ *         .build();
+ * }
+ * }
+ *
+ * @since 0.6.0
+ * @author richardcloudsoft
+ */
+public class WindowsPerformanceCounterFeed extends AbstractFeed {
+
+    private static final Logger log = LoggerFactory.getLogger(WindowsPerformanceCounterFeed.class);
+
+    // This pattern matches CSV line(s) with the date in the first field, and at least one further field.
+    protected static final Pattern lineWithPerfData = Pattern.compile("^\"[\\d:/\\-. ]+\",\".*\"$", Pattern.MULTILINE);
+    private static final Joiner JOINER_ON_SPACE = Joiner.on(' ');
+    private static final Joiner JOINER_ON_COMMA = Joiner.on(',');
+    private static final int OUTPUT_COLUMN_WIDTH = 100;
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Collection<WindowsPerformanceCounterPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+            new TypeToken<Collection<WindowsPerformanceCounterPollConfig<?>>>() {},
+            "polls");
+
+    /** Entry point of the fluent API.
+     *  @return a fresh {@link Builder} with no sensors registered and the default polling period */
+    public static Builder builder() { return new Builder(); }
+
+    public static class Builder {
+        private EntityLocal entity;
+        private Set<WindowsPerformanceCounterPollConfig<?>> polls = Sets.newLinkedHashSet();
+        private Duration period = Duration.of(30, TimeUnit.SECONDS);
+        private String uniqueTag;
+        private volatile boolean built;
+
+        public Builder entity(EntityLocal val) {
+            this.entity = checkNotNull(val, "entity");
+            return this;
+        }
+        public Builder addSensor(WindowsPerformanceCounterPollConfig<?> config) {
+            polls.add(config);
+            return this;
+        }
+        public Builder addSensor(String performanceCounterName, AttributeSensor<?> sensor) {
+            return addSensor(new WindowsPerformanceCounterPollConfig(sensor).performanceCounterName(checkNotNull(performanceCounterName, "performanceCounterName")));
+        }
+        public Builder addSensors(Map<String, AttributeSensor> sensors) {
+            for (Map.Entry<String, AttributeSensor> entry : sensors.entrySet()) {
+                addSensor(entry.getKey(), entry.getValue());
+            }
+            return this;
+        }
+        public Builder period(Duration period) {
+            this.period = checkNotNull(period, "period");
+            return this;
+        }
+        public Builder period(long millis) {
+            return period(millis, TimeUnit.MILLISECONDS);
+        }
+        public Builder period(long val, TimeUnit units) {
+            return period(Duration.of(val, units));
+        }
+        public Builder uniqueTag(String uniqueTag) {
+            this.uniqueTag = uniqueTag;
+            return this;
+        }
+        public WindowsPerformanceCounterFeed build() {
+            built = true;
+            WindowsPerformanceCounterFeed result = new WindowsPerformanceCounterFeed(this);
+            result.setEntity(checkNotNull(entity, "entity"));
+            result.start();
+            return result;
+        }
+        @Override
+        protected void finalize() {
+            if (!built) log.warn("WindowsPerformanceCounterFeed.Builder created, but build() never called");
+        }
+    }
+
+    /** For rebind; do not call directly; use builder */
+    public WindowsPerformanceCounterFeed() {}
+
+    /** Copies the builder's enabled polls into {@link #POLLS}, applying the builder-wide period to any
+     *  poll reporting a negative period, and initialises the unique tag. */
+    protected WindowsPerformanceCounterFeed(Builder builder) {
+        List<WindowsPerformanceCounterPollConfig<?>> enabledPolls = Lists.newArrayList();
+        for (WindowsPerformanceCounterPollConfig<?> requested : builder.polls) {
+            if (requested.isEnabled()) {
+                @SuppressWarnings({ "unchecked", "rawtypes" })
+                WindowsPerformanceCounterPollConfig<?> copy = new WindowsPerformanceCounterPollConfig(requested);
+                if (copy.getPeriod() < 0) { copy.period(builder.period); }
+                enabledPolls.add(copy);
+            }
+        }
+        config().set(POLLS, enabledPolls);
+        initUniqueTag(builder.uniqueTag, enabledPolls);
+    }
+
+    /**
+     * Builds the single PowerShell {@code Get-Counter} command covering every configured counter and
+     * schedules it on the poller at the shortest period found among the polls.
+     */
+    @Override
+    protected void preStart() {
+        Collection<WindowsPerformanceCounterPollConfig<?>> polls = getConfig(POLLS);
+
+        // One command serves all polls, so it has to run as often as the most frequent of them.
+        long shortestPeriod = Integer.MAX_VALUE;
+        List<String> counterNames = Lists.newArrayList();
+        for (WindowsPerformanceCounterPollConfig<?> poll : polls) {
+            shortestPeriod = Math.min(shortestPeriod, poll.getPeriod());
+            counterNames.add(poll.getPerformanceCounterName());
+        }
+
+        // Two fixed-width columns (counter path, cooked value); SendPerfCountersToSensors slices each
+        // output line at OUTPUT_COLUMN_WIDTH, so the table layout and that parser must stay in step.
+        String columnSpec = String.format(
+                "@{Expression={$_.Path};width=%d},@{Expression={$_.CookedValue};width=%<d}", OUTPUT_COLUMN_WIDTH);
+        String command = JOINER_ON_SPACE.join(
+                "(Get-Counter",
+                "-Counter", JOINER_ON_COMMA.join(Iterables.transform(counterNames, QuoteStringFunction.INSTANCE)),
+                "-SampleInterval", "2", // TODO: extract SampleInterval as a config key
+                ").CounterSamples",
+                "|", "Format-Table", columnSpec, "-HideTableHeaders",
+                "|", "Out-String", "-Width", String.valueOf(OUTPUT_COLUMN_WIDTH * 2));
+        log.debug("Windows performance counter poll command for {} will be: {}", entity, command);
+
+        // Explicit type arguments instead of raw types, so the job/handler pairing is checked by the compiler.
+        Callable<WinRmToolResponse> job = new GetPerformanceCountersJob<WinRmToolResponse>(getEntity(), command);
+        getPoller().scheduleAtFixedRate(
+                new CallInEntityExecutionContext<WinRmToolResponse>(entity, job),
+                new SendPerfCountersToSensors(getEntity(), polls),
+                shortestPeriod);
+    }
+
+    /** Poll job that runs the prepared PowerShell command on the entity's WinRM machine. */
+    private static class GetPerformanceCountersJob<T> implements Callable<T> {
+        private final Entity entity;
+        private final String command;
+
+        GetPerformanceCountersJob(Entity entity, String command) {
+            this.command = command;
+            this.entity = entity;
+        }
+
+        /** Looks the machine up afresh on each call and returns its response, cast (unchecked) to {@code T}. */
+        @Override
+        public T call() throws Exception {
+            WinRmMachineLocation machine = EffectorTasks.getWinRmMachine(entity);
+            return (T) machine.executePsScript(command);
+        }
+    }
+
+    /** Narrows the inherited poller to the response type this feed schedules; the cast is unchecked. */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Poller<WinRmToolResponse> getPoller() { return (Poller<WinRmToolResponse>) super.getPoller(); }
+
+    /**
+     * Adapter that runs a delegate {@link java.util.concurrent.Callable} through the execution context of a
+     * given entity: every invocation submits the delegate there and waits for its result via {@code get()}.
+     *
+     * @param <T> The result type of the delegate.
+     */
+    private static class CallInEntityExecutionContext<T> implements Callable<T> {
+        private final Callable<T> job;
+        private final EntityLocal entity;
+
+        private CallInEntityExecutionContext(EntityLocal entity, Callable<T> job) {
+            this.entity = entity;
+            this.job = job;
+        }
+
+        @Override
+        public T call() throws Exception {
+            EntityInternal internalView = (EntityInternal) entity;
+            ExecutionContext context = internalView.getManagementSupport().getExecutionContext();
+            return context.submit(Maps.newHashMap(), job).get();
+        }
+    }
+
+    /** Poll handler that parses the fixed-width {@code Get-Counter} table and publishes each value to the sensor
+     *  mapped to that counter; on failure or exception every mapped sensor is set to null. */
+    @VisibleForTesting
+    static class SendPerfCountersToSensors implements PollHandler<WinRmToolResponse> {
+        private final EntityLocal entity;
+        private final List<WindowsPerformanceCounterPollConfig<?>> polls;
+        // Sensors whose coercion failure was already logged at WARN; repeats are logged at TRACE only.
+        private final Set<AttributeSensor<?>> failedAttributes = Sets.newLinkedHashSet();
+        private static final Pattern MACHINE_NAME_LOOKBACK_PATTERN = Pattern.compile(String.format("(?<=\\\\\\\\.{0,%d})\\\\.*", OUTPUT_COLUMN_WIDTH));
+
+        public SendPerfCountersToSensors(EntityLocal entity, Collection<WindowsPerformanceCounterPollConfig<?>> polls) {
+            this.entity = entity;
+            this.polls = ImmutableList.copyOf(polls);
+        }
+
+        /** Successful only when the exit code is zero, stderr is present but empty, and stdout is non-empty. */
+        @Override
+        public boolean checkSuccess(WinRmToolResponse val) {
+            // TODO not just using statusCode; also looking at absence of stderr.
+            // Status code is (empirically) unreliable: it returns 0 sometimes even when failed
+            // (but never returns non-zero on success).
+            if (val.getStatusCode() != 0) return false;
+            String stderr = val.getStdErr();
+            if (stderr == null || stderr.length() != 0) return false;
+            String out = val.getStdOut();
+            return out != null && out.length() != 0;
+        }
+
+        /** Publishes one attribute per well-formed output line. Lines too short to hold both columns, or without
+         *  a recognisable counter path, are skipped; an unconfigured counter raises IllegalStateException. */
+        @Override
+        public void onSuccess(WinRmToolResponse val) {
+            for (String pollResponse : val.getStdOut().split("\r\n")) {
+                // Guard the fixed-width slicing below: a shorter (or empty) line cannot contain a value column.
+                if (pollResponse.length() < OUTPUT_COLUMN_WIDTH) continue;
+                String path = pollResponse.substring(0, OUTPUT_COLUMN_WIDTH - 1);
+                // The performance counter output prepends the sensor name with "\\<machinename>" so we need to remove it
+                Matcher machineNameLookbackMatcher = MACHINE_NAME_LOOKBACK_PATTERN.matcher(path);
+                if (!machineNameLookbackMatcher.find()) continue;
+                String name = machineNameLookbackMatcher.group(0).trim();
+                String rawValue = pollResponse.substring(OUTPUT_COLUMN_WIDTH).replaceAll("^\\s+", "");
+                WindowsPerformanceCounterPollConfig<?> config = getPollConfig(name);
+                Class<?> clazz = config.getSensor().getType();
+                @SuppressWarnings("unchecked")
+                AttributeSensor<Object> attribute = (AttributeSensor<Object>) Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription());
+                try {
+                    Object value = TypeCoercions.coerce(rawValue, TypeToken.of(clazz));
+                    entity.setAttribute(attribute, value);
+                } catch (Exception e) {
+                    Exceptions.propagateIfFatal(e);
+                    if (failedAttributes.add(attribute)) {
+                        log.warn("Failed to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute});
+                    } else if (log.isTraceEnabled()) {
+                        log.trace("Failed (repeatedly) to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute});
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(WinRmToolResponse val) {
+            log.error("Windows Performance Counter query did not respond as expected. exitcode={} stdout={} stderr={}",
+                    new Object[]{val.getStatusCode(), val.getStdOut(), val.getStdErr()});
+            clearAllSensors();
+        }
+
+        @Override
+        public void onException(Exception exception) {
+            log.error("Detected exception while retrieving Windows Performance Counters from entity " +
+                    entity.getDisplayName(), exception);
+            clearAllSensors();
+        }
+
+        /** Sets every mapped sensor to null, addressing it by the sensor's own name and value type. */
+        private void clearAllSensors() {
+            for (WindowsPerformanceCounterPollConfig<?> config : polls) {
+                Class<?> clazz = config.getSensor().getType();
+                AttributeSensor<?> attribute = Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription());
+                entity.setAttribute(attribute, null);
+            }
+        }
+
+        @Override public String getDescription() { return "" + polls; }
+
+        @Override public String toString() { return super.toString()+"["+getDescription()+"]"; }
+
+        private WindowsPerformanceCounterPollConfig<?> getPollConfig(String sensorName) {
+            for (WindowsPerformanceCounterPollConfig<?> poll : polls) {
+                if (poll.getPerformanceCounterName().equalsIgnoreCase(sensorName)) {
+                    return poll;
+                }
+            }
+            throw new IllegalStateException(String.format("%s not found in configured polls: %s", sensorName, polls));
+        }
+    }
+
+    /** Iterates over the quoted, comma-separated fields of one CSV line, leaving out the first (timestamp) field. */
+    static class PerfCounterValueIterator implements Iterator<String> {
+        // This pattern matches the contents of the first field, and optionally matches the rest of the line as
+        // further fields. Feed the second match back into the pattern again to get the next field, and repeat until
+        // all fields are discovered.
+        protected static final Pattern splitPerfData = Pattern.compile("^\"([^\\\"]*)\"((,\"[^\\\"]*\")*)$");
+
+        // Matcher over the not-yet-returned remainder of the line; null once every field has been returned.
+        private Matcher matcher;
+        private boolean matched;
+
+        /** @throws IllegalArgumentException if {@code input} does not match {@link #splitPerfData} */
+        public PerfCounterValueIterator(String input) {
+            matcher = splitPerfData.matcher(input);
+            // Throw away the first element (the timestamp) (and also confirm that we have a pattern match)
+            checkArgument(hasNext(), "input "+input+" does not match expected pattern "+splitPerfData.pattern());
+            next();
+        }
+
+        @Override
+        public boolean hasNext() {
+            // Matcher.find() advances the matcher, so its outcome is cached in 'matched' to keep this idempotent.
+            if (matcher != null && !matched) {
+                matched = matcher.find();
+                if (!matched) matcher = null;
+            }
+            return matcher != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) throw new java.util.NoSuchElementException("no more fields");
+            String next = matcher.group(1);
+            String remainder = matcher.group(2);
+            // Re-arm on the remaining fields (minus the separating comma), or mark the iterator exhausted.
+            matcher = Strings.isNullOrEmpty(remainder) ? null : splitPerfData.matcher(remainder.substring(1));
+            matched = false;
+            return next;
+        }
+
+        @Override public void remove() { throw new UnsupportedOperationException(); }
+    }
+
+    /** Wraps a string in double quotes, without any escaping; a null input yields null. */
+    private static enum QuoteStringFunction implements Function<String, String> {
+        INSTANCE;
+
+        @Nullable @Override
+        public String apply(@Nullable String input) {
+            return (input == null) ? null : "\"" + input + "\"";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java
new file mode 100644
index 0000000..1391c3e
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.windows;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.PollConfig;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+/** Poll configuration pairing an attribute sensor with the Windows performance counter that feeds it. */
+public class WindowsPerformanceCounterPollConfig<T> extends PollConfig<Object, T, WindowsPerformanceCounterPollConfig<T>>{
+    private String performanceCounterName;
+
+    /** Takes its description from the sensor and passes successful poll values through unchanged. */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public WindowsPerformanceCounterPollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+        description(sensor.getDescription());
+        onSuccess((Function)Functions.identity());
+    }
+
+    /** Copy constructor; additionally carries over the counter name of {@code other}. */
+    public WindowsPerformanceCounterPollConfig(WindowsPerformanceCounterPollConfig<T> other) {
+        super(other);
+        performanceCounterName = other.performanceCounterName;
+    }
+
+    public String getPerformanceCounterName() { return performanceCounterName; }
+    /** Fluent setter for the counter name. */
+    public WindowsPerformanceCounterPollConfig<T> performanceCounterName(String val) {
+        performanceCounterName = val;
+        return this;
+    }
+
+    @Override protected String toStringPollSource() { return performanceCounterName; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java
deleted file mode 100644
index 327181a..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Collection;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
-import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento;
-import org.apache.brooklyn.api.sensor.Feed;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.BrooklynFeatureEnablement;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.mgmt.rebind.BasicFeedRebindSupport;
-import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
-import org.apache.brooklyn.util.javalang.JavaClassNames;
-import org.apache.brooklyn.util.text.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** 
- * Captures common fields and processes for sensor feeds.
- * These generally poll or subscribe to get sensor values for an entity.
- * They make it easy to poll over http, jmx, etc.
- */
-public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed {
-
-    private static final Logger log = LoggerFactory.getLogger(AbstractFeed.class);
-
-    public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("feed.onlyIfServiceUp", "", false);
-    
-    private final Object pollerStateMutex = new Object();
-    private transient volatile Poller<?> poller;
-    private transient volatile boolean activated;
-    private transient volatile boolean suspended;
-
-    public AbstractFeed() {
-    }
-    
-    /**
-     * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)}
-     */
-    @Deprecated
-    public AbstractFeed(EntityLocal entity) {
-        this(entity, false);
-    }
-    
-    /**
-     * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)} and {@code setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp)}
-     */
-    @Deprecated
-    public AbstractFeed(EntityLocal entity, boolean onlyIfServiceUp) {
-        this.entity = checkNotNull(entity, "entity");
-        setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp);
-    }
-
-    // Ensure idempotent, as called in builders (in case not registered with entity), and also called
-    // when registering with entity
-    @Override
-    public void setEntity(EntityLocal entity) {
-        super.setEntity(entity);
-        if (BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_FEED_REGISTRATION_PROPERTY)) {
-            ((EntityInternal)entity).feeds().addFeed(this);
-        }
-    }
-
-    protected void initUniqueTag(String uniqueTag, Object ...valsForDefault) {
-        if (Strings.isNonBlank(uniqueTag)) this.uniqueTag = uniqueTag;
-        else this.uniqueTag = getDefaultUniqueTag(valsForDefault);
-    }
-
-    protected String getDefaultUniqueTag(Object ...valsForDefault) {
-        StringBuilder sb = new StringBuilder();
-        sb.append(JavaClassNames.simpleClassName(this));
-        if (valsForDefault.length==0) {
-            sb.append("@");
-            sb.append(hashCode());
-        } else if (valsForDefault.length==1 && valsForDefault[0] instanceof Collection){
-            sb.append(Strings.toUniqueString(valsForDefault[0], 80));
-        } else {
-            sb.append("[");
-            boolean first = true;
-            for (Object x: valsForDefault) {
-                if (!first) sb.append(";");
-                else first = false;
-                sb.append(Strings.toUniqueString(x, 80));
-            }
-            sb.append("]");
-        }
-        return sb.toString(); 
-    }
-
-    @Override
-    public void start() {
-        if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity);
-        if (activated) { 
-            throw new IllegalStateException(String.format("Attempt to start feed %s of entity %s when already running", 
-                    this, entity));
-        }
-        if (poller != null) {
-            throw new IllegalStateException(String.format("Attempt to re-start feed %s of entity %s", this, entity));
-        }
-        
-        poller = new Poller<Object>(entity, getConfig(ONLY_IF_SERVICE_UP));
-        activated = true;
-        preStart();
-        synchronized (pollerStateMutex) {
-            // don't start poller if we are suspended
-            if (!suspended) {
-                poller.start();
-            }
-        }
-    }
-
-    @Override
-    public void suspend() {
-        synchronized (pollerStateMutex) {
-            if (activated && !suspended) {
-                poller.stop();
-            }
-            suspended = true;
-        }
-    }
-    
-    @Override
-    public void resume() {
-        synchronized (pollerStateMutex) {
-            if (activated && suspended) {
-                poller.start();
-            }
-            suspended = false;
-        }
-    }
-    
-    @Override
-    public void destroy() {
-        stop();
-    }
-
-    @Override
-    public void stop() {
-        if (!activated) { 
-            log.debug("Ignoring attempt to stop feed {} of entity {} when not running", this, entity);
-            return;
-        }
-        if (log.isDebugEnabled()) log.debug("stopping feed {} for {}", this, entity);
-        
-        activated = false;
-        preStop();
-        synchronized (pollerStateMutex) {
-            if (!suspended) {
-                poller.stop();
-            }
-        }
-        postStop();
-        super.destroy();
-    }
-
-    @Override
-    public boolean isActivated() {
-        return activated;
-    }
-    
-    public EntityLocal getEntity() {
-        return entity;
-    }
-    
-    protected boolean isConnected() {
-        // TODO Default impl will result in multiple logs for same error if becomes unreachable
-        // (e.g. if ssh gets NoRouteToHostException, then every AttributePollHandler for that
-        // feed will log.warn - so if polling for 10 sensors/attributes will get 10 log messages).
-        // Would be nice if reduced this logging duplication.
-        // (You can reduce it by providing a better 'isConnected' implementation of course.)
-        return isRunning() && entity!=null && !((EntityInternal)entity).getManagementSupport().isNoLongerManaged();
-    }
-
-    @Override
-    public boolean isSuspended() {
-        return suspended;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return isActivated() && !isSuspended() && !isDestroyed() && getPoller()!=null && getPoller().isRunning();
-    }
-
-    @Override
-    public RebindSupport<FeedMemento> getRebindSupport() {
-        return new BasicFeedRebindSupport(this);
-    }
-
-    @Override
-    protected void onChanged() {
-        // TODO Auto-generated method stub
-    }
-
-    /**
-     * For overriding.
-     */
-    protected void preStart() {
-    }
-    
-    /**
-     * For overriding.
-     */
-    protected void preStop() {
-    }
-    
-    /**
-     * For overriding.
-     */
-    protected void postStop() {
-    }
-    
-    /**
-     * For overriding, where sub-class can change return-type generics!
-     */
-    protected Poller<?> getPoller() {
-        return poller;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java
deleted file mode 100644
index eac972e..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
-import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition;
-import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-
-/**
- * Handler for when polling an entity's attribute. On each poll result the entity's attribute is set.
- * 
- * Calls to onSuccess and onError will happen sequentially, but may be called from different threads 
- * each time. Note that no guarantees of a synchronized block exist, so additional synchronization 
- * would be required for the Java memory model's "happens before" relationship.
- * 
- * @author aled
- */
-public class AttributePollHandler<V> implements PollHandler<V> {
-
-    public static final Logger log = LoggerFactory.getLogger(AttributePollHandler.class);
-
-    private final FeedConfig<V,?,?> config;
-    private final EntityLocal entity;
-    @SuppressWarnings("rawtypes")
-    private final AttributeSensor sensor;
-    private final AbstractFeed feed;
-    private final boolean suppressDuplicates;
-    
-    // allow 30 seconds before logging at WARN, if there has been no success yet;
-    // after success WARN immediately
-    // TODO these should both be configurable
-    private Duration logWarningGraceTimeOnStartup = Duration.THIRTY_SECONDS;
-    private Duration logWarningGraceTime = Duration.millis(0);
-    
-    // internal state to look after whether to log warnings
-    private volatile Long lastSuccessTime = null;
-    private volatile Long currentProblemStartTime = null;
-    private volatile boolean currentProblemLoggedAsWarning = false;
-    private volatile boolean lastWasProblem = false;
-
-    
-    public AttributePollHandler(FeedConfig<V,?,?> config, EntityLocal entity, AbstractFeed feed) {
-        this.config = checkNotNull(config, "config");
-        this.entity = checkNotNull(entity, "entity");
-        this.sensor = checkNotNull(config.getSensor(), "sensor");
-        this.feed = checkNotNull(feed, "feed");
-        this.suppressDuplicates = config.getSupressDuplicates();
-    }
-
-    @Override
-    public boolean checkSuccess(V val) {
-        // Always true if no checkSuccess predicate was configured.
-        return !config.hasCheckSuccessHandler() || config.getCheckSuccess().apply(val);
-    }
-
-    @Override
-    public void onSuccess(V val) {
-        if (lastWasProblem) {
-            if (currentProblemLoggedAsWarning) { 
-                log.info("Success (following previous problem) reading "+getBriefDescription());
-            } else {
-                log.debug("Success (following previous problem) reading "+getBriefDescription());
-            }
-            lastWasProblem = false;
-            currentProblemStartTime = null;
-            currentProblemLoggedAsWarning = false;
-        }
-        lastSuccessTime = System.currentTimeMillis();
-        if (log.isTraceEnabled()) log.trace("poll for {} got: {}", new Object[] {getBriefDescription(), val});
-        
-        try {
-            setSensor(transformValueOnSuccess(val));
-        } catch (Exception e) {
-            if (feed.isConnected()) {
-                log.warn("unable to compute "+getBriefDescription()+"; on val="+val, e);
-            } else {
-                if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; val="+val+" (when inactive)", e);
-            }
-        }
-    }
-
-    /** allows post-processing, such as applying a success handler; 
-     * default applies the onSuccess handler (which is recommended) */
-    protected Object transformValueOnSuccess(V val) {
-        return config.hasSuccessHandler() ? config.getOnSuccess().apply(val) : val;
-    }
-
-    @Override
-    public void onFailure(V val) {
-        if (!config.hasFailureHandler()) {
-            onException(new Exception("checkSuccess of "+this+" for "+getBriefDescription()+" was false but poller has no failure handler"));
-        } else {
-            logProblem("failure", val);
-
-            try {
-                setSensor(config.hasFailureHandler() ? config.getOnFailure().apply((V)val) : val);
-            } catch (Exception e) {
-                if (feed.isConnected()) {
-                    log.warn("Error computing " + getBriefDescription() + "; val=" + val+": "+ e, e);
-                } else {
-                    if (log.isDebugEnabled())
-                        log.debug("Error computing " + getBriefDescription() + "; val=" + val + " (when inactive)", e);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void onException(Exception exception) {
-        if (!feed.isConnected()) {
-            if (log.isTraceEnabled()) log.trace("Read of {} in {} gave exception (while not connected or not yet connected): {}", new Object[] {this, getBriefDescription(), exception});
-        } else {
-            logProblem("exception", exception);
-        }
-
-        if (config.hasExceptionHandler()) {
-            try {
-                setSensor( config.getOnException().apply(exception) );
-            } catch (Exception e) {
-                if (feed.isConnected()) {
-                    log.warn("unable to compute "+getBriefDescription()+"; on exception="+exception, e);
-                } else {
-                    if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; exception="+exception+" (when inactive)", e);
-                }
-            }
-        }
-    }
-
-    protected void logProblem(String type, Object val) {
-        if (lastWasProblem && currentProblemLoggedAsWarning) {
-            if (log.isTraceEnabled())
-                log.trace("Recurring {} reading {} in {}: {}", new Object[] {type, this, getBriefDescription(), val});
-        } else {
-            long nowTime = System.currentTimeMillis();
-            // get a non-volatile value
-            Long currentProblemStartTimeCache = currentProblemStartTime;
-            long expiryTime = 
-                    (lastSuccessTime!=null && !isTransitioningOrStopped()) ? lastSuccessTime+logWarningGraceTime.toMilliseconds() :
-                    currentProblemStartTimeCache!=null ? currentProblemStartTimeCache+logWarningGraceTimeOnStartup.toMilliseconds() :
-                    nowTime+logWarningGraceTimeOnStartup.toMilliseconds();
-            if (!lastWasProblem) {
-                if (expiryTime <= nowTime) {
-                    currentProblemLoggedAsWarning = true;
-                    if (entity==null || !Entities.isNoLongerManaged(entity)) {
-                        log.warn("Read of " + getBriefDescription() + " gave " + type + ": " + val);
-                    } else {
-                        log.debug("Read of " + getBriefDescription() + " gave " + type + ": " + val);
-                    }
-                    if (log.isDebugEnabled() && val instanceof Throwable)
-                        log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
-                } else {
-                    if (log.isDebugEnabled())
-                        log.debug("Read of " + getBriefDescription() + " gave " + type + " (in grace period): " + val);
-                }
-                lastWasProblem = true;
-                currentProblemStartTime = nowTime;
-            } else {
-                if (expiryTime <= nowTime) {
-                    currentProblemLoggedAsWarning = true;
-                    log.warn("Read of " + getBriefDescription() + " gave " + type + 
-                            " (grace period expired, occurring for "+Duration.millis(nowTime - currentProblemStartTimeCache)+
-                            (config.hasExceptionHandler() ? "" : ", no exception handler set for sensor")+
-                            ")"+
-                            ": " + val);
-                    if (log.isDebugEnabled() && val instanceof Throwable)
-                        log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
-                } else {
-                    if (log.isDebugEnabled()) 
-                        log.debug("Recurring {} reading {} in {} (still in grace period): {}", new Object[] {type, this, getBriefDescription(), val});
-                }
-            }
-        }
-    }
-
-    protected boolean isTransitioningOrStopped() {
-        if (entity==null) return false;
-        Transition expected = entity.getAttribute(Attributes.SERVICE_STATE_EXPECTED);
-        if (expected==null) return false;
-        return (expected.getState()==Lifecycle.STARTING || expected.getState()==Lifecycle.STOPPING || expected.getState()==Lifecycle.STOPPED);
-    }
-
-    @SuppressWarnings("unchecked")
-    protected void setSensor(Object v) {
-        if (Entities.isNoLongerManaged(entity)) {
-            if (Tasks.isInterrupted()) return;
-            log.warn(""+entity+" is not managed; feed "+this+" setting "+sensor+" to "+v+" at this time is not supported ("+Tasks.current()+")");
-        }
-        
-        if (v == FeedConfig.UNCHANGED) {
-            // nothing
-        } else if (v == FeedConfig.REMOVE) {
-            ((EntityInternal)entity).removeAttribute(sensor);
-        } else if (sensor == FeedConfig.NO_SENSOR) {
-            // nothing
-        } else {
-            Object coercedV = TypeCoercions.coerce(v, sensor.getType());
-            if (suppressDuplicates && Objects.equal(coercedV, entity.getAttribute(sensor))) {
-                // no change; nothing
-            } else {
-                entity.setAttribute(sensor, coercedV);
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return super.toString()+"["+getDescription()+"]";
-    }
-    
-    @Override
-    public String getDescription() {
-        return sensor.getName()+" @ "+entity.getId()+" <- "+config;
-    }
-    
-    protected String getBriefDescription() {
-        return ""+entity+"->"+(sensor==FeedConfig.NO_SENSOR ? "(dynamic sensors)" : ""+sensor);
-    }
-        
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java
deleted file mode 100644
index 7938cc4..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.mgmt.ManagementContext;
-import org.apache.brooklyn.api.sensor.Sensor;
-import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.TemplatedStringAttributeSensorAndConfigKey;
-
-
-/** Simple config adapter for setting {@link AttributeSensorAndConfigKey} sensor values from the config value or config default */ 
-public class ConfigToAttributes {
-
-    //normally just applied once, statically, not registered...
-    public static void apply(EntityLocal entity) {
-        for (Sensor<?> it : entity.getEntityType().getSensors()) {
-            if (it instanceof AttributeSensorAndConfigKey) {
-                apply(entity, (AttributeSensorAndConfigKey<?,?>)it);
-            }
-        }
-    }
-
-    /**
-     * Convenience for ensuring an individual sensor is set from its config key
-     * (e.g. sub-classes of DynamicWebAppCluster that don't want to set HTTP_PORT etc!)
-     */
-    public static <T> T apply(EntityLocal entity, AttributeSensorAndConfigKey<?,T> key) {
-        T v = entity.getAttribute(key);
-        if (v!=null) return v;
-        v = key.getAsSensorValue(entity);
-        if (v!=null) entity.setAttribute(key, v);
-        return v;
-    }
-
-    /**
-     * Convenience for transforming a config value (e.g. processing a {@link TemplatedStringAttributeSensorAndConfigKey}),
-     * outside of the context of an entity.
-     */
-    public static <T> T transform(ManagementContext managementContext, AttributeSensorAndConfigKey<?,T> key) {
-        return key.getAsSensorValue(managementContext);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java
deleted file mode 100644
index 4433e83..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * A poll handler that delegates each call to a set of poll handlers.
- * 
- * @author aled
- */
-public class DelegatingPollHandler<V> implements PollHandler<V> {
-
-    private final List<AttributePollHandler<? super V>> delegates;
-
-    public DelegatingPollHandler(Iterable<AttributePollHandler<? super V>> delegates) {
-        super();
-        this.delegates = ImmutableList.copyOf(delegates);
-    }
-
-    @Override
-    public boolean checkSuccess(V val) {
-        for (AttributePollHandler<? super V> delegate : delegates) {
-            if (!delegate.checkSuccess(val))
-                return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void onSuccess(V val) {
-        for (AttributePollHandler<? super V> delegate : delegates) {
-            delegate.onSuccess(val);
-        }
-    }
-
-    @Override
-    public void onFailure(V val) {
-        for (AttributePollHandler<? super V> delegate : delegates) {
-            delegate.onFailure(val);
-        }
-    }
-
-    @Override
-    public void onException(Exception exception) {
-        for (AttributePollHandler<? super V> delegate : delegates) {
-            delegate.onException(exception);
-        }
-    }
-    
-    @Override
-    public String toString() {
-        return super.toString()+"["+getDescription()+"]";
-    }
-    
-    @Override
-    public String getDescription() {
-        if (delegates.isEmpty())
-            return "(empty delegate list)";
-        if (delegates.size()==1) 
-            return delegates.get(0).getDescription();
-        StringBuilder sb = new StringBuilder();
-        sb.append("[");
-        int count = 0;
-        for (AttributePollHandler<? super V> delegate : delegates) {
-            if (count>0) sb.append("; ");
-            sb.append(delegate.getDescription());
-            if (count>2) {
-                sb.append("; ...");
-                break;
-            }
-            count++;
-        }
-        sb.append("]");
-        return sb.toString();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java
deleted file mode 100644
index 32ea2ab..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.sensor.feed.http.HttpPollConfig;
-import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.javalang.JavaClassNames;
-import org.apache.brooklyn.util.text.Strings;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Objects;
-import com.google.common.base.Predicate;
-
-/**
- * Configuration for a poll, or a subscription etc, that is being added to a feed.
- * 
- * @author aled
- */
-public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
-
-    /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. 
-     * @deprecated since 0.7.0 use UNCHANGED */
-    public static final Object UNSET = Entities.UNCHANGED;
-    /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. */ 
-    public static final Object UNCHANGED = Entities.UNCHANGED;
-    /** The onSuccess or onError functions can return this value to indicate that the sensor value should be removed
-     * (cf 'null', but useful in dynamic situations) */ 
-    public static final Object REMOVE = Entities.REMOVE;
-    
-    /** Indicates that no sensor is being used here. This sensor is suppressed,
-     * but is useful where you want to use the feeds with custom success/exception/failure functions
-     * which directly set multiple sensors, e.g. dynamically based on the poll response.
-     * <p>
-     * See {@link HttpPollConfig#forMultiple()} and its usages.
-     * (It can work for any poll config, but conveniences have not been supplied for others.)  */
-    public static final AttributeSensor<Void> NO_SENSOR = Sensors.newSensor(Void.class, "brooklyn.no.sensor");
-    
-    private final AttributeSensor<T> sensor;
-    private Function<? super V, T> onsuccess;
-    private Function<? super V, T> onfailure;
-    private Function<? super Exception, T> onexception;
-    private Predicate<? super V> checkSuccess;
-    private boolean suppressDuplicates;
-    private boolean enabled = true;
-    
-    public FeedConfig(AttributeSensor<T> sensor) {
-        this.sensor = checkNotNull(sensor, "sensor");
-    }
-
-    public FeedConfig(FeedConfig<V, T, F> other) {
-        this.sensor = other.sensor;
-        this.onsuccess = other.onsuccess;
-        this.onfailure = other.onfailure;
-        this.onexception = other.onexception;
-        this.checkSuccess = other.checkSuccess;
-        this.suppressDuplicates = other.suppressDuplicates;
-        this.enabled = other.enabled;
-    }
-
-    @SuppressWarnings("unchecked")
-    protected F self() {
-        return (F) this;
-    }
-    
-    public AttributeSensor<T> getSensor() {
-        return sensor;
-    }
-
-    public Predicate<? super V> getCheckSuccess() {
-        return checkSuccess;
-    }
-    
-    public Function<? super V, T> getOnSuccess() {
-        return onsuccess;
-    }
-
-    public Function<? super V, T> getOnFailure() {
-        return onfailure;
-    }
-    
-    public Function<? super Exception, T> getOnException() {
-        return onexception;
-    }
-
-    public boolean getSupressDuplicates() {
-        return suppressDuplicates;
-    }
-    
-    public boolean isEnabled() {
-        return enabled;
-    }
-    
-    /** sets the predicate used to check whether a feed run is successful */
-    public F checkSuccess(Predicate<? super V> val) {
-        this.checkSuccess = checkNotNull(val, "checkSuccess");
-        return self();
-    }
-    /** as {@link #checkSuccess(Predicate)} */
-    public F checkSuccess(final Function<? super V,Boolean> val) {
-        return checkSuccess(Functionals.predicate(val));
-    }
-    @SuppressWarnings("unused")
-    /** @deprecated since 0.7.0, kept for rebind */ @Deprecated
-    private F checkSuccessLegacy(final Function<? super V,Boolean> val) {
-        return checkSuccess(new Predicate<V>() {
-            @Override
-            public boolean apply(V input) {
-                return val.apply(input);
-            }
-        });
-    }
-
-    public F onSuccess(Function<? super V,T> val) {
-        this.onsuccess = checkNotNull(val, "onSuccess");
-        return self();
-    }
-    
-    public F setOnSuccess(T val) {
-        return onSuccess(Functions.constant(val));
-    }
-    
-    /** a failure is when the connection is fine (no exception) but the other end returns a result object V 
-     * which the feed can tell indicates a failure (e.g. HTTP code 404) */
-    public F onFailure(Function<? super V,T> val) {
-        this.onfailure = checkNotNull(val, "onFailure");
-        return self();
-    }
-
-    public F setOnFailure(T val) {
-        return onFailure(Functions.constant(val));
-    }
-
-    /** registers a callback to be used {@link #onSuccess(Function)} and {@link #onFailure(Function)}, 
-     * i.e. whenever a result comes back, but not in case of exceptions being thrown (ie problems communicating) */
-    public F onResult(Function<? super V, T> val) {
-        onSuccess(val);
-        return onFailure(val);
-    }
-
-    public F setOnResult(T val) {
-        return onResult(Functions.constant(val));
-    }
-
-    /** an exception is when there is an error in the communication */
-    public F onException(Function<? super Exception,T> val) {
-        this.onexception = checkNotNull(val, "onException");
-        return self();
-    }
-    
-    public F setOnException(T val) {
-        return onException(Functions.constant(val));
-    }
-
-    /** convenience for indicating a behaviour to occur for both
-     * {@link #onException(Function)}
-     * (error connecting) and 
-     * {@link #onFailure(Function)} 
-     * (successful communication but failure report from remote end) */
-    public F onFailureOrException(Function<Object,T> val) {
-        onFailure(val);
-        return onException(val);
-    }
-    
-    public F setOnFailureOrException(T val) {
-        return onFailureOrException(Functions.constant(val));
-    }
-
-    public F suppressDuplicates(boolean val) {
-        suppressDuplicates = val;
-        return self();
-    }
-    
-    /**
-     * Whether this feed is enabled (defaulting to true).
-     */
-    public F enabled(boolean val) {
-        enabled = val;
-        return self();
-    }
-    
-    public boolean hasSuccessHandler() {
-        return this.onsuccess != null;
-    }
-
-    public boolean hasFailureHandler() {
-        return this.onfailure != null;
-    }
-
-    public boolean hasExceptionHandler() {
-        return this.onexception != null;
-    }
-
-    public boolean hasCheckSuccessHandler() {
-        return this.checkSuccess != null;
-    }
-
-    
-    @Override
-    public String toString() {
-        StringBuilder result = new StringBuilder();
-        result.append(toStringBaseName());
-        result.append("[");
-        boolean contents = false;
-        Object source = toStringPollSource();
-        AttributeSensor<T> s = getSensor();
-        if (Strings.isNonBlank(Strings.toString(source))) {
-            result.append(Strings.toUniqueString(source, 40));
-            if (s!=null) {
-                result.append("->");
-                result.append(s.getName());
-            }
-            contents = true;
-        } else if (s!=null) {
-            result.append(s.getName());
-            contents = true;
-        }
-        MutableList<Object> fields = toStringOtherFields();
-        if (fields!=null) {
-            for (Object field: fields) {
-                if (Strings.isNonBlank(Strings.toString(field))) {
-                    if (contents) result.append(";");
-                    contents = true;
-                    result.append(field);
-                }
-            }
-        }
-        result.append("]");
-        return result.toString();
-    }
-
-    /** can be overridden to supply a simpler base name than the class name */
-    protected String toStringBaseName() {
-        return JavaClassNames.simpleClassName(this);
-    }
-    /** can be overridden to supply add'l info for the {@link #toString()}; subclasses can add to the returned value */
-    protected MutableList<Object> toStringOtherFields() {
-        return MutableList.<Object>of();
-    }
-    /** can be overridden to supply add'l info for the {@link #toString()}, placed before the sensor with -> */
-    protected Object toStringPollSource() {
-        return null;
-    }
-    /** all configs should supply a unique tag element, inserted into the feed */
-    protected String getUniqueTag() {
-        return toString();
-    }
-
-    /** returns fields which should be used for equality, including by default {@link #toStringOtherFields()} and {@link #toStringPollSource()};
-     * subclasses can add to the returned value */
-    protected MutableList<Object> equalsFields() {
-        MutableList<Object> result = MutableList.of().appendIfNotNull(getSensor()).appendIfNotNull(toStringPollSource());
-        for (Object field: toStringOtherFields()) result.appendIfNotNull(field);
-        return result;
-    }
-
-    @Override
-    public int hashCode() { 
-        int hc = super.hashCode();
-        for (Object f: equalsFields())
-            hc = Objects.hashCode(hc, f);
-        return hc;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) return true;
-        if (!super.equals(obj)) return false;
-        PollConfig<?,?,?> other = (PollConfig<?,?,?>) obj;
-        if (!Objects.equal(getUniqueTag(), other.getUniqueTag())) return false;
-        if (!Objects.equal(equalsFields(), other.equalsFields())) return false;
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java
deleted file mode 100644
index 01a561b..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.time.Duration;
-
-/**
- * Configuration for polling, which is being added to a feed (e.g. to poll a given URL over http).
- * 
- * @author aled
- */
-public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> {
-
-    private long period = -1;
-    private String description;
-
-    public PollConfig(AttributeSensor<T> sensor) {
-        super(sensor);
-    }
-
-    public PollConfig(PollConfig<V,T,F> other) {
-        super(other);
-        this.period = other.period;
-    }
-
-    public long getPeriod() {
-        return period;
-    }
-    
-    public F period(Duration val) {
-        checkArgument(val.toMilliseconds() >= 0, "period must be greater than or equal to zero");
-        this.period = val.toMilliseconds();
-        return self();
-    }
-    
-    public F period(long val) {
-        checkArgument(val >= 0, "period must be greater than or equal to zero");
-        this.period = val; return self();
-    }
-    
-    public F period(long val, TimeUnit units) {
-        checkArgument(val >= 0, "period must be greater than or equal to zero");
-        return period(units.toMillis(val));
-    }
-    
-    public F description(String description) {
-        this.description = description;
-        return self();
-    }
-    
-    public String getDescription() {
-        return description;
-    }
-    
-    @Override protected MutableList<Object> toStringOtherFields() {
-        return super.toStringOtherFields().appendIfNotNull(description);
-    }
-
-    @Override
-    protected MutableList<Object> equalsFields() {
-        return super.equalsFields().appendIfNotNull(period);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java
deleted file mode 100644
index 175c76f..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-/**
- * Callback through which the Poller reports the outcome of each job, on each poll.
- *
- * @param <V> type of the value produced by the polled job
- * @author aled
- */
-public interface PollHandler<V> {
-
-    /**
-     * Classifies a polled value.
-     *
-     * @param val the value returned by the job
-     * @return {@code true} if the value should be passed to {@link #onSuccess(Object)},
-     *         {@code false} if it should be passed to {@link #onFailure(Object)}
-     */
-    boolean checkSuccess(V val);
-
-    /** Called with the polled value when {@link #checkSuccess(Object)} accepted it. */
-    void onSuccess(V val);
-
-    /** Called with the polled value when {@link #checkSuccess(Object)} rejected it. */
-    void onFailure(V val);
-
-    /** Called with the exception raised while running the job or handling its value. */
-    void onException(Exception exception);
-
-    /** @return a description of this handler; the Poller uses it to name the scheduled task */
-    String getDescription();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java
deleted file mode 100644
index f6e8e24..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
-import org.apache.brooklyn.util.core.task.ScheduledTask;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-
-
-/** 
- * For executing periodic polls.
- * Jobs are added to the schedule, and then the poller is started.
- * The jobs will then be executed periodically, and the handler called for the result/failure.
- * 
- * Assumes the schedule+start will be done single threaded, and that stop will not be done concurrently.
- */
-public class Poller<V> {
-    public static final Logger log = LoggerFactory.getLogger(Poller.class);
-
-    private final EntityLocal entity;
-    private final boolean onlyIfServiceUp;
-    private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>();
-    private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>();
-    private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>();
-    private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>();
-    private volatile boolean started = false;
-    
-    private static class PollJob<V> {
-        final PollHandler<? super V> handler;
-        final Duration pollPeriod;
-        final Runnable wrappedJob;
-        private boolean loggedPreviousException = false;
-        
-        PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) {
-            this.handler = handler;
-            this.pollPeriod = period;
-            
-            wrappedJob = new Runnable() {
-                public void run() {
-                    try {
-                        V val = job.call();
-                        loggedPreviousException = false;
-                        if (handler.checkSuccess(val)) {
-                            handler.onSuccess(val);
-                        } else {
-                            handler.onFailure(val);
-                        }
-                    } catch (Exception e) {
-                        if (loggedPreviousException) {
-                            if (log.isTraceEnabled()) log.trace("PollJob for {}, repeated consecutive failures, handling {} using {}", new Object[] {job, e, handler});
-                        } else {
-                            if (log.isDebugEnabled()) log.debug("PollJob for {} handling {} using {}", new Object[] {job, e, handler});
-                            loggedPreviousException = true;
-                        }
-                        handler.onException(e);
-                    }
-                }
-            };
-        }
-    }
-    
-    /** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */
-    @Deprecated
-    public Poller(EntityLocal entity) {
-        this(entity, false);
-    }
-    public Poller(EntityLocal entity, boolean onlyIfServiceUp) {
-        this.entity = entity;
-        this.onlyIfServiceUp = onlyIfServiceUp;
-    }
-    
-    /** Submits a one-off poll job; recommended that callers supply to-String so that task has a decent description */
-    public void submit(Callable<?> job) {
-        if (started) {
-            throw new IllegalStateException("Cannot submit additional tasks after poller has started");
-        }
-        oneOffJobs.add(job);
-    }
-
-    public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long period) {
-        scheduleAtFixedRate(job, handler, Duration.millis(period));
-    }
-    public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) {
-        if (started) {
-            throw new IllegalStateException("Cannot schedule additional tasks after poller has started");
-        }
-        PollJob<V> foo = new PollJob<V>(job, handler, period);
-        pollJobs.add(foo);
-    }
-
-    @SuppressWarnings({ "unchecked" })
-    public void start() {
-        // TODO Previous incarnation of this logged this logged polledSensors.keySet(), but we don't know that anymore
-        // Is that ok, are can we do better?
-        
-        if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this});
-        if (started) { 
-            throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", 
-                    this, entity));
-        }
-        
-        started = true;
-        
-        for (final Callable<?> oneOffJob : oneOffJobs) {
-            Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).name("Poll").description("One-time poll job "+oneOffJob).build();
-            oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
-        }
-        
-        for (final PollJob<V> pollJob : pollJobs) {
-            final String scheduleName = pollJob.handler.getDescription();
-            if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
-                Callable<Task<?>> pollingTaskFactory = new Callable<Task<?>>() {
-                    public Task<?> call() {
-                        DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), 
-                            new Callable<Void>() { public Void call() {
-                                if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
-                                        return null;
-                                }
-                                pollJob.wrappedJob.run();
-                                return null; 
-                            } } );
-                        BrooklynTaskTags.setTransient(task);
-                        return task;
-                    }
-                };
-                ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollJob.pollPeriod, "displayName", "scheduled:"+scheduleName), pollingTaskFactory);
-                tasks.add((ScheduledTask)Entities.submit(entity, task));
-            } else {
-                if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
-            }
-        }
-    }
-    
-    public void stop() {
-        if (log.isDebugEnabled()) log.debug("Stopping poll for {} (using {})", new Object[] {entity, this});
-        if (!started) { 
-            throw new IllegalStateException(String.format("Attempt to stop poller %s of entity %s when not running", 
-                    this, entity));
-        }
-        
-        started = false;
-        for (Task<?> task : oneOffTasks) {
-            if (task != null) task.cancel(true);
-        }
-        for (ScheduledTask task : tasks) {
-            if (task != null) task.cancel();
-        }
-        oneOffTasks.clear();
-        tasks.clear();
-    }
-
-    public boolean isRunning() {
-        boolean hasActiveTasks = false;
-        for (Task<?> task: tasks) {
-            if (task.isBegun() && !task.isDone()) {
-                hasActiveTasks = true;
-                break;
-            }
-        }
-        if (!started && hasActiveTasks) {
-            log.warn("Poller should not be running, but has active tasks, tasks: "+tasks);
-        }
-        return started && hasActiveTasks;
-    }
-    
-    protected boolean isEmpty() {
-        return pollJobs.isEmpty();
-    }
-    
-    public String toString() {
-        return Objects.toStringHelper(this).add("entity", entity).toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java
deleted file mode 100644
index 1cb6861..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed.function;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
-import org.apache.brooklyn.sensor.feed.AttributePollHandler;
-import org.apache.brooklyn.sensor.feed.DelegatingPollHandler;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-
-/**
- * Provides a feed of attribute values, by periodically invoking functions.
- * 
- * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
- * <pre>
- * {@code
- * private FunctionFeed feed;
- * 
- * //@Override
- * protected void connectSensors() {
- *   super.connectSensors();
- *   
- *   feed = FunctionFeed.builder()
- *     .entity(this)
- *     .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
- *         .period(500, TimeUnit.MILLISECONDS)
- *         .callable(new Callable<Boolean>() {
- *             public Boolean call() throws Exception {
- *               return getDriver().isRunning();
- *             }
- *         })
- *         .onExceptionOrFailure(Functions.constant(Boolean.FALSE)))
- *     .build();
- * }
- * 
- * {@literal @}Override
- * protected void disconnectSensors() {
- *   super.disconnectSensors();
- *   if (feed != null) feed.stop();
- * }
- * }
- * </pre>
- * 
- * @author aled
- */
-public class FunctionFeed extends AbstractFeed {
-
-    private static final Logger log = LoggerFactory.getLogger(FunctionFeed.class);
-
-    // Treat as immutable once built
-    @SuppressWarnings("serial")
-    public static final ConfigKey<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>> POLLS = ConfigKeys.newConfigKey(
-            new TypeToken<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>>() {},
-            "polls");
-
-    /** @return a new, empty builder */
-    public static Builder builder() {
-        return new Builder();
-    }
-    
-    /** @return a new builder with the given unique tag already set */
-    public static Builder builder(String uniqueTag) {
-        return new Builder().uniqueTag(uniqueTag);
-    }
-    
-    /**
-     * Fluent builder for a {@link FunctionFeed}. Note that {@link #build()} also starts the feed.
-     */
-    public static class Builder {
-        private EntityLocal entity;
-        private boolean onlyIfServiceUp = false;
-        // default period, applied to any poll config that has not set its own
-        private long period = 500;
-        private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
-        private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList();
-        private String uniqueTag;
-        private volatile boolean built;
-
-        public Builder entity(EntityLocal val) {
-            this.entity = val;
-            return this;
-        }
-
-        public Builder onlyIfServiceUp() {
-            return onlyIfServiceUp(true);
-        }
-
-        public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
-            this.onlyIfServiceUp = onlyIfServiceUp;
-            return this;
-        }
-
-        public Builder period(Duration d) {
-            return period(d.toMilliseconds(), TimeUnit.MILLISECONDS);
-        }
-
-        public Builder period(long millis) {
-            return period(millis, TimeUnit.MILLISECONDS);
-        }
-
-        public Builder period(long val, TimeUnit units) {
-            this.period = val;
-            this.periodUnits = units;
-            return this;
-        }
-
-        public Builder poll(FunctionPollConfig<?,?> config) {
-            polls.add(config);
-            return this;
-        }
-
-        public Builder uniqueTag(String uniqueTag) {
-            this.uniqueTag = uniqueTag;
-            return this;
-        }
-
-        /**
-         * Creates the feed, attaches it to the configured entity and starts it.
-         *
-         * @throws NullPointerException if no entity has been set
-         */
-        public FunctionFeed build() {
-            built = true;
-            FunctionFeed feed = new FunctionFeed(this);
-            feed.setEntity(checkNotNull(entity, "entity"));
-            feed.start();
-            return feed;
-        }
-
-        /** Warns about builders that were discarded without {@link #build()} ever being called. */
-        @Override
-        protected void finalize() {
-            if (!built) {
-                log.warn("FunctionFeed.Builder created, but build() never called");
-            }
-        }
-    }
-    
-    /** Key for the {@link #POLLS} multimap: identifies a poll by the job it runs. */
-    private static class FunctionPollIdentifier {
-        final Callable<?> job;
-
-        private FunctionPollIdentifier(Callable<?> job) {
-            this.job = checkNotNull(job, "job");
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(job);
-        }
-        
-        @Override
-        public boolean equals(Object other) {
-            if (!(other instanceof FunctionPollIdentifier)) {
-                return false;
-            }
-            FunctionPollIdentifier that = (FunctionPollIdentifier) other;
-            return Objects.equal(job, that.job);
-        }
-    }
-
-    /**
-     * For rebind; do not call directly; use builder
-     */
-    public FunctionFeed() {
-    }
-    
-    /**
-     * Records the builder's settings as config. Each enabled poll config is copied; a copy that
-     * has no period of its own (negative period) is given the builder's default period.
-     */
-    protected FunctionFeed(Builder builder) {
-        setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
-        
-        SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>> pollsByJob = HashMultimap.create();
-        for (FunctionPollConfig<?,?> original : builder.polls) {
-            if (original.isEnabled()) {
-                @SuppressWarnings({ "rawtypes", "unchecked" })
-                FunctionPollConfig<?,?> copy = new FunctionPollConfig(original);
-                if (copy.getPeriod() < 0) {
-                    copy.period(builder.period, builder.periodUnits);
-                }
-                Callable<?> job = original.getCallable();
-                pollsByJob.put(new FunctionPollIdentifier(job), copy);
-            }
-        }
-        setConfig(POLLS, pollsByJob);
-        initUniqueTag(builder.uniqueTag, pollsByJob.values());
-    }
-
-    /**
-     * Registers one repeating job with the poller per distinct callable. All configs that share
-     * the callable are served by a single delegating handler, and the job runs at the smallest
-     * positive period among them.
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Override
-    protected void preStart() {
-        SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?, ?>> pollsByJob = getConfig(POLLS);
-        for (final FunctionPollIdentifier identifier : pollsByJob.keySet()) {
-            Set<FunctionPollConfig<?,?>> sharedConfigs = pollsByJob.get(identifier);
-            Set<AttributePollHandler<?>> handlers = Sets.newLinkedHashSet();
-            // remains at Integer.MAX_VALUE when none of the configs has a positive period
-            long shortestPeriod = Integer.MAX_VALUE;
-
-            for (FunctionPollConfig<?,?> config : sharedConfigs) {
-                handlers.add(new AttributePollHandler(config, entity, this));
-                long configPeriod = config.getPeriod();
-                if (configPeriod > 0 && configPeriod < shortestPeriod) {
-                    shortestPeriod = configPeriod;
-                }
-            }
-            
-            Callable job = (Callable) identifier.job;
-            DelegatingPollHandler combinedHandler = new DelegatingPollHandler(handlers);
-            getPoller().scheduleAtFixedRate(job, combinedHandler, shortestPeriod);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java
deleted file mode 100644
index 7b91988..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed.function;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import groovy.lang.Closure;
-
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.sensor.feed.FeedConfig;
-import org.apache.brooklyn.sensor.feed.PollConfig;
-import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.javalang.JavaClassNames;
-
-import com.google.common.base.Supplier;
-
-public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfig<S, T>> {
-
-    private Callable<?> callable;
-    
-    public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> sensor) {
-        return new FunctionPollConfig<Object, T>(sensor);
-    }
-    
-    public FunctionPollConfig(AttributeSensor<T> sensor) {
-        super(sensor);
-    }
-
-    public FunctionPollConfig(FunctionPollConfig<S, T> other) {
-        super(other);
-        callable = other.callable;
-    }
-    
-    public Callable<? extends Object> getCallable() {
-        return callable;
-    }
-    
-    /**
-     * The {@link Callable} to be invoked on each poll.
-     * <p>
-     * Note this <em>must</em> use generics, otherwise the return type of subsequent chained
-     * calls will (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)} will
-     * return the wrong type.
-     */
-    @SuppressWarnings("unchecked")
-    public <newS> FunctionPollConfig<newS, T> callable(Callable<? extends newS> val) {
-        this.callable = checkNotNull(val, "callable");
-        return (FunctionPollConfig<newS, T>) this;
-    }
-    
-    /**
-     * Supplies the value to be returned by each poll.
-     * <p>
-     * Note this <em>must</em> use generics, otherwise the return type of subsequent chained
-     * calls will (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)} will
-     * return the wrong type.
-     */
-    @SuppressWarnings("unchecked")
-    public <newS> FunctionPollConfig<newS, T> supplier(final Supplier<? extends newS> val) {
-        this.callable = Functionals.callable( checkNotNull(val, "supplier") );
-        return (FunctionPollConfig<newS, T>) this;
-    }
-    
-    /** @deprecated since 0.7.0, kept for legacy compatibility when deserializing */
-    @SuppressWarnings({ "unchecked", "unused" })
-    private <newS> FunctionPollConfig<newS, T> supplierLegacy(final Supplier<? extends newS> val) {
-        checkNotNull(val, "supplier");
-        this.callable = new Callable<newS>() {
-            @Override
-            public newS call() throws Exception {
-                return val.get();
-            }
-        };
-        return (FunctionPollConfig<newS, T>) this;
-    }
-
-    public FunctionPollConfig<S, T> closure(Closure<?> val) {
-        this.callable = GroovyJavaMethods.callableFromClosure(checkNotNull(val, "closure"));
-        return this;
-    }
-
-    @Override protected String toStringBaseName() { return "fn"; }
-    @Override protected String toStringPollSource() {
-        if (callable==null) return null;
-        String cs = callable.toString();
-        if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) {
-            return cs;
-        }
-        // if hashcode is in callable it's probably a custom internal; return class name
-        return JavaClassNames.simpleClassName(callable);
-    }
-
-}