You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:11 UTC

[23/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to o.a.b.feed and o.a.b.core.feed

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterFeed.java
deleted file mode 100644
index e9767d9..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterFeed.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed.windows;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import io.cloudsoft.winrm4j.winrm.WinRmToolResponse;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.mgmt.ExecutionContext;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.effector.EffectorTasks;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.location.winrm.WinRmMachineLocation;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
-import org.apache.brooklyn.sensor.feed.PollHandler;
-import org.apache.brooklyn.sensor.feed.Poller;
-import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-
-/**
- * A sensor feed that retrieves performance counters from a Windows host and posts the values to sensors.
- *
- * <p>To use this feed, you must provide the entity, and a collection of mappings between Windows performance counter
- * names and Brooklyn attribute sensors.</p>
- *
- * <p>This feed uses SSH to invoke the windows utility <tt>typeperf</tt> to query for a specific set of performance
- * counters, by name. The values are extracted from the response, and published to the entity's sensors.</p>
- *
- * <p>Example:</p>
- *
- * {@code
- * @Override
- * protected void connectSensors() {
- *     WindowsPerformanceCounterFeed feed = WindowsPerformanceCounterFeed.builder()
- *         .entity(entity)
- *         .addSensor("\\Processor(_total)\\% Idle Time", CPU_IDLE_TIME)
- *         .addSensor("\\Memory\\Available MBytes", AVAILABLE_MEMORY)
- *         .build();
- * }
- * }
- *
- * @since 0.6.0
- * @author richardcloudsoft
- */
-public class WindowsPerformanceCounterFeed extends AbstractFeed {
-
-    private static final Logger log = LoggerFactory.getLogger(WindowsPerformanceCounterFeed.class);
-
-    // This pattern matches CSV line(s) with the date in the first field, and at least one further field.
-    protected static final Pattern lineWithPerfData = Pattern.compile("^\"[\\d:/\\-. ]+\",\".*\"$", Pattern.MULTILINE);
-    private static final Joiner JOINER_ON_SPACE = Joiner.on(' ');
-    private static final Joiner JOINER_ON_COMMA = Joiner.on(',');
-    private static final int OUTPUT_COLUMN_WIDTH = 100;
-
-    @SuppressWarnings("serial")
-    public static final ConfigKey<Collection<WindowsPerformanceCounterPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
-            new TypeToken<Collection<WindowsPerformanceCounterPollConfig<?>>>() {},
-            "polls");
-
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    public static class Builder {
-        private EntityLocal entity;
-        private Set<WindowsPerformanceCounterPollConfig<?>> polls = Sets.newLinkedHashSet();
-        private Duration period = Duration.of(30, TimeUnit.SECONDS);
-        private String uniqueTag;
-        private volatile boolean built;
-
-        public Builder entity(EntityLocal val) {
-            this.entity = checkNotNull(val, "entity");
-            return this;
-        }
-        public Builder addSensor(WindowsPerformanceCounterPollConfig<?> config) {
-            polls.add(config);
-            return this;
-        }
-        public Builder addSensor(String performanceCounterName, AttributeSensor<?> sensor) {
-            return addSensor(new WindowsPerformanceCounterPollConfig(sensor).performanceCounterName(checkNotNull(performanceCounterName, "performanceCounterName")));
-        }
-        public Builder addSensors(Map<String, AttributeSensor> sensors) {
-            for (Map.Entry<String, AttributeSensor> entry : sensors.entrySet()) {
-                addSensor(entry.getKey(), entry.getValue());
-            }
-            return this;
-        }
-        public Builder period(Duration period) {
-            this.period = checkNotNull(period, "period");
-            return this;
-        }
-        public Builder period(long millis) {
-            return period(millis, TimeUnit.MILLISECONDS);
-        }
-        public Builder period(long val, TimeUnit units) {
-            return period(Duration.of(val, units));
-        }
-        public Builder uniqueTag(String uniqueTag) {
-            this.uniqueTag = uniqueTag;
-            return this;
-        }
-        public WindowsPerformanceCounterFeed build() {
-            built = true;
-            WindowsPerformanceCounterFeed result = new WindowsPerformanceCounterFeed(this);
-            result.setEntity(checkNotNull(entity, "entity"));
-            result.start();
-            return result;
-        }
-        @Override
-        protected void finalize() {
-            if (!built) log.warn("WindowsPerformanceCounterFeed.Builder created, but build() never called");
-        }
-    }
-
-    /**
-     * For rebind; do not call directly; use builder
-     */
-    public WindowsPerformanceCounterFeed() {
-    }
-
-    protected WindowsPerformanceCounterFeed(Builder builder) {
-        List<WindowsPerformanceCounterPollConfig<?>> polls = Lists.newArrayList();
-        for (WindowsPerformanceCounterPollConfig<?> config : builder.polls) {
-            if (!config.isEnabled()) continue;
-            @SuppressWarnings({ "unchecked", "rawtypes" })
-            WindowsPerformanceCounterPollConfig<?> configCopy = new WindowsPerformanceCounterPollConfig(config);
-            if (configCopy.getPeriod() < 0) configCopy.period(builder.period);
-            polls.add(configCopy);
-        }
-        config().set(POLLS, polls);
-        initUniqueTag(builder.uniqueTag, polls);
-    }
-
-    @Override
-    protected void preStart() {
-        Collection<WindowsPerformanceCounterPollConfig<?>> polls = getConfig(POLLS);
-        
-        long minPeriod = Integer.MAX_VALUE;
-        List<String> performanceCounterNames = Lists.newArrayList();
-        for (WindowsPerformanceCounterPollConfig<?> config : polls) {
-            minPeriod = Math.min(minPeriod, config.getPeriod());
-            performanceCounterNames.add(config.getPerformanceCounterName());
-        }
-        
-        Iterable<String> allParams = ImmutableList.<String>builder()
-                .add("(Get-Counter")
-                .add("-Counter")
-                .add(JOINER_ON_COMMA.join(Iterables.transform(performanceCounterNames, QuoteStringFunction.INSTANCE)))
-                .add("-SampleInterval")
-                .add("2") // TODO: extract SampleInterval as a config key
-                .add(").CounterSamples")
-                .add("|")
-                .add("Format-Table")
-                .add(String.format("@{Expression={$_.Path};width=%d},@{Expression={$_.CookedValue};width=%<d}", OUTPUT_COLUMN_WIDTH))
-                .add("-HideTableHeaders")
-                .add("|")
-                .add("Out-String")
-                .add("-Width")
-                .add(String.valueOf(OUTPUT_COLUMN_WIDTH * 2))
-                .build();
-        String command = JOINER_ON_SPACE.join(allParams);
-        log.debug("Windows performance counter poll command for {} will be: {}", entity, command);
-
-        GetPerformanceCountersJob<WinRmToolResponse> job = new GetPerformanceCountersJob(getEntity(), command);
-        getPoller().scheduleAtFixedRate(
-                new CallInEntityExecutionContext(entity, job),
-                new SendPerfCountersToSensors(getEntity(), polls),
-                minPeriod);
-    }
-
-    private static class GetPerformanceCountersJob<T> implements Callable<T> {
-
-        private final Entity entity;
-        private final String command;
-
-        GetPerformanceCountersJob(Entity entity, String command) {
-            this.entity = entity;
-            this.command = command;
-        }
-
-        @Override
-        public T call() throws Exception {
-            WinRmMachineLocation machine = EffectorTasks.getWinRmMachine(entity);
-            WinRmToolResponse response = machine.executePsScript(command);
-            return (T)response;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    protected Poller<WinRmToolResponse> getPoller() {
-        return (Poller<WinRmToolResponse>) super.getPoller();
-    }
-
-    /**
-     * A {@link java.util.concurrent.Callable} that wraps another {@link java.util.concurrent.Callable}, where the
-     * inner {@link java.util.concurrent.Callable} is executed in the context of a
-     * specific entity.
-     *
-     * @param <T> The type of the {@link java.util.concurrent.Callable}.
-     */
-    private static class CallInEntityExecutionContext<T> implements Callable<T> {
-        private final Callable<T> job;
-        private EntityLocal entity;
-
-        private CallInEntityExecutionContext(EntityLocal entity, Callable<T> job) {
-            this.job = job;
-            this.entity = entity;
-        }
-
-        @Override
-        public T call() throws Exception {
-            ExecutionContext executionContext = ((EntityInternal) entity).getManagementSupport().getExecutionContext();
-            return executionContext.submit(Maps.newHashMap(), job).get();
-        }
-    }
-
-    @VisibleForTesting
-    static class SendPerfCountersToSensors implements PollHandler<WinRmToolResponse> {
-        private final EntityLocal entity;
-        private final List<WindowsPerformanceCounterPollConfig<?>> polls;
-        private final Set<AttributeSensor<?>> failedAttributes = Sets.newLinkedHashSet();
-        private static final Pattern MACHINE_NAME_LOOKBACK_PATTERN = Pattern.compile(String.format("(?<=\\\\\\\\.{0,%d})\\\\.*", OUTPUT_COLUMN_WIDTH));
-        
-        public SendPerfCountersToSensors(EntityLocal entity, Collection<WindowsPerformanceCounterPollConfig<?>> polls) {
-            this.entity = entity;
-            this.polls = ImmutableList.copyOf(polls);
-        }
-
-        @Override
-        public boolean checkSuccess(WinRmToolResponse val) {
-            // TODO not just using statusCode; also looking at absence of stderr.
-            // Status code is (empirically) unreliable: it returns 0 sometimes even when failed 
-            // (but never returns non-zero on success).
-            if (val.getStatusCode() != 0) return false;
-            String stderr = val.getStdErr();
-            if (stderr == null || stderr.length() != 0) return false;
-            String out = val.getStdOut();
-            if (out == null || out.length() == 0) return false;
-            return true;
-        }
-
-        @Override
-        public void onSuccess(WinRmToolResponse val) {
-            for (String pollResponse : val.getStdOut().split("\r\n")) {
-                if (Strings.isNullOrEmpty(pollResponse)) {
-                    continue;
-                }
-                String path = pollResponse.substring(0, OUTPUT_COLUMN_WIDTH - 1);
-                // The performance counter output prepends the sensor name with "\\<machinename>" so we need to remove it
-                Matcher machineNameLookbackMatcher = MACHINE_NAME_LOOKBACK_PATTERN.matcher(path);
-                if (!machineNameLookbackMatcher.find()) {
-                    continue;
-                }
-                String name = machineNameLookbackMatcher.group(0).trim();
-                String rawValue = pollResponse.substring(OUTPUT_COLUMN_WIDTH).replaceAll("^\\s+", "");
-                WindowsPerformanceCounterPollConfig<?> config = getPollConfig(name);
-                Class<?> clazz = config.getSensor().getType();
-                AttributeSensor<Object> attribute = (AttributeSensor<Object>) Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription());
-                try {
-                    Object value = TypeCoercions.coerce(rawValue, TypeToken.of(clazz));
-                    entity.setAttribute(attribute, value);
-                } catch (Exception e) {
-                    Exceptions.propagateIfFatal(e);
-                    if (failedAttributes.add(attribute)) {
-                        log.warn("Failed to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute});
-                    } else {
-                        if (log.isTraceEnabled()) log.trace("Failed (repeatedly) to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute});
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void onFailure(WinRmToolResponse val) {
-            log.error("Windows Performance Counter query did not respond as expected. exitcode={} stdout={} stderr={}",
-                    new Object[]{val.getStatusCode(), val.getStdOut(), val.getStdErr()});
-            for (WindowsPerformanceCounterPollConfig<?> config : polls) {
-                Class<?> clazz = config.getSensor().getType();
-                AttributeSensor<?> attribute = Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription());
-                entity.setAttribute(attribute, null);
-            }
-        }
-
-        @Override
-        public void onException(Exception exception) {
-            log.error("Detected exception while retrieving Windows Performance Counters from entity " +
-                    entity.getDisplayName(), exception);
-            for (WindowsPerformanceCounterPollConfig<?> config : polls) {
-                entity.setAttribute(Sensors.newSensor(config.getSensor().getClass(), config.getPerformanceCounterName(), config.getDescription()), null);
-            }
-        }
-
-        @Override
-        public String getDescription() {
-            return "" + polls;
-        }
-
-        @Override
-        public String toString() {
-            return super.toString()+"["+getDescription()+"]";
-        }
-
-        private WindowsPerformanceCounterPollConfig<?> getPollConfig(String sensorName) {
-            for (WindowsPerformanceCounterPollConfig<?> poll : polls) {
-                if (poll.getPerformanceCounterName().equalsIgnoreCase(sensorName)) {
-                    return poll;
-                }
-            }
-            throw new IllegalStateException(String.format("%s not found in configured polls: %s", sensorName, polls));
-        }
-    }
-
-    static class PerfCounterValueIterator implements Iterator<String> {
-
-        // This pattern matches the contents of the first field, and optionally matches the rest of the line as
-        // further fields. Feed the second match back into the pattern again to get the next field, and repeat until
-        // all fields are discovered.
-        protected static final Pattern splitPerfData = Pattern.compile("^\"([^\\\"]*)\"((,\"[^\\\"]*\")*)$");
-
-        private Matcher matcher;
-
-        public PerfCounterValueIterator(String input) {
-            matcher = splitPerfData.matcher(input);
-            // Throw away the first element (the timestamp) (and also confirm that we have a pattern match)
-            checkArgument(hasNext(), "input "+input+" does not match expected pattern "+splitPerfData.pattern());
-            next();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return matcher != null && matcher.find();
-        }
-
-        @Override
-        public String next() {
-            String next = matcher.group(1);
-
-            String remainder = matcher.group(2);
-            if (!Strings.isNullOrEmpty(remainder)) {
-                assert remainder.startsWith(",");
-                remainder = remainder.substring(1);
-                matcher = splitPerfData.matcher(remainder);
-            } else {
-                matcher = null;
-            }
-
-            return next;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private static enum QuoteStringFunction implements Function<String, String> {
-        INSTANCE;
-
-        @Nullable
-        @Override
-        public String apply(@Nullable String input) {
-            return input != null ? "\"" + input + "\"" : null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterPollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterPollConfig.java
deleted file mode 100644
index 34bc08c..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterPollConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed.windows;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.sensor.feed.PollConfig;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-
-public class WindowsPerformanceCounterPollConfig<T> extends PollConfig<Object, T, WindowsPerformanceCounterPollConfig<T>>{
-
-    private String performanceCounterName;
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    public WindowsPerformanceCounterPollConfig(AttributeSensor<T> sensor) {
-        super(sensor);
-        description(sensor.getDescription());
-        onSuccess((Function)Functions.identity());
-    }
-
-    public WindowsPerformanceCounterPollConfig(WindowsPerformanceCounterPollConfig<T> other) {
-        super(other);
-        this.performanceCounterName = other.performanceCounterName;
-    }
-
-    public String getPerformanceCounterName() {
-        return performanceCounterName;
-    }
-    
-    public WindowsPerformanceCounterPollConfig<T> performanceCounterName(String val) {
-        this.performanceCounterName = val; return this;
-    }
-
-    @Override protected String toStringPollSource() { return performanceCounterName; }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java b/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java
index 9e8e061..ad768f7 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.brooklyn.sensor.feed.http.HttpPollValue;
+import org.apache.brooklyn.feed.http.HttpPollValue;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.stream.Streams;
 import org.apache.brooklyn.util.time.Duration;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/core/feed/ConfigToAttributesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/ConfigToAttributesTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/ConfigToAttributesTest.java
new file mode 100644
index 0000000..1cc48df
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/ConfigToAttributesTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.feed.ConfigToAttributes;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
+import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
+import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.sensor.TemplatedStringAttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ConfigToAttributesTest {
+
+    private ManagementContextInternal managementContext;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        managementContext = new LocalManagementContext();
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (managementContext != null) Entities.destroyAll(managementContext);
+    }
+    
+    @Test
+    public void testApplyTemplatedConfigWithEntity() {
+        TestApplication app = managementContext.getEntityManager().createEntity(EntitySpec.create(TestApplication.class)
+                .configure(TestEntity.CONF_NAME, "myval"));
+        Entities.startManagement(app, managementContext);
+        
+        BasicAttributeSensorAndConfigKey<String> key = new TemplatedStringAttributeSensorAndConfigKey("mykey", "my descr", "${config['test.confName']!'notfound'}");
+        String val = ConfigToAttributes.apply(app, key);
+        assertEquals(app.getAttribute(key), val);
+        assertEquals(val, "myval");
+
+    }
+    
+    @Test
+    public void testApplyTemplatedConfigWithManagementContext() {
+        managementContext.getBrooklynProperties().put(TestEntity.CONF_NAME, "myglobalval");
+        BasicAttributeSensorAndConfigKey<String> key = new TemplatedStringAttributeSensorAndConfigKey("mykey", "my descr", "${config['test.confName']!'notfound'}");
+        String val = ConfigToAttributes.transform(managementContext, key);
+        assertEquals(val, "myglobalval");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
new file mode 100644
index 0000000..0f2c1ce
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.feed.PollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PollerTest extends BrooklynAppUnitTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PollerTest.class);
+
+    private TestEntity entity;
+    private Poller<Integer> poller;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        poller = new Poller<Integer>(entity, false);
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        if (poller != null) poller.stop();
+        super.tearDown();
+    }
+    
+    @Test(groups={"Integration", "WIP"}) // because takes > 1 second
+    public void testPollingSubTaskFailsOnceKeepsGoing() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+        poller.scheduleAtFixedRate(
+                new Callable<Integer>() {
+                    @Override public Integer call() throws Exception {
+                        int result = counter.incrementAndGet();
+                        if (result % 2 == 0) {
+                            DynamicTasks.queue("in-poll", new Runnable() {
+                                public void run() {
+                                    throw new IllegalStateException("Simulating error in sub-task for poll");
+                                }});
+                        }
+                        return result;
+                    }
+                },
+                new PollHandler<Integer>() {
+                    @Override public boolean checkSuccess(Integer val) {
+                        return true;
+                    }
+                    @Override public void onSuccess(Integer val) {
+                        
+                    }
+                    @Override public void onFailure(Integer val) {
+                    }
+                    @Override
+                    public void onException(Exception exception) {
+                        LOG.info("Exception in test poller", exception);
+                    }
+                    @Override public String getDescription() {
+                        return "mypollhandler";
+                    }
+                }, 
+                new Duration(10, TimeUnit.MILLISECONDS));
+        poller.start();
+        
+        Asserts.succeedsContinually(MutableMap.of("timeout", 2*1000, "period", 500), new Runnable() {
+            int oldCounter = -1;
+            @Override public void run() {
+                assertTrue(counter.get() > oldCounter);
+                oldCounter = counter.get();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java b/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java
index a5b4294..b8d8c35 100644
--- a/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java
+++ b/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java
@@ -21,11 +21,11 @@ package org.apache.brooklyn.core.location;
 import static org.testng.Assert.assertEquals;
 
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.feed.ConfigToAttributes;
 import org.apache.brooklyn.core.location.PortRanges;
 import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
-import org.apache.brooklyn.sensor.feed.ConfigToAttributes;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java
index f0c6551..4a724e4 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java
@@ -35,14 +35,14 @@ import org.apache.brooklyn.core.mgmt.internal.BrooklynGarbageCollector;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.test.entity.TestEntityImpl.TestEntityWithoutEnrichers;
-import org.apache.brooklyn.sensor.feed.function.FunctionFeed;
-import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig;
-import org.apache.brooklyn.sensor.feed.http.HttpFeed;
-import org.apache.brooklyn.sensor.feed.http.HttpPollConfig;
-import org.apache.brooklyn.sensor.feed.http.HttpValueFunctions;
-import org.apache.brooklyn.sensor.feed.ssh.SshFeed;
-import org.apache.brooklyn.sensor.feed.ssh.SshPollConfig;
-import org.apache.brooklyn.sensor.feed.ssh.SshValueFunctions;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.feed.ssh.SshFeed;
+import org.apache.brooklyn.feed.ssh.SshPollConfig;
+import org.apache.brooklyn.feed.ssh.SshValueFunctions;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.test.EntityTestUtils;
 import org.apache.brooklyn.util.collections.MutableList;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/function/FunctionFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/function/FunctionFeedTest.java b/core/src/test/java/org/apache/brooklyn/feed/function/FunctionFeedTest.java
new file mode 100644
index 0000000..c362e4e6
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/function/FunctionFeedTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.function;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Feed;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.EntityInternal.FeedSupport;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionFeedTest;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicates;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Callables;
+
+public class FunctionFeedTest extends BrooklynAppUnitTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(FunctionFeedTest.class);
+    
+    final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString", "");
+    final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor("aLong", "");
+
+    private Location loc;
+    private EntityLocal entity;
+    private FunctionFeed feed;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        loc = new LocalhostMachineProvisioningLocation();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        app.start(ImmutableList.of(loc));
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        if (feed != null) feed.stop();
+        super.tearDown();
+    }
+    
+    @Test
+    public void testPollsFunctionRepeatedlyToSetAttribute() throws Exception {
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer,Integer>(SENSOR_INT)
+                        .period(1)
+                        .callable(new IncrementingCallable())
+                        //.onSuccess((Function<Object,Integer>)(Function)Functions.identity()))
+                        )
+                .build();
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                Integer val = entity.getAttribute(SENSOR_INT);
+                assertTrue(val != null && val > 2, "val=" + val);
+            }
+        });
+    }
+    
+    @Test
+    public void testFeedDeDupe() throws Exception {
+        testPollsFunctionRepeatedlyToSetAttribute();
+        entity.addFeed(feed);
+        log.info("Feed 0 is: "+feed);
+        Feed feed0 = feed;
+        
+        testPollsFunctionRepeatedlyToSetAttribute();
+        entity.addFeed(feed);
+        log.info("Feed 1 is: "+feed);
+        Feed feed1 = feed;
+        Assert.assertFalse(feed1==feed0);
+
+        FeedSupport feeds = ((EntityInternal)entity).feeds();
+        Assert.assertEquals(feeds.getFeeds().size(), 1, "Wrong feed count: "+feeds.getFeeds());
+
+        // a couple extra checks, compared to the de-dupe test in other *FeedTest classes
+        Feed feedAdded = Iterables.getOnlyElement(feeds.getFeeds());
+        Assert.assertTrue(feedAdded==feed1);
+        Assert.assertFalse(feedAdded==feed0);
+    }
+    
+    @Test
+    public void testFeedDeDupeIgnoresSameObject() throws Exception {
+        testPollsFunctionRepeatedlyToSetAttribute();
+        entity.addFeed(feed);
+        assertFeedIsPolling();
+        entity.addFeed(feed);
+        assertFeedIsPollingContinuously();
+    }
+
+    @Test
+    public void testCallsOnSuccessWithResultOfCallable() throws Exception {
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                        .period(1)
+                        .callable(Callables.returning(123))
+                        .onSuccess(new AddOneFunction()))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, 124);
+    }
+    
+    @Test
+    public void testCallsOnExceptionWithExceptionFromCallable() throws Exception {
+        final String errMsg = "my err msg";
+        
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Object, String>(SENSOR_STRING)
+                        .period(1)
+                        .callable(new ExceptionCallable(errMsg))
+                        .onException(new ToStringFunction()))
+                .build();
+
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                String val = entity.getAttribute(SENSOR_STRING);
+                assertTrue(val != null && val.contains(errMsg), "val=" + val);
+            }
+        });
+    }
+
+    @Test
+    public void testCallsOnFailureWithResultOfCallable() throws Exception {
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                        .period(1)
+                        .callable(Callables.returning(1))
+                        .checkSuccess(Predicates.alwaysFalse())
+                        .onSuccess(new AddOneFunction())
+                        .onFailure(Functions.constant(-1)))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, -1);
+    }
+
+    @Test
+    public void testCallsOnExceptionWhenCheckSuccessIsFalseButNoFailureHandler() throws Exception {
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                        .period(1)
+                        .callable(Callables.returning(1))
+                        .checkSuccess(Predicates.alwaysFalse())
+                        .onSuccess(new AddOneFunction())
+                        .onException(Functions.constant(-1)))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, -1);
+    }
+    
+    @Test
+    public void testSharesFunctionWhenMultiplePostProcessors() throws Exception {
+        final IncrementingCallable incrementingCallable = new IncrementingCallable();
+        final List<Integer> ints = new CopyOnWriteArrayList<Integer>();
+        final List<String> strings = new CopyOnWriteArrayList<String>();
+        
+        entity.subscribe(entity, SENSOR_INT, new SensorEventListener<Integer>() {
+                @Override public void onEvent(SensorEvent<Integer> event) {
+                    ints.add(event.getValue());
+                }});
+        entity.subscribe(entity, SENSOR_STRING, new SensorEventListener<String>() {
+                @Override public void onEvent(SensorEvent<String> event) {
+                    strings.add(event.getValue());
+                }});
+        
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                        .period(10)
+                        .callable(incrementingCallable))
+                .poll(new FunctionPollConfig<Integer, String>(SENSOR_STRING)
+                        .period(10)
+                        .callable(incrementingCallable)
+                        .onSuccess(new ToStringFunction()))
+                .build();
+
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                assertEquals(ints.subList(0, 2), ImmutableList.of(0, 1));
+                assertTrue(strings.size()>=2, "wrong strings list: "+strings);
+                assertEquals(strings.subList(0, 2), ImmutableList.of("0", "1"), "wrong strings list: "+strings);
+            }});
+    }
+    
+    @Test
+    @SuppressWarnings("unused")
+    public void testFunctionPollConfigBuilding() throws Exception {
+        FunctionPollConfig<Integer, Integer> typeFromCallable = FunctionPollConfig.forSensor(SENSOR_INT)
+                .period(1)
+                .callable(Callables.returning(1))
+                .onSuccess(Functions.constant(-1));
+
+        FunctionPollConfig<Integer, Integer> typeFromSupplier = FunctionPollConfig.forSensor(SENSOR_INT)
+                .period(1)
+                .supplier(Suppliers.ofInstance(1))
+                .onSuccess(Functions.constant(-1));
+
+        FunctionPollConfig<Integer, Integer> usingConstructor = new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                .period(1)
+                .supplier(Suppliers.ofInstance(1))
+                .onSuccess(Functions.constant(-1));
+
+        FunctionPollConfig<Integer, Integer> usingConstructorWithFailureOrException = new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                .period(1)
+                .supplier(Suppliers.ofInstance(1))
+                .onFailureOrException(Functions.<Integer>constant(null));
+    }
+    
+    
+    private void assertFeedIsPolling() {
+        final Integer val = entity.getAttribute(SENSOR_INT);
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                assertNotEquals(val, entity.getAttribute(SENSOR_INT));
+            }
+        });
+    }
+    
+    private void assertFeedIsPollingContinuously() {
+        Asserts.succeedsContinually(new Runnable() {
+            @Override
+            public void run() {
+                assertFeedIsPolling();
+            }
+        });
+    }
+
+    private static class IncrementingCallable implements Callable<Integer> {
+        private final AtomicInteger next = new AtomicInteger(0);
+        
+        @Override public Integer call() {
+            return next.getAndIncrement();
+        }
+    }
+    
+    private static class AddOneFunction implements Function<Integer, Integer> {
+        @Override public Integer apply(@Nullable Integer input) {
+            return (input != null) ? (input + 1) : null;
+        }
+    }
+    
+    private static class ExceptionCallable implements Callable<Void> {
+        private final String msg;
+        ExceptionCallable(String msg) {
+            this.msg = msg;
+        }
+        @Override public Void call() {
+            throw new RuntimeException(msg);
+        }
+    }
+    
+    public static class ToStringFunction implements Function<Object, String> {
+        @Override public String apply(@Nullable Object input) {
+            return (input != null) ? (input.toString()) : null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedIntegrationTest.java
new file mode 100644
index 0000000..ee7e226
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedIntegrationTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.net.URI;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.location.PortRanges;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.HttpService;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableList;
+
+public class HttpFeedIntegrationTest extends BrooklynAppUnitTestSupport {
+
+    final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString", "");
+    final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor("aLong", "");
+
+    private HttpService httpService;
+
+    private Location loc;
+    private EntityLocal entity;
+    private HttpFeed feed;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        loc = new LocalhostMachineProvisioningLocation();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        app.start(ImmutableList.of(loc));
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        if (feed != null) feed.stop();
+        if (httpService != null) httpService.shutdown();
+        super.tearDown();
+    }
+
+    @Test(groups = {"Integration"})
+    public void testPollsAndParsesHttpGetResponseWithSsl() throws Exception {
+        httpService = new HttpService(PortRanges.fromString("9000+"), true).start();
+        URI baseUrl = new URI(httpService.getUrl());
+
+        assertEquals(baseUrl.getScheme(), "https", "baseUrl="+baseUrl);
+        
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUri(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+        
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, 200);
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                String val = entity.getAttribute(SENSOR_STRING);
+                assertTrue(val != null && val.contains("Hello, World"), "val="+val);
+            }});
+    }
+
+    @Test(groups = {"Integration"})
+    public void testPollsAndParsesHttpGetResponseWithBasicAuthentication() throws Exception {
+        final String username = "brooklyn";
+        final String password = "hunter2";
+        httpService = new HttpService(PortRanges.fromString("9000+"))
+                .basicAuthentication(username, password)
+                .start();
+        URI baseUrl = new URI(httpService.getUrl());
+        assertEquals(baseUrl.getScheme(), "http", "baseUrl="+baseUrl);
+
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUri(baseUrl)
+                .credentials(username, password)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, 200);
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                String val = entity.getAttribute(SENSOR_STRING);
+                assertTrue(val != null && val.contains("Hello, World"), "val="+val);
+            }});
+    }
+
+    @Test(groups = {"Integration"})
+    public void testPollWithInvalidCredentialsFails() throws Exception {
+        httpService = new HttpService(PortRanges.fromString("9000+"))
+                .basicAuthentication("brooklyn", "hunter2")
+                .start();
+
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUri(httpService.getUrl())
+                .credentials("brooklyn", "9876543210")
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode())
+                        .onFailure(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction())
+                        .onException(Functions.constant("Failed!")))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, 401);
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                String val = entity.getAttribute(SENSOR_STRING);
+                assertTrue(val != null && val.equals("Failed!"), "val=" + val);
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedTest.java b/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedTest.java
new file mode 100644
index 0000000..d8ac492
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityFunctions;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.EntityInternal.FeedSupport;
+import org.apache.brooklyn.core.feed.FeedConfig;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.http.BetterMockWebServer;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.net.Networking;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.mockwebserver.MockResponse;
+import com.google.mockwebserver.SocketPolicy;
+
+public class HttpFeedTest extends BrooklynAppUnitTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(HttpFeedTest.class);
+    
+    final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString", "");
+    final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor( "aLong", "");
+
+    private static final long TIMEOUT_MS = 10*1000;
+    
+    private BetterMockWebServer server;
+    private URL baseUrl;
+    
+    private Location loc;
+    private EntityLocal entity;
+    private HttpFeed feed;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        server = BetterMockWebServer.newInstanceLocalhost();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse().setResponseCode(200).addHeader("content-type: application/json").setBody("{\"foo\":\"myfoo\"}"));
+        }
+        server.play();
+        baseUrl = server.getUrl("/");
+
+        loc = app.newLocalhostProvisioningLocation();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        app.start(ImmutableList.of(loc));
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        if (feed != null) feed.stop();
+        if (server != null) server.shutdown();
+        feed = null;
+        super.tearDown();
+    }
+    
+    @Test
+    public void testPollsAndParsesHttpGetResponse() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(HttpPollConfig.forSensor(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(HttpPollConfig.forSensor(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+        
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "{\"foo\":\"myfoo\"}", TIMEOUT_MS);
+    }
+    
+    @Test
+    public void testFeedDeDupe() throws Exception {
+        testPollsAndParsesHttpGetResponse();
+        entity.addFeed(feed);
+        log.info("Feed 0 is: "+feed);
+        
+        testPollsAndParsesHttpGetResponse();
+        log.info("Feed 1 is: "+feed);
+        entity.addFeed(feed);
+                
+        FeedSupport feeds = ((EntityInternal)entity).feeds();
+        Assert.assertEquals(feeds.getFeeds().size(), 1, "Wrong feed count: "+feeds.getFeeds());
+    }
+    
+    @Test
+    public void testSetsConnectionTimeout() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .connectionTimeout(Duration.TEN_SECONDS)
+                        .socketTimeout(Duration.TEN_SECONDS)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .build();
+        
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+    }
+    
+    // TODO How to cause the other end to just freeze (similar to aws-ec2 when securityGroup port is not open)?
+    @Test
+    public void testSetsConnectionTimeoutWhenServerDisconnects() throws Exception {
+        if (server != null) server.shutdown();
+        server = BetterMockWebServer.newInstanceLocalhost();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START));
+        }
+        server.play();
+        baseUrl = server.getUrl("/");
+
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .connectionTimeout(Duration.TEN_SECONDS)
+                        .socketTimeout(Duration.TEN_SECONDS)
+                        .onSuccess(HttpValueFunctions.responseCode())
+                        .onException(Functions.constant(-1)))
+                .build();
+        
+        assertSensorEventually(SENSOR_INT, -1, TIMEOUT_MS);
+    }
+    
+    
+    @Test
+    public void testPollsAndParsesHttpPostResponse() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .method("post")
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .method("post")
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+        
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "{\"foo\":\"myfoo\"}", TIMEOUT_MS);
+    }
+
+    @Test
+    public void testUsesFailureHandlerOn4xx() throws Exception {
+        server = BetterMockWebServer.newInstanceLocalhost();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse()
+                    .setResponseCode(401)
+                    .setBody("Unauthorised"));
+        }
+        server.play();
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(server.getUrl("/"))
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode())
+                        .onFailure(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction())
+                        .onFailure(Functions.constant("Failed")))
+                .build();
+
+        assertSensorEventually(SENSOR_INT, 401, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "Failed", TIMEOUT_MS);
+
+        server.shutdown();
+    }
+
+    @Test
+    public void testUsesExceptionHandlerOn4xxAndNoFailureHandler() throws Exception {
+        server = BetterMockWebServer.newInstanceLocalhost();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse()
+                    .setResponseCode(401)
+                    .setBody("Unauthorised"));
+        }
+        server.play();
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(server.getUrl("/"))
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode())
+                        .onException(Functions.constant(-1)))
+                .build();
+
+        assertSensorEventually(SENSOR_INT, -1, TIMEOUT_MS);
+
+        server.shutdown();
+    }
+
+    @Test(groups="Integration")
+    // marked integration as it takes a wee while
+    public void testSuspendResume() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+        feed.suspend();
+        final int countWhenSuspended = server.getRequestCount();
+        
+        Thread.sleep(500);
+        if (server.getRequestCount() > countWhenSuspended+1)
+            Assert.fail("Request count continued to increment while feed was suspended, from "+countWhenSuspended+" to "+server.getRequestCount());
+        
+        feed.resume();
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                assertTrue(server.getRequestCount() > countWhenSuspended + 1,
+                        "Request count failed to increment when feed was resumed, from " + countWhenSuspended + ", still at " + server.getRequestCount());
+            }
+        });
+    }
+
+    @Test(groups="Integration")
+    // marked integration as it takes a wee while
+    public void testStartSuspended() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(HttpPollConfig.forSensor(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(HttpPollConfig.forSensor(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .suspended()
+                .build();
+        Asserts.continually(MutableMap.of("timeout", 500),
+                Entities.attributeSupplier(entity, SENSOR_INT), Predicates.<Integer>equalTo(null));
+        int countWhenSuspended = server.getRequestCount();
+        feed.resume();
+        Asserts.eventually(Entities.attributeSupplier(entity, SENSOR_INT), Predicates.<Integer>equalTo(200));
+        if (server.getRequestCount() <= countWhenSuspended)
+            Assert.fail("Request count failed to increment when feed was resumed, from "+countWhenSuspended+", still at "+server.getRequestCount());
+        log.info("RUN: "+countWhenSuspended+" - "+server.getRequestCount());
+    }
+
+
+    @Test
+    public void testPollsAndParsesHttpErrorResponseLocal() throws Exception {
+        int unboundPort = Networking.nextAvailablePort(10000);
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUri("http://localhost:" + unboundPort + "/path/should/not/exist")
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .onSuccess(Functions.constant("success"))
+                        .onFailure(Functions.constant("failure"))
+                        .onException(Functions.constant("error")))
+                .build();
+        
+        assertSensorEventually(SENSOR_STRING, "error", TIMEOUT_MS);
+    }
+
+    @Test
+    public void testPollsMulti() throws Exception {
+        newMultiFeed(baseUrl);
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "{\"foo\":\"myfoo\"}", TIMEOUT_MS);
+    }
+
+    // because takes a wee while
+    @SuppressWarnings("rawtypes")
+    @Test(groups="Integration")
+    public void testPollsMultiClearsOnSubsequentFailure() throws Exception {
+        server = BetterMockWebServer.newInstanceLocalhost();
+        for (int i = 0; i < 10; i++) {
+            server.enqueue(new MockResponse()
+                    .setResponseCode(200)
+                    .setBody("Hello World"));
+        }
+        for (int i = 0; i < 10; i++) {
+            server.enqueue(new MockResponse()
+                    .setResponseCode(401)
+                    .setBody("Unauthorised"));
+        }
+        server.play();
+
+        newMultiFeed(server.getUrl("/"));
+        
+        assertSensorEventually(SENSOR_INT, 200, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "Hello World", TIMEOUT_MS);
+        
+        assertSensorEventually(SENSOR_INT, -1, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, null, TIMEOUT_MS);
+        
+        List<String> attrs = Lists.transform(MutableList.copyOf( ((EntityInternal)entity).getAllAttributes().keySet() ),
+            new Function<AttributeSensor,String>() {
+                @Override public String apply(AttributeSensor input) { return input.getName(); } });
+        Assert.assertTrue(!attrs.contains(SENSOR_STRING.getName()), "attrs contained "+SENSOR_STRING);
+        Assert.assertTrue(!attrs.contains(FeedConfig.NO_SENSOR.getName()), "attrs contained "+FeedConfig.NO_SENSOR);
+        
+        server.shutdown();
+    }
+
+    private void newMultiFeed(URL baseUrl) {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                
+                .poll(HttpPollConfig.forMultiple()
+                    .onSuccess(new Function<HttpToolResponse,Void>() {
+                        public Void apply(HttpToolResponse response) {
+                            entity.setAttribute(SENSOR_INT, response.getResponseCode());
+                            if (response.getResponseCode()==200)
+                                entity.setAttribute(SENSOR_STRING, response.getContentAsString());
+                            return null;
+                        }
+                    })
+                    .onFailureOrException(Functionals.function(EntityFunctions.settingSensorsConstant(entity, MutableMap.<AttributeSensor<?>,Object>of(
+                        SENSOR_INT, -1, 
+                        SENSOR_STRING, PollConfig.REMOVE))))
+                .period(100))
+                .build();
+    }
+    
+
+    private <T> void assertSensorEventually(final AttributeSensor<T> sensor, final T expectedVal, long timeout) {
+        Asserts.succeedsEventually(ImmutableMap.of("timeout", timeout), new Callable<Void>() {
+            public Void call() {
+                assertEquals(entity.getAttribute(sensor), expectedVal);
+                return null;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/http/HttpValueFunctionsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/http/HttpValueFunctionsTest.java b/core/src/test/java/org/apache/brooklyn/feed/http/HttpValueFunctionsTest.java
new file mode 100644
index 0000000..23ffae3
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/http/HttpValueFunctionsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.NoSuchElementException;
+
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+
+public class HttpValueFunctionsTest {
+
+    private int responseCode = 200;
+    private long fullLatency = 1000;
+    private String headerName = "my_header";
+    private String headerVal = "my_header_val";
+    private String bodyKey = "mykey";
+    private String bodyVal = "myvalue";
+    private String body = "{"+bodyKey+":"+bodyVal+"}";
+    private long now;
+    private HttpToolResponse response;
+    
+    @BeforeMethod
+    public void setUp() throws Exception {
+        now = System.currentTimeMillis();
+        response = new HttpToolResponse(responseCode, ImmutableMap.of(headerName, ImmutableList.of(headerVal)), 
+                body.getBytes(), now-fullLatency, fullLatency / 2, fullLatency);
+    }
+    
+    @Test
+    public void testResponseCode() throws Exception {
+        assertEquals(HttpValueFunctions.responseCode().apply(response), Integer.valueOf(responseCode));
+    }
+
+    @Test
+    public void testContainsHeader() throws Exception {
+        assertTrue(HttpValueFunctions.containsHeader(headerName).apply(response));
+        assertFalse(HttpValueFunctions.containsHeader("wrong_header").apply(response));
+    }
+    
+    @Test
+    public void testStringContents() throws Exception {
+        assertEquals(HttpValueFunctions.stringContentsFunction().apply(response), body);
+    }
+
+    @Test
+    public void testJsonContents() throws Exception {
+        JsonElement json = HttpValueFunctions.jsonContents().apply(response);
+        assertTrue(json.isJsonObject());
+        assertEquals(json.getAsJsonObject().entrySet(), ImmutableMap.of(bodyKey, new JsonPrimitive(bodyVal)).entrySet());
+    }
+
+    @Test
+    public void testJsonContentsGettingElement() throws Exception {
+        assertEquals(HttpValueFunctions.jsonContents(bodyKey, String.class).apply(response), bodyVal);
+    }
+
+    @Test(expectedExceptions=NoSuchElementException.class)
+    public void testJsonContentsGettingMissingElement() throws Exception {
+        assertNull(HttpValueFunctions.jsonContents("wrongkey", String.class).apply(response));
+    }
+
+    @Test
+    public void testLatency() throws Exception {
+        assertEquals(HttpValueFunctions.latency().apply(response), Long.valueOf(fullLatency));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/http/JsonFunctionsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/http/JsonFunctionsTest.java b/core/src/test/java/org/apache/brooklyn/feed/http/JsonFunctionsTest.java
new file mode 100644
index 0000000..928035e
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/http/JsonFunctionsTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.util.NoSuchElementException;
+
+import org.apache.brooklyn.feed.http.JsonFunctions;
+import org.apache.brooklyn.util.collections.Jsonya;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.Jsonya.Navigator;
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.jayway.jsonpath.PathNotFoundException;
+
+public class JsonFunctionsTest {
+
+    public static JsonElement europeMap() {
+        Navigator<MutableMap<Object, Object>> europe = Jsonya.newInstance().at("europe", "uk", "edinburgh")
+                .put("population", 500*1000)
+                .put("weather", "wet", "lighting", "dark")
+                .root().at("europe").at("france").put("population", 80*1000*1000)
+                .root();
+        return new JsonParser().parse( europe.toString() );
+    }
+
+    @Test
+    public void testWalk1() {
+        JsonElement pop = JsonFunctions.walk("europe", "france", "population").apply(europeMap());
+        Assert.assertEquals( (int)JsonFunctions.cast(Integer.class).apply(pop), 80*1000*1000 );
+    }
+
+    @Test
+    public void testWalk2() {
+        String weather = Functionals.chain(
+            JsonFunctions.walk("europe.uk.edinburgh.weather"),
+            JsonFunctions.cast(String.class) ).apply(europeMap());
+        Assert.assertEquals(weather, "wet");
+    }
+
+    @Test(expectedExceptions=NoSuchElementException.class)
+    public void testWalkWrong() {
+        Functionals.chain(
+            JsonFunctions.walk("europe", "spain", "barcelona"),
+            JsonFunctions.cast(String.class) ).apply(europeMap());
+    }
+
+
+    @Test
+    public void testWalkM() {
+        Maybe<JsonElement> pop = JsonFunctions.walkM("europe", "france", "population").apply( Maybe.of(europeMap()) );
+        Assert.assertEquals( (int)JsonFunctions.castM(Integer.class).apply(pop), 80*1000*1000 );
+    }
+
+    @Test
+    public void testWalkMWrong1() {
+        Maybe<JsonElement> m = JsonFunctions.walkM("europe", "spain", "barcelona").apply( Maybe.of( europeMap()) );
+        Assert.assertTrue(m.isAbsent());
+    }
+
+    @Test(expectedExceptions=Exception.class)
+    public void testWalkMWrong2() {
+        Maybe<JsonElement> m = JsonFunctions.walkM("europe", "spain", "barcelona").apply( Maybe.of( europeMap()) );
+        JsonFunctions.castM(String.class).apply(m);
+    }
+
+    
+    @Test
+    public void testWalkN() {
+        JsonElement pop = JsonFunctions.walkN("europe", "france", "population").apply( europeMap() );
+        Assert.assertEquals( (int)JsonFunctions.cast(Integer.class).apply(pop), 80*1000*1000 );
+    }
+
+    @Test
+    public void testWalkNWrong1() {
+        JsonElement m = JsonFunctions.walkN("europe", "spain", "barcelona").apply( europeMap() );
+        Assert.assertNull(m);
+    }
+
+    public void testWalkNWrong2() {
+        JsonElement m = JsonFunctions.walkN("europe", "spain", "barcelona").apply( europeMap() );
+        String n = JsonFunctions.cast(String.class).apply(m);
+        Assert.assertNull(n);
+    }
+
+    @Test
+    public void testGetPath1(){
+        Integer obj = (Integer) JsonFunctions.getPath("$.europe.uk.edinburgh.population").apply(europeMap());
+        Assert.assertEquals((int) obj, 500*1000);
+    }
+
+    @Test
+    public void testGetPath2(){
+        String obj = (String) JsonFunctions.getPath("$.europe.uk.edinburgh.lighting").apply(europeMap());
+        Assert.assertEquals(obj, "dark");
+    }
+
+    @Test
+    public void testGetMissingPathIsNullOrThrows(){
+        try {
+            // TODO is there a way to force this to return null if not found?
+            // for me (Alex) it throws but for others it seems to return null
+            Object obj = JsonFunctions.getPath("$.europe.spain.malaga").apply(europeMap());
+            Assert.assertNull(obj);
+        } catch (PathNotFoundException e) {
+            // not unexpected
+        }
+    }
+    
+}