You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:14 UTC

[26/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to o.a.b.feed and o.a.b.core.feed

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
new file mode 100644
index 0000000..e8cfc9f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.util.core.http.HttpTool;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.brooklyn.util.core.http.HttpTool.HttpClientBuilder;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by polling over http.
+ * 
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private HttpFeed feed;
+ * 
+ * //@Override
+ * protected void connectSensors() {
+ *   super.connectSensors();
+ *   
+ *   feed = HttpFeed.builder()
+ *       .entity(this)
+ *       .period(200)
+ *       .baseUri(String.format("http://%s:%s/management/subsystem/web/connector/http/read-resource", host, port))
+ *       .baseUriVars(ImmutableMap.of("include-runtime","true"))
+ *       .poll(new HttpPollConfig<Boolean>(SERVICE_UP)
+ *           .onSuccess(HttpValueFunctions.responseCodeEquals(200))
+ *           .onError(Functions.constant(false)))
+ *       .poll(new HttpPollConfig<Integer>(REQUEST_COUNT)
+ *           .onSuccess(HttpValueFunctions.jsonContents("requestCount", Integer.class)))
+ *       .build();
+ * }
+ * 
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ *   super.disconnectSensors();
+ *   if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ * <p>
+ *  
+ * This also supports giving a Supplier for the URL 
+ * (e.g. {@link Entities#attributeSupplier(org.apache.brooklyn.api.entity.Entity, org.apache.brooklyn.api.sensor.AttributeSensor)})
+ * from a sensor.  Note however that if a Supplier-based sensor is *https*,
+ * https-specific initialization may not occur if the URL is not available at start time,
+ * and it may report errors if that sensor is not available.
+ * Some guidance for controlling enablement of a feed based on availability of a sensor
+ * can be seen in HttpLatencyDetector (in brooklyn-policy). 
+ * 
+ * @author aled
+ */
+public class HttpFeed extends AbstractFeed {
+
+    /** Logger used by this feed, its builder and its poll jobs. */
+    public static final Logger log = LoggerFactory.getLogger(HttpFeed.class);
+
+    /**
+     * Config key under which the feed keeps its polls: each {@link HttpPollIdentifier}
+     * (one distinct HTTP request) maps to the set of {@link HttpPollConfig}s served by it.
+     * Written by the builder-based constructor and read back in {@link #preStart()}.
+     */
+    @SuppressWarnings("serial")
+    public static final ConfigKey<SetMultimap<HttpPollIdentifier, HttpPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+            new TypeToken<SetMultimap<HttpPollIdentifier, HttpPollConfig<?>>>() {},
+            "polls");
+
+    /** Creates a fresh {@link Builder} with which an {@link HttpFeed} can be configured and built. */
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    public static class Builder {
+        private EntityLocal entity;
+        private boolean onlyIfServiceUp = false;
+        private Supplier<URI> baseUriProvider;
+        private Duration period = Duration.millis(500);
+        private List<HttpPollConfig<?>> polls = Lists.newArrayList();
+        private URI baseUri;
+        private Map<String, String> baseUriVars = Maps.newLinkedHashMap();
+        private Map<String, String> headers = Maps.newLinkedHashMap();
+        private boolean suspended = false;
+        private Credentials credentials;
+        private String uniqueTag;
+        private volatile boolean built;
+
+        public Builder entity(EntityLocal val) {
+            this.entity = val;
+            return this;
+        }
+        public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
+        public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { 
+            this.onlyIfServiceUp = onlyIfServiceUp; 
+            return this; 
+        }
+        public Builder baseUri(Supplier<URI> val) {
+            if (baseUri!=null && val!=null)
+                throw new IllegalStateException("Builder cannot take both a URI and a URI Provider");
+            this.baseUriProvider = val;
+            return this;
+        }
+        public Builder baseUri(URI val) {
+            if (baseUriProvider!=null && val!=null)
+                throw new IllegalStateException("Builder cannot take both a URI and a URI Provider");
+            this.baseUri = val;
+            return this;
+        }
+        public Builder baseUrl(URL val) {
+            return baseUri(URI.create(val.toString()));
+        }
+        public Builder baseUri(String val) {
+            return baseUri(URI.create(val));
+        }
+        public Builder baseUriVars(Map<String,String> vals) {
+            baseUriVars.putAll(vals);
+            return this;
+        }
+        public Builder baseUriVar(String key, String val) {
+            baseUriVars.put(key, val);
+            return this;
+        }
+        public Builder headers(Map<String,String> vals) {
+            headers.putAll(vals);
+            return this;
+        }
+        public Builder header(String key, String val) {
+            headers.put(key, val);
+            return this;
+        }
+        public Builder period(Duration duration) {
+            this.period = duration;
+            return this;
+        }
+        public Builder period(long millis) {
+            return period(millis, TimeUnit.MILLISECONDS);
+        }
+        public Builder period(long val, TimeUnit units) {
+            return period(Duration.of(val, units));
+        }
+        public Builder poll(HttpPollConfig<?> config) {
+            polls.add(config);
+            return this;
+        }
+        public Builder suspended() {
+            return suspended(true);
+        }
+        public Builder suspended(boolean startsSuspended) {
+            this.suspended = startsSuspended;
+            return this;
+        }
+        public Builder credentials(String username, String password) {
+            this.credentials = new UsernamePasswordCredentials(username, password);
+            return this;
+        }
+        public Builder credentialsIfNotNull(String username, String password) {
+            if (username != null && password != null) {
+                this.credentials = new UsernamePasswordCredentials(username, password);
+            }
+            return this;
+        }
+        public Builder uniqueTag(String uniqueTag) {
+            this.uniqueTag = uniqueTag;
+            return this;
+        }
+        public HttpFeed build() {
+            built = true;
+            HttpFeed result = new HttpFeed(this);
+            result.setEntity(checkNotNull(entity, "entity"));
+            if (suspended) result.suspend();
+            result.start();
+            return result;
+        }
+        @Override
+        protected void finalize() {
+            if (!built) log.warn("HttpFeed.Builder created, but build() never called");
+        }
+    }
+    
+    /**
+     * Identifies one distinct HTTP request issued by the feed: method, URI supplier, headers,
+     * body, credentials and timeouts. Instances are used as the keys of {@link HttpFeed#POLLS},
+     * so poll configs that map to equal identifiers are served by a single scheduled request.
+     * <p>
+     * Equality covers every field that influences the request or the client built for it.
+     * The body is compared by content, and the timeouts are included, so two polls that differ
+     * only in their timeouts are not merged onto one client.
+     */
+    private static class HttpPollIdentifier {
+        final String method;
+        final Supplier<URI> uriProvider;
+        final Map<String,String> headers;
+        final byte[] body;
+        final Optional<Credentials> credentials;
+        final Duration connectionTimeout;
+        final Duration socketTimeout;
+
+        /**
+         * @param method HTTP method name (case-insensitive); stored lower-cased
+         * @param uriProvider supplier of the target URI; must not be null
+         * @param headers request headers; must not be null
+         * @param body request body, or null; must be null for GET
+         * @param credentials optional credentials; the Optional itself must not be null
+         * @param connectionTimeout connection timeout, or null to leave the client default
+         * @param socketTimeout socket timeout, or null to leave the client default
+         * @throws NullPointerException if a required argument is null
+         * @throws IllegalArgumentException if the method is unsupported, or a body is given for GET
+         */
+        private HttpPollIdentifier(String method, Supplier<URI> uriProvider, Map<String, String> headers, byte[] body,
+                                   Optional<Credentials> credentials, Duration connectionTimeout, Duration socketTimeout) {
+            this.method = checkNotNull(method, "method").toLowerCase();
+            this.uriProvider = checkNotNull(uriProvider, "uriProvider");
+            this.headers = checkNotNull(headers, "headers");
+            this.body = body;
+            this.credentials = checkNotNull(credentials, "credentials");
+            this.connectionTimeout = connectionTimeout;
+            this.socketTimeout = socketTimeout;
+            
+            // HEAD is accepted here because preStart() has a dedicated branch for it;
+            // rejecting it made that branch unreachable.
+            if (!(this.method.equals("get") || this.method.equals("post") || this.method.equals("head"))) {
+                throw new IllegalArgumentException("Unsupported HTTP method (only supports GET, POST and HEAD): "+method);
+            }
+            if (body != null && method.equalsIgnoreCase("get")) {
+                throw new IllegalArgumentException("Must not set body for http GET method");
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            // Arrays.hashCode hashes the body's content (and tolerates null); hashing the
+            // array reference itself would give an identity-based value.
+            return Objects.hashCode(method, uriProvider, headers, java.util.Arrays.hashCode(body), credentials,
+                    connectionTimeout, socketTimeout);
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof HttpPollIdentifier)) {
+                return false;
+            }
+            HttpPollIdentifier o = (HttpPollIdentifier) other;
+            return Objects.equal(method, o.method) &&
+                    Objects.equal(uriProvider, o.uriProvider) &&
+                    Objects.equal(headers, o.headers) &&
+                    java.util.Arrays.equals(body, o.body) &&
+                    Objects.equal(credentials, o.credentials) &&
+                    Objects.equal(connectionTimeout, o.connectionTimeout) &&
+                    Objects.equal(socketTimeout, o.socketTimeout);
+        }
+    }
+    
+    /**
+     * No-arg constructor, present for rebind. Do not call it directly;
+     * create feeds through {@link #builder()} instead.
+     */
+    public HttpFeed() {
+    }
+    
+    /**
+     * Creates the feed's configuration from a builder.
+     * <p>
+     * Every enabled poll config is copied (copies with a negative period take the builder's
+     * period), grouped under the {@link HttpPollIdentifier} describing its request, and the
+     * resulting multimap is stored under {@link #POLLS}. Disabled configs are ignored.
+     *
+     * @throws IllegalStateException if an enabled poll is processed while the builder holds both a
+     *         base URI and a URI provider, or URI vars together with a URI provider
+     * @throws NullPointerException if an enabled poll is processed and no base URI or provider is set
+     */
+    protected HttpFeed(Builder builder) {
+        setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
+        Map<String,String> sharedHeaders = ImmutableMap.copyOf(checkNotNull(builder.headers, "headers"));
+
+        SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> pollsByRequest = HashMultimap.<HttpPollIdentifier,HttpPollConfig<?>>create();
+        for (HttpPollConfig<?> original : builder.polls) {
+            if (original.isEnabled()) {
+                @SuppressWarnings({ "unchecked", "rawtypes" })
+                HttpPollConfig<?> copy = new HttpPollConfig(original);
+                if (copy.getPeriod() < 0) {
+                    copy.period(builder.period);
+                }
+                pollsByRequest.put(newPollIdentifier(builder, original, sharedHeaders), copy);
+            }
+        }
+        setConfig(POLLS, pollsByRequest);
+        initUniqueTag(builder.uniqueTag, pollsByRequest.values());
+    }
+
+    /** Derives the request identifier for one poll config from the builder's shared settings. */
+    private static HttpPollIdentifier newPollIdentifier(Builder builder, HttpPollConfig<?> config, Map<String,String> sharedHeaders) {
+        String httpMethod = config.getMethod();
+        Map<String,String> requestHeaders = config.buildHeaders(sharedHeaders);
+        byte[] requestBody = config.getBody();
+        Duration connectTimeout = config.getConnectionTimeout();
+        Duration readTimeout = config.getSocketTimeout();
+        Optional<Credentials> auth = Optional.fromNullable(builder.credentials);
+
+        Supplier<URI> uriSource = builder.baseUriProvider;
+        if (builder.baseUri == null) {
+            if (!builder.baseUriVars.isEmpty()) {
+                throw new IllegalStateException("Not permitted to supply URI vars when using a URI provider; pass the vars to the provider instead");
+            }
+        } else {
+            if (uriSource != null) {
+                throw new IllegalStateException("Not permitted to supply baseUri and baseUriProvider");
+            }
+            // A fixed base URI is resolved once, here, against this poll's sub-URL and vars
+            Map<String,String> uriVars = ImmutableMap.copyOf(checkNotNull(builder.baseUriVars, "baseUriVars"));
+            URI resolved = config.buildUri(builder.baseUri, uriVars);
+            uriSource = Suppliers.ofInstance(resolved);
+        }
+        checkNotNull(uriSource);
+
+        return new HttpPollIdentifier(httpMethod, uriSource, requestHeaders, requestBody, auth, connectTimeout, readTimeout);
+    }
+
+    @Override
+    protected void preStart() {
+        SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> polls = getConfig(POLLS);
+        
+        for (final HttpPollIdentifier pollInfo : polls.keySet()) {
+            // Though HttpClients are thread safe and can take advantage of connection pooling
+            // and authentication caching, the httpcomponents documentation says:
+            //    "While HttpClient instances are thread safe and can be shared between multiple
+            //     threads of execution, it is highly recommended that each thread maintains its
+            //     own dedicated instance of HttpContext.
+            //  http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
+            final HttpClient httpClient = createHttpClient(pollInfo);
+
+            Set<HttpPollConfig<?>> configs = polls.get(pollInfo);
+            long minPeriod = Integer.MAX_VALUE;
+            Set<AttributePollHandler<? super HttpToolResponse>> handlers = Sets.newLinkedHashSet();
+
+            for (HttpPollConfig<?> config : configs) {
+                handlers.add(new AttributePollHandler<HttpToolResponse>(config, entity, this));
+                if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+            }
+
+            Callable<HttpToolResponse> pollJob;
+            
+            if (pollInfo.method.equals("get")) {
+                pollJob = new Callable<HttpToolResponse>() {
+                    public HttpToolResponse call() throws Exception {
+                        if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo);
+                        return HttpTool.httpGet(httpClient, pollInfo.uriProvider.get(), pollInfo.headers);
+                    }};
+            } else if (pollInfo.method.equals("post")) {
+                pollJob = new Callable<HttpToolResponse>() {
+                    public HttpToolResponse call() throws Exception {
+                        if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo);
+                        return HttpTool.httpPost(httpClient, pollInfo.uriProvider.get(), pollInfo.headers, pollInfo.body);
+                    }};
+            } else if (pollInfo.method.equals("head")) {
+                pollJob = new Callable<HttpToolResponse>() {
+                    public HttpToolResponse call() throws Exception {
+                        if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo);
+                        return HttpTool.httpHead(httpClient, pollInfo.uriProvider.get(), pollInfo.headers);
+                    }};
+            } else {
+                throw new IllegalStateException("Unexpected http method: "+pollInfo.method);
+            }
+            
+            getPoller().scheduleAtFixedRate(pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod);
+        }
+    }
+
+    /**
+     * Builds the client used for one request: trust-all, lax redirects, plus the request's
+     * timeouts when set. The URI and credentials are only applied when the URI supplier
+     * currently yields a non-null value.
+     */
+    // TODO Should we really trustAll for https? Make configurable?
+    private HttpClient createHttpClient(HttpPollIdentifier pollIdentifier) {
+        URI currentUri = pollIdentifier.uriProvider.get();
+        HttpClientBuilder clientBuilder = HttpTool.httpClientBuilder()
+                .trustAll()
+                .laxRedirect(true);
+
+        if (currentUri != null) {
+            clientBuilder.uri(currentUri);
+            clientBuilder.credential(pollIdentifier.credentials);
+        }
+
+        Duration connectTimeout = pollIdentifier.connectionTimeout;
+        if (connectTimeout != null) {
+            clientBuilder.connectionTimeout(connectTimeout);
+        }
+
+        Duration readTimeout = pollIdentifier.socketTimeout;
+        if (readTimeout != null) {
+            clientBuilder.socketTimeout(readTimeout);
+        }
+
+        return clientBuilder.build();
+    }
+
+    /** Returns the superclass's poller, cast to the {@link HttpToolResponse} type this feed polls. */
+    @SuppressWarnings("unchecked")
+    protected Poller<HttpToolResponse> getPoller() {
+        return (Poller<HttpToolResponse>) super.getPoller();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java
new file mode 100644
index 0000000..e019293
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.net.URI;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.http.HttpTool;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Configuration of a single HTTP poll: on top of what {@link PollConfig} holds, it records the
+ * HTTP method, a sub-URL and query variables appended to the feed's base URI, per-poll headers,
+ * an optional request body, and optional connection/socket timeouts.
+ *
+ * @param <T> type of the sensor the poll feeds
+ */
+public class HttpPollConfig<T> extends PollConfig<HttpToolResponse, T, HttpPollConfig<T>> {
+
+    private String method = "GET";
+    private String suburl = "";
+    private Map<String, String> vars = ImmutableMap.<String,String>of();
+    private Map<String, String> headers = ImmutableMap.<String,String>of();
+    private byte[] body;
+    private Duration connectionTimeout;
+    private Duration socketTimeout;
+    
+    /** Success check installed by default: a non-null response whose code is between 200 and 399 inclusive. */
+    public static final Predicate<HttpToolResponse> DEFAULT_SUCCESS = new Predicate<HttpToolResponse>() {
+        @Override
+        public boolean apply(@Nullable HttpToolResponse input) {
+            return input != null && input.getResponseCode() >= 200 && input.getResponseCode() <= 399;
+        }};
+    
+    /** Creates a poll config for the given sensor. */
+    public static <T> HttpPollConfig<T> forSensor(AttributeSensor<T> sensor) {
+        return new HttpPollConfig<T>(sensor);
+    }
+    
+    /** Creates a poll config bound to {@link PollConfig#NO_SENSOR} rather than to a single sensor. */
+    public static HttpPollConfig<Void> forMultiple() {
+        return new HttpPollConfig<Void>(PollConfig.NO_SENSOR);
+    }
+    
+    /** Creates a poll config for the given sensor, with {@link #DEFAULT_SUCCESS} as its success check. */
+    public HttpPollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+        super.checkSuccess(DEFAULT_SUCCESS);
+    }
+
+    /**
+     * Copy constructor. Copies every HTTP-specific field, including the body and both timeouts,
+     * so the copy describes the same request as {@code other}. Maps and the body array are
+     * shared with the original, not cloned.
+     */
+    public HttpPollConfig(HttpPollConfig<T> other) {
+        super(other);
+        suburl = other.suburl;
+        vars = other.vars;
+        method = other.method;
+        headers = other.headers;
+        // body and the timeouts take part in equalsFields(); leaving them out of the copy
+        // made a copy compare differently from the config it was taken from.
+        body = other.body;
+        connectionTimeout = other.connectionTimeout;
+        socketTimeout = other.socketTimeout;
+    }
+    
+    /** @return the path appended to the base URI by {@link #buildUri(URI, Map)} */
+    public String getSuburl() {
+        return suburl;
+    }
+    
+    /** @return the query variables specific to this poll */
+    public Map<String, String> getVars() {
+        return vars;
+    }
+    
+    /** @return the connection timeout, or null if none was set */
+    public Duration getConnectionTimeout() {
+        return connectionTimeout;
+    }
+    
+    /** @return the socket timeout, or null if none was set */
+    public Duration getSocketTimeout() {
+        return socketTimeout;
+    }
+    
+    /** @return the HTTP method name; {@code "GET"} unless changed */
+    public String getMethod() {
+        return method;
+    }
+    
+    /** @return the request body, or null if none was set */
+    public byte[] getBody() {
+        return body;
+    }
+    
+    /** Sets the HTTP method name. */
+    public HttpPollConfig<T> method(String val) {
+        this.method = val; return this;
+    }
+    
+    /** Sets the path appended to the base URI. */
+    public HttpPollConfig<T> suburl(String val) {
+        this.suburl = val; return this;
+    }
+    
+    /** Replaces the query variables specific to this poll. */
+    public HttpPollConfig<T> vars(Map<String,String> val) {
+        this.vars = val; return this;
+    }
+    
+    /** Replaces the headers specific to this poll. */
+    public HttpPollConfig<T> headers(Map<String,String> val) {
+        this.headers = val; return this;
+    }
+    
+    /** Sets the request body. */
+    public HttpPollConfig<T> body(byte[] val) {
+        this.body = val; return this;
+    }
+
+    /** Sets the connection timeout. */
+    public HttpPollConfig<T> connectionTimeout(Duration val) {
+        this.connectionTimeout = val;
+        return this;
+    }
+
+    /** Sets the socket timeout. */
+    public HttpPollConfig<T> socketTimeout(Duration val) {
+        this.socketTimeout = val;
+        return this;
+    }
+
+    /**
+     * Builds the URI for this poll: the base URI followed by the sub-URL, then, if there are any
+     * variables, a {@code ?} and the encoded variables. Poll-specific variables override base
+     * variables of the same name.
+     *
+     * @param baseUri the feed's base URI; null is treated as empty
+     * @param baseUriVars variables shared by the feed; may be null or empty
+     * @throws IllegalArgumentException if the assembled string is not a valid URI
+     */
+    public URI buildUri(URI baseUri, Map<String,String> baseUriVars) {
+        String uri = (baseUri != null ? baseUri.toString() : "") + (suburl != null ? suburl : "");
+        Map<String,String> allvars = concat(baseUriVars, vars);
+        
+        if (allvars != null && allvars.size() > 0) {
+            // NOTE(review): a base URI that already carries a query string ends up with a second '?'
+            uri += "?" + HttpTool.encodeUrlParams(allvars);
+        }
+        
+        return URI.create(uri);
+    }
+
+    /** Returns the given base headers overlaid with this poll's own headers. */
+    public Map<String, String> buildHeaders(Map<String, String> baseHeaders) {
+        return MutableMap.<String,String>builder()
+                .putAll(baseHeaders)
+                .putAll(headers)
+                .build();
+    }
+    
+    /**
+     * Merges two maps, entries of {@code map2} winning on duplicate keys. When either map is
+     * null or empty the other one is returned as-is (which may itself be null).
+     */
+    @SuppressWarnings("unchecked")
+    private <K,V> Map<K,V> concat(Map<? extends K,? extends V> map1, Map<? extends K,? extends V> map2) {
+        if (map1 == null || map1.isEmpty()) return (Map<K,V>) map2;
+        if (map2 == null || map2.isEmpty()) return (Map<K,V>) map1;
+        
+        // TODO Not using Immutable builder, because that fails if duplicates in map1 and map2
+        return MutableMap.<K,V>builder().putAll(map1).putAll(map2).build();
+    }
+
+    @Override protected String toStringBaseName() { return "http"; }
+    @Override protected String toStringPollSource() { return suburl; }
+
+    /** Extends the inherited equality fields with the non-null HTTP-specific ones. */
+    @Override
+    protected MutableList<Object> equalsFields() {
+        return super.equalsFields().appendIfNotNull(method).appendIfNotNull(vars).appendIfNotNull(headers)
+            .appendIfNotNull(body).appendIfNotNull(connectionTimeout).appendIfNotNull(socketTimeout);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java
new file mode 100644
index 0000000..5414cc4
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+
+/**
+ * Transitional interface kept for backwards compatibility.
+ * <p>
+ * The concrete class that used to carry this name has been renamed {@link HttpToolResponse},
+ * as it has nothing specific to polls.
+ *
+ * @deprecated since 0.7.0, use {@link HttpToolResponse}.
+ */
+@Deprecated
+public interface HttpPollValue {
+
+    // Interface members are implicitly public, so the modifier is omitted.
+
+    int getResponseCode();
+
+    String getReasonPhrase();
+
+    long getStartTime();
+
+    long getLatencyFullContent();
+
+    long getLatencyFirstResponse();
+
+    Map<String, List<String>> getHeaderLists();
+
+    byte[] getContent();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java
new file mode 100644
index 0000000..10c3b3e
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.net.URI;
+
+import org.apache.brooklyn.util.core.http.HttpTool;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * @deprecated since 0.7; use {@link HttpTool}
+ */
+@Deprecated
+public class HttpPolls {
+
+    public static HttpToolResponse executeSimpleGet(URI uri) {
+        return HttpTool.httpGet(new DefaultHttpClient(), uri, ImmutableMap.<String,String>of());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java
new file mode 100644
index 0000000..75dab74
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.util.List;
+
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.brooklyn.util.guava.Functionals;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Lists;
+import com.google.gson.JsonElement;
+
+/**
+ * Static factories for {@link Function}s that extract a value from an {@link HttpToolResponse}.
+ * <p>
+ * The private {@code *Legacy} methods are unused; per their own comments they exist only so
+ * that previously persisted state can still be deserialized.
+ */
+public class HttpValueFunctions {
+
+    private HttpValueFunctions() {} // instead use static utility methods
+    
+    /** Returns a function yielding {@link HttpToolResponse#getResponseCode()} of its input. */
+    public static Function<HttpToolResponse, Integer> responseCode() {
+        return new ResponseCode();
+    }
+    
+    /** @deprecated since 0.7.0; only here for deserialization of persisted state */
+    private static Function<HttpToolResponse, Integer> responseCodeLegacy() {
+        // NOTE(review): presumably the anonymous class's position in this file matters for
+        // that deserialization -- confirm before moving or removing it
+        return new Function<HttpToolResponse, Integer>() {
+            @Override public Integer apply(HttpToolResponse input) {
+                return input.getResponseCode();
+            }
+        };
+    }
+
+    /** Named replacement for the anonymous function in {@link #responseCodeLegacy()}. */
+    private static class ResponseCode implements Function<HttpToolResponse, Integer> {
+        @Override public Integer apply(HttpToolResponse input) {
+            return input.getResponseCode();
+        }
+    }
+
+    /** Returns a function that is true when the response code equals {@code expected}. */
+    public static Function<HttpToolResponse, Boolean> responseCodeEquals(final int expected) {
+        return Functionals.chain(HttpValueFunctions.responseCode(), Functions.forPredicate(Predicates.equalTo(expected)));
+    }
+    
+    /** Returns a function that is true when the response code is one of the {@code expected} values. */
+    public static Function<HttpToolResponse, Boolean> responseCodeEquals(final int... expected) {
+        // Box the codes into a list so that Predicates.in can test membership
+        List<Integer> expectedList = Lists.newArrayList();
+        for (int e : expected) {
+            expectedList.add((Integer)e);
+        }
+        return Functionals.chain(HttpValueFunctions.responseCode(), Functions.forPredicate(Predicates.in(expectedList)));
+    }
+
+    /** Returns a function yielding {@link HttpToolResponse#getContentAsString()} of its input. */
+    public static Function<HttpToolResponse, String> stringContentsFunction() {
+        return new StringContents();
+    }
+    
+    /** @deprecated since 0.7.0; only here for deserialization of persisted state */
+    private static Function<HttpToolResponse, String> stringContentsFunctionLegacy() {
+        return new Function<HttpToolResponse, String>() {
+            @Override public String apply(HttpToolResponse input) {
+                return input.getContentAsString();
+            }
+        };
+    }
+
+    /** Named replacement for the anonymous function in {@link #stringContentsFunctionLegacy()}. */
+    private static class StringContents implements Function<HttpToolResponse, String> {
+        @Override public String apply(HttpToolResponse input) {
+            return input.getContentAsString();
+        }
+    }
+
+    /** Returns a function that passes the response's string content through {@link JsonFunctions#asJson()}. */
+    public static Function<HttpToolResponse, JsonElement> jsonContents() {
+        return Functionals.chain(stringContentsFunction(), JsonFunctions.asJson());
+    }
+    
+    /** Single-element form of {@link #jsonContents(String[], Class)}. */
+    public static <T> Function<HttpToolResponse, T> jsonContents(String element, Class<T> expected) {
+        return jsonContents(new String[] {element}, expected);
+    }
+    
+    /**
+     * Returns a function chaining {@link #jsonContents()}, {@code JsonFunctions.walk(elements)}
+     * and {@code JsonFunctions.cast(expected)}.
+     */
+    public static <T> Function<HttpToolResponse, T> jsonContents(String[] elements, Class<T> expected) {
+        return Functionals.chain(jsonContents(), JsonFunctions.walk(elements), JsonFunctions.cast(expected));
+    }
+
+    /** Returns a function chaining {@link #jsonContents()} with {@code JsonFunctions.getPath(path)}. */
+    public static <T> Function<HttpToolResponse, T> jsonContentsFromPath(String path){
+        return Functionals.chain(jsonContents(), JsonFunctions.<T>getPath(path));
+    }
+    
+    /** Returns a function yielding {@link HttpToolResponse#getLatencyFullContent()} of its input. */
+    public static Function<HttpToolResponse, Long> latency() {
+        return new Latency();
+    }
+
+    /** @deprecated since 0.7.0; only here for deserialization of persisted state */
+    private static Function<HttpToolResponse, Long> latencyLegacy() {
+        return new Function<HttpToolResponse, Long>() {
+            public Long apply(HttpToolResponse input) {
+                return input.getLatencyFullContent();
+            }
+        };
+    }
+
+    /** Named replacement for the anonymous function in {@link #latencyLegacy()}. */
+    private static class Latency implements Function<HttpToolResponse, Long> {
+        public Long apply(HttpToolResponse input) {
+            return input.getLatencyFullContent();
+        }
+    };
+
+    /** Returns a function that is true when the response has at least one value for {@code header}. */
+    public static Function<HttpToolResponse, Boolean> containsHeader(String header) {
+        return new ContainsHeader(header);
+    }
+
+    /** Tests whether the response's header lists hold a non-empty entry under a fixed header name. */
+    private static class ContainsHeader implements Function<HttpToolResponse, Boolean> {
+        private final String header;
+
+        public ContainsHeader(String header) {
+            this.header = header;
+        }
+        @Override
+        public Boolean apply(HttpToolResponse input) {
+            // The lookup uses the header name exactly as given to the constructor
+            List<String> actual = input.getHeaderLists().get(header);
+            return actual != null && actual.size() > 0;
+        }
+    }
+    
+
+    /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function)} */ @Deprecated
+    public static <A,B,C> Function<A,C> chain(final Function<A,? extends B> f1, final Function<B,C> f2) {
+        return Functionals.chain(f1, f2);
+    }
+    
+    /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function, Function)} */ @Deprecated
+    public static <A,B,C,D> Function<A,D> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,D> f3) {
+        return Functionals.chain(f1, f2, f3);
+    }
+
+    /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function, Function, Function)} */ @Deprecated
+    public static <A,B,C,D,E> Function<A,E> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,? extends D> f3, final Function<D,E> f4) {
+        return Functionals.chain(f1, f2, f3, f4);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java
new file mode 100644
index 0000000..a3e04cd
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.guava.MaybeFunctions;
+
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.gson.*;
+import com.jayway.jsonpath.JsonPath;
+
+/**
+ * Static factory methods returning Guava {@link Function}s which parse strings into Gson
+ * {@link JsonElement}s, walk into them, and convert them to Java values.
+ */
+public class JsonFunctions {
+
+    private JsonFunctions() {} // instead use static utility methods
+
+    /** returns a function which parses its string input using a new {@link JsonParser} */
+    public static Function<String, JsonElement> asJson() {
+        return new Function<String, JsonElement>() {
+            @Override public JsonElement apply(String input) {
+                return new JsonParser().parse(input);
+            }
+        };
+    }
+
+    /** returns a function which applies {@code func} to every entry of its input, which must be a
+     * {@link JsonArray}, and collects the results (in array order) in a new list */
+    public static <T> Function<JsonElement, List<T>> forEach(final Function<JsonElement, T> func) {
+        return new Function<JsonElement, List<T>>() {
+            @Override public List<T> apply(JsonElement input) {
+                JsonArray array = (JsonArray) input;
+                List<T> result = Lists.newArrayList();
+                for (int i = 0; i < array.size(); i++) {
+                    result.add(func.apply(array.get(i)));
+                }
+                return result;
+            }
+        };
+    }
+
+
+    /** as {@link #walk(Iterable)} taking a single string consisting of a dot separated path */
+    public static Function<JsonElement, JsonElement> walk(String elementOrDotSeparatedElements) {
+        return walk( Splitter.on('.').split(elementOrDotSeparatedElements) );
+    }
+
+    /** as {@link #walk(Iterable)} taking a series of strings (dot separators not respected here) */
+    public static Function<JsonElement, JsonElement> walk(final String... elements) {
+        return walk(Arrays.asList(elements));
+    }
+
+    /** returns a function which traverses the supplied path of entries in a json object (maps of maps of maps...), 
+     * @throws NoSuchElementException if any path is not present as a key in that map */
+    public static Function<JsonElement, JsonElement> walk(final Iterable<String> elements) {
+        // could do this instead, pointing at Maybe for this, and for walkN, but it's slightly less efficient
+//      return Functionals.chain(MaybeFunctions.<JsonElement>wrap(), walkM(elements), MaybeFunctions.<JsonElement>get());
+
+        return new Function<JsonElement, JsonElement>() {
+            @Override public JsonElement apply(JsonElement input) {
+                JsonElement curr = input;
+                for (String element : elements) {
+                    JsonObject jo = curr.getAsJsonObject();
+                    curr = jo.get(element);
+                    if (curr==null)
+                        throw new NoSuchElementException("No element '"+element+"' in JSON, when walking "+elements);
+                }
+                return curr;
+            }
+        };
+    }
+
+
+    /** as {@link #walk(String)} but if any element is not found it simply returns null */
+    public static Function<JsonElement, JsonElement> walkN(@Nullable String elements) {
+        return walkN( Splitter.on('.').split(elements) );
+    }
+
+    /** as {@link #walk(String...)} but if any element is not found it simply returns null */
+    public static Function<JsonElement, JsonElement> walkN(final String... elements) {
+        return walkN(Arrays.asList(elements));
+    }
+
+    /** as {@link #walk(Iterable)} but if any element is not found it simply returns null */
+    public static Function<JsonElement, JsonElement> walkN(final Iterable<String> elements) {
+        return new Function<JsonElement, JsonElement>() {
+            @Override public JsonElement apply(JsonElement input) {
+                JsonElement curr = input;
+                for (String element : elements) {
+                    // a previous step (or the input itself) was missing: nothing further to walk
+                    if (curr==null) return null;
+                    JsonObject jo = curr.getAsJsonObject();
+                    curr = jo.get(element);
+                }
+                return curr;
+            }
+        };
+    }
+
+    /** as {@link #walkM(Iterable)} taking a single string consisting of a dot separated path */
+    public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(@Nullable String elements) {
+        return walkM( Splitter.on('.').split(elements) );
+    }
+
+    /** as {@link #walkM(Iterable)} taking a series of strings (dot separators not respected here) */
+    public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(final String... elements) {
+        return walkM(Arrays.asList(elements));
+    }
+
+    /** as {@link #walk(Iterable)} but working with objects which {@link Maybe} contain {@link JsonElement},
+     * simply preserving a {@link Maybe#absent()} object if additional walks are requested upon it
+     * (cf jquery) */
+    public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(final Iterable<String> elements) {
+        return new Function<Maybe<JsonElement>, Maybe<JsonElement>>() {
+            @Override public Maybe<JsonElement> apply(Maybe<JsonElement> input) {
+                Maybe<JsonElement> curr = input;
+                for (String element : elements) {
+                    if (curr.isAbsent()) return curr;
+                    JsonObject jo = curr.get().getAsJsonObject();
+                    JsonElement currO = jo.get(element);
+                    if (currO==null) return Maybe.absent("No element '"+element+"' in JSON, when walking "+elements);
+                    curr = Maybe.of(currO);
+                }
+                return curr;
+            }
+        };
+    }
+
+    /**
+     * returns an element from a single json primitive value given a full path {@link com.jayway.jsonpath.JsonPath}
+     * <p>
+     * The input element is serialized with {@code toString()} and the path is evaluated against that string;
+     * the result is returned via an unchecked cast to the caller's {@code T}.
+     */
+    public static <T> Function<JsonElement,T> getPath(final String path) {
+        return new Function<JsonElement, T>() {
+            @SuppressWarnings("unchecked")
+            @Override public T apply(JsonElement input) {
+                String jsonString = input.toString();
+                Object rawElement = JsonPath.read(jsonString, path);
+                return (T) rawElement;
+            }
+        };
+    }
+
+    /**
+     * returns a function which converts a {@link JsonElement} to the given type.
+     * <p>
+     * Supported targets are the primitive types and their wrappers, {@link BigDecimal}, {@link BigInteger},
+     * other {@link Number} types (via {@code getAsNumber()}), {@link String}, and arrays of any supported type
+     * (including primitive arrays such as {@code int[]} and arrays of {@link JsonElement} or its sub-types).
+     * A null input or a json null yields null.
+     *
+     * @throws IllegalArgumentException (from the returned function) if the target type is not supported,
+     *         or if an array entry cannot be stored in an array of the requested component type
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Function<JsonElement, T> cast(final Class<T> expected) {
+        return new Function<JsonElement, T>() {
+            @Override public T apply(JsonElement input) {
+                if (input == null) {
+                    return (T) null;
+                } else if (input.isJsonNull()) {
+                    return (T) null;
+                } else if (expected == boolean.class || expected == Boolean.class) {
+                    return (T) (Boolean) input.getAsBoolean();
+                } else if (expected == char.class || expected == Character.class) {
+                    return (T) (Character) input.getAsCharacter();
+                } else if (expected == byte.class || expected == Byte.class) {
+                    return (T) (Byte) input.getAsByte();
+                } else if (expected == short.class || expected == Short.class) {
+                    return (T) (Short) input.getAsShort();
+                } else if (expected == int.class || expected == Integer.class) {
+                    return (T) (Integer) input.getAsInt();
+                } else if (expected == long.class || expected == Long.class) {
+                    return (T) (Long) input.getAsLong();
+                } else if (expected == float.class || expected == Float.class) {
+                    return (T) (Float) input.getAsFloat();
+                } else if (expected == double.class || expected == Double.class) {
+                    return (T) (Double) input.getAsDouble();
+                } else if (expected == BigDecimal.class) {
+                    return (T) input.getAsBigDecimal();
+                } else if (expected == BigInteger.class) {
+                    return (T) input.getAsBigInteger();
+                } else if (Number.class.isAssignableFrom(expected)) {
+                    // TODO Will result in a class-cast if it's an unexpected sub-type of Number not handled above
+                    return (T) input.getAsNumber();
+                } else if (expected == String.class) {
+                    return (T) input.getAsString();
+                } else if (expected.isArray()) {
+                    JsonArray array = input.getAsJsonArray();
+                    Class<?> componentType = expected.getComponentType();
+                    // Always allocate an array of the requested component type and fill it reflectively:
+                    // a primitive array (e.g. int[]) is not an Object[], and a JsonElement[] is not
+                    // a JsonObject[], so neither can be built through a fixed reference array type.
+                    Object result = Array.newInstance(componentType, array.size());
+                    if (JsonElement.class.isAssignableFrom(componentType)) {
+                        for (int i = 0; i < array.size(); i++) {
+                            Array.set(result, i, array.get(i));
+                        }
+                    } else {
+                        // the per-entry conversion does not depend on the entry, so build it once
+                        Function<JsonElement, ?> elementCast = cast(componentType);
+                        for (int i = 0; i < array.size(); i++) {
+                            Array.set(result, i, elementCast.apply(array.get(i)));
+                        }
+                    }
+                    return (T) result;
+                } else {
+                    throw new IllegalArgumentException("Cannot cast json element to type "+expected);
+                }
+            }
+        };
+    }
+
+    /** as {@link #cast(Class)} but taking a {@link Maybe}, which is first unwrapped using {@link MaybeFunctions#get()} */
+    public static <T> Function<Maybe<JsonElement>, T> castM(final Class<T> expected) {
+        return Functionals.chain(MaybeFunctions.<JsonElement>get(), cast(expected));
+    }
+
+    /** as {@link #cast(Class)} but taking a {@link Maybe}, returning {@code defaultValue} if it is absent */
+    public static <T> Function<Maybe<JsonElement>, T> castM(final Class<T> expected, final T defaultValue) {
+        return new Function<Maybe<JsonElement>, T>() {
+            @Override
+            public T apply(Maybe<JsonElement> input) {
+                if (input.isAbsent()) return defaultValue;
+                return cast(expected).apply(input.get());
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
new file mode 100644
index 0000000..caeb21a
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.shell;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.ssh.SshFeed;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.core.task.system.internal.SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by executing shell commands (on the local machine where
+ * this instance of brooklyn is running). Useful e.g. for paas tools such as Cloud Foundry vmc
+ * which operate against a remote target.
+ * 
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private ShellFeed feed;
+ * 
+ * //@Override
+ * protected void connectSensors() {
+ *   super.connectSensors();
+ *   
+ *   feed = ShellFeed.builder()
+ *       .entity(this)
+ *       .poll(new ShellPollConfig<Long>(DISK_USAGE)
+ *           .command("df -P | grep /dev")
+ *           .failOnNonZeroResultCode(true)
+ *           .onSuccess(new Function<SshPollValue, Long>() {
+ *                public Long apply(SshPollValue input) {
+ *                  String[] parts = input.getStdout().split("[ \\t]+");
+ *                  return Long.parseLong(parts[2]);
+ *                }}))
+ *       .build();
+ * }
+ * 
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ *   super.disconnectSensors();
+ *   if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ * 
+ * @see SshFeed (to run on remote machines)
+ * @see FunctionFeed (for arbitrary functions)
+ * 
+ * @author aled
+ */
+public class ShellFeed extends AbstractFeed {
+
+    public static final Logger log = LoggerFactory.getLogger(ShellFeed.class);
+
+    /** The enabled polls of this feed, grouped by the shell invocation they share. */
+    @SuppressWarnings("serial")
+    private static final ConfigKey<SetMultimap<ShellPollIdentifier, ShellPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+            new TypeToken<SetMultimap<ShellPollIdentifier, ShellPollConfig<?>>>() {},
+            "polls");
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Collects the entity, default period and polls for a {@link ShellFeed};
+     * {@link #build()} creates the feed and starts it.
+     */
+    public static class Builder {
+        private EntityLocal entity;
+        // default period, used for any poll whose own period is negative
+        private long period = 500;
+        private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
+        private List<ShellPollConfig<?>> polls = Lists.newArrayList();
+        private String uniqueTag;
+        private volatile boolean built;
+
+        public Builder entity(EntityLocal val) {
+            this.entity = val;
+            return this;
+        }
+        public Builder period(long millis) {
+            return period(millis, TimeUnit.MILLISECONDS);
+        }
+        public Builder period(long val, TimeUnit units) {
+            this.period = val;
+            this.periodUnits = units;
+            return this;
+        }
+        public Builder poll(ShellPollConfig<?> config) {
+            polls.add(config);
+            return this;
+        }
+        public Builder uniqueTag(String uniqueTag) {
+            this.uniqueTag = uniqueTag;
+            return this;
+        }
+        /**
+         * Creates the feed from the current settings, attaches it to the entity and starts it.
+         *
+         * @throws NullPointerException if no entity has been set
+         */
+        public ShellFeed build() {
+            built = true;
+            ShellFeed result = new ShellFeed(this);
+            result.setEntity(checkNotNull(entity, "entity"));
+            result.start();
+            return result;
+        }
+        @Override
+        protected void finalize() {
+            if (!built) log.warn("ShellFeed.Builder created, but build() never called");
+        }
+    }
+
+    /**
+     * Key under which polls are grouped, so that polls with an equal identifier are served by
+     * a single scheduled shell invocation.
+     * <p>
+     * Note that {@code context} does not take part in {@link #equals(Object)} or {@link #hashCode()}.
+     */
+    private static class ShellPollIdentifier {
+        final String command;
+        final Map<String, String> env;
+        final File dir;
+        final String input;
+        final String context;
+        final long timeout;
+
+        private ShellPollIdentifier(String command, Map<String, String> env, File dir, String input, String context, long timeout) {
+            this.command = checkNotNull(command, "command");
+            this.env = checkNotNull(env, "env");
+            this.dir = dir;
+            this.input = input;
+            this.context = checkNotNull(context, "context");
+            this.timeout = timeout;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(command, env, dir, input, timeout);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof ShellPollIdentifier)) {
+                return false;
+            }
+            ShellPollIdentifier o = (ShellPollIdentifier) other;
+            return Objects.equal(command, o.command) &&
+                    Objects.equal(env, o.env) &&
+                    Objects.equal(dir, o.dir) &&
+                    Objects.equal(input, o.input) &&
+                    Objects.equal(timeout, o.timeout);
+        }
+    }
+
+    /**
+     * For rebind; do not call directly; use builder
+     */
+    public ShellFeed() {
+    }
+
+    /**
+     * Copies every enabled poll of the builder (applying the builder's period to polls whose
+     * own period is negative), groups the copies by {@link ShellPollIdentifier} and stores
+     * them as this feed's config.
+     */
+    protected ShellFeed(Builder builder) {
+        super();
+
+        SetMultimap<ShellPollIdentifier, ShellPollConfig<?>> polls = HashMultimap.<ShellPollIdentifier,ShellPollConfig<?>>create();
+        for (ShellPollConfig<?> config : builder.polls) {
+            if (!config.isEnabled()) continue;
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            ShellPollConfig<?> configCopy = new ShellPollConfig(config);
+            if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits);
+            // build the identifier from the feed's own copy rather than the caller's config
+            String command = configCopy.getCommand();
+            Map<String, String> env = configCopy.getEnv();
+            File dir = configCopy.getDir();
+            String input = configCopy.getInput();
+            String context = config.getSensor().getName();
+            long timeout = configCopy.getTimeout();
+
+            polls.put(new ShellPollIdentifier(command, env, dir, input, context, timeout), configCopy);
+        }
+        setConfig(POLLS, polls);
+        initUniqueTag(builder.uniqueTag, polls.values());
+    }
+
+    /**
+     * Schedules one repeating shell invocation per distinct {@link ShellPollIdentifier}, at the
+     * smallest positive period of the polls sharing it, delivering each result to the
+     * handlers of all those polls.
+     */
+    @Override
+    protected void preStart() {
+        SetMultimap<ShellPollIdentifier, ShellPollConfig<?>> polls = getConfig(POLLS);
+
+        for (final ShellPollIdentifier pollInfo : polls.keySet()) {
+            Set<ShellPollConfig<?>> configs = polls.get(pollInfo);
+            long minPeriod = Integer.MAX_VALUE;
+            Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
+
+            for (ShellPollConfig<?> config : configs) {
+                handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
+                if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+            }
+
+            final ProcessTaskFactory<?> taskFactory = newTaskFactory(pollInfo.command, pollInfo.env, pollInfo.dir, 
+                    pollInfo.input, pollInfo.context, pollInfo.timeout);
+            final ExecutionContext executionContext = ((EntityInternal) entity).getManagementSupport().getExecutionContext();
+
+            getPoller().scheduleAtFixedRate(
+                    new Callable<SshPollValue>() {
+                        @Override public SshPollValue call() throws Exception {
+                            ProcessTaskWrapper<?> taskWrapper = taskFactory.newTask();
+                            executionContext.submit(taskWrapper);
+                            taskWrapper.block();
+                            // report -1 when the task has no exit code
+                            Optional<Integer> exitCode = Optional.fromNullable(taskWrapper.getExitCode());
+                            return new SshPollValue(null, exitCode.or(-1), taskWrapper.getStdout(), taskWrapper.getStderr());
+                        }}, 
+                    new DelegatingPollHandler<SshPollValue>(handlers), 
+                    minPeriod);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Poller<SshPollValue> getPoller() {
+        return (Poller<SshPollValue>) super.getPoller();
+    }
+
+    /**
+     * Executes the given command (using `bash -l -c $command`, so as to have a good path set).
+     * 
+     * @param command The command to execute
+     * @param env     Environment variable settings, in format name=value
+     * @param dir     Working directory, or null to inherit from current process
+     * @param input   Input for the command; currently not passed on to the process (a warning is logged if not null)
+     * @param summary Summary set on the created task factory
+     * @param timeout Timeout in milliseconds; currently not enforced (a warning is logged if positive)
+     */
+    protected ProcessTaskFactory<?> newTaskFactory(final String command, Map<String,String> env, File dir, String input, final String summary, final long timeout) {
+        // FIXME Add generic timeout() support to task ExecutionManager
+        if (timeout > 0) {
+            log.warn("Timeout ({}ms) not currently supported for ShellFeed {}", timeout, this);
+        }
+        // FIXME input is accepted by the poll config but nothing below supplies it to the process;
+        // make that visible rather than dropping it silently (the input itself is not logged)
+        if (input != null) {
+            log.warn("Input not currently supported for ShellFeed {}; ignoring it for command {}", this, command);
+        }
+
+        return new ConcreteSystemProcessTaskFactory<Object>(command)
+                .environmentVariables(env)
+                .loginShell(true)
+                .directory(dir)
+                .runAsCommand()
+                .summary(summary);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java
new file mode 100644
index 0000000..e1147c3
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.shell;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.util.collections.MutableList;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+
+/**
+ * Configuration of a single poll of a {@code ShellFeed}: the shell command to run, its
+ * environment variables, working directory, input and timeout, for populating one sensor.
+ *
+ * @param <T> the type of the sensor being populated
+ */
+public class ShellPollConfig<T> extends PollConfig<SshPollValue, T, ShellPollConfig<T>> {
+
+    private String command;
+    private Map<String,String> env = Maps.newLinkedHashMap();
+    // in milliseconds; -1 unless set through timeout(...)
+    private long timeout = -1;
+    private File dir;
+    private String input;
+
+    /** Treats a poll as successful when a value was produced and its exit status is zero. */
+    public static final Predicate<SshPollValue> DEFAULT_SUCCESS = new Predicate<SshPollValue>() {
+        @Override
+        public boolean apply(@Nullable SshPollValue input) {
+            return input != null && input.getExitStatus() == 0;
+        }};
+
+    /** Creates a config for the given sensor, using {@link #DEFAULT_SUCCESS} as the success check. */
+    public ShellPollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+        super.checkSuccess(DEFAULT_SUCCESS);
+    }
+
+    /**
+     * Copy constructor. The environment variables are copied into this config's own map, so
+     * that later {@link #env(String, String)} calls on either config do not affect the other.
+     */
+    public ShellPollConfig(ShellPollConfig<T> other) {
+        super(other);
+        command = other.command;
+        // env was already initialised to a fresh map by the field initializer; copy the entries
+        // rather than sharing the other config's mutable map
+        if (other.env != null) env.putAll(other.env);
+        timeout = other.timeout;
+        dir = other.dir;
+        input = other.input;
+    }
+
+    public String getCommand() {
+        return command;
+    }
+
+    /** @return the live (mutable) map of environment variables held by this config */
+    public Map<String, String> getEnv() {
+        return env;
+    }
+
+    public File getDir() {
+        return dir;
+    }
+
+    public String getInput() {
+        return input;
+    }
+
+    /** @return the timeout in milliseconds, or -1 if none has been set */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public ShellPollConfig<T> command(String val) {
+        this.command = val;
+        return this;
+    }
+
+    /**
+     * Adds (or replaces) a single environment variable.
+     *
+     * @throws NullPointerException if the key or the value is null
+     */
+    public ShellPollConfig<T> env(String key, String val) {
+        env.put(checkNotNull(key, "key"), checkNotNull(val, "val"));
+        return this;
+    }
+
+    /**
+     * Adds all entries of the given map as environment variables.
+     *
+     * @throws NullPointerException if the map, or any key or value in it, is null
+     */
+    public ShellPollConfig<T> env(Map<String,String> val) {
+        for (Map.Entry<String, String> entry : checkNotNull(val, "map").entrySet()) {
+            env(entry.getKey(), entry.getValue());
+        }
+        return this;
+    }
+
+    public ShellPollConfig<T> dir(File val) {
+        this.dir = val;
+        return this;
+    }
+
+    public ShellPollConfig<T> input(String val) {
+        this.input = val;
+        return this;
+    }
+
+    /** Sets the timeout, in milliseconds. */
+    public ShellPollConfig<T> timeout(long timeout) {
+        return timeout(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    /** Sets the timeout; it is stored converted to milliseconds. */
+    public ShellPollConfig<T> timeout(long timeout, TimeUnit units) {
+        this.timeout = units.toMillis(timeout);
+        return this;
+    }
+
+    @Override protected String toStringBaseName() { return "shell"; }
+    @Override protected String toStringPollSource() { return command; }
+    @Override protected MutableList<Object> equalsFields() { return super.equalsFields().appendIfNotNull(command); }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
new file mode 100644
index 0000000..8663137
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.core.location.Locations;
+import org.apache.brooklyn.core.location.Machines;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.internal.ssh.SshTool;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by polling over ssh.
+ * 
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private SshFeed feed;
+ * 
+ * //@Override
+ * protected void connectSensors() {
+ *   super.connectSensors();
+ *   
+ *   feed = SshFeed.builder()
+ *       .entity(this)
+ *       .machine(mySshMachineLocation)
+ *       .poll(new SshPollConfig<Boolean>(SERVICE_UP)
+ *           .command("rabbitmqctl -q status")
+ *           .onSuccess(new Function<SshPollValue, Boolean>() {
+ *               public Boolean apply(SshPollValue input) {
+ *                 return (input.getExitStatus() == 0);
+ *               }}))
+ *       .build();
+ * }
+ * 
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ *   super.disconnectSensors();
+ *   if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ * 
+ * @author aled
+ */
+public class SshFeed extends AbstractFeed {
+
+    /** Logger shared by the feed and its {@link Builder}. */
+    public static final Logger log = LoggerFactory.getLogger(SshFeed.class);
+    
+    /**
+     * Optional supplier of the machine to poll. When no supplier is configured,
+     * {@link #getMachine()} looks the machine up from the entity's locations instead.
+     */
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Supplier<SshMachineLocation>> MACHINE = ConfigKeys.newConfigKey(
+            new TypeToken<Supplier<SshMachineLocation>>() {},
+            "machine");
+    
+    /**
+     * When {@code TRUE} each poll is run via {@code execCommands} on the machine;
+     * any other value (including unset) runs it via {@code execScript}.
+     */
+    public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand");
+    
+    /**
+     * The configured polls, keyed by the (command supplier, env supplier) pair they execute.
+     * {@link #preStart()} schedules one task per key and fans the result out to every config under it.
+     */
+    @SuppressWarnings("serial")
+    public static final ConfigKey<SetMultimap<SshPollIdentifier, SshPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+            new TypeToken<SetMultimap<SshPollIdentifier, SshPollConfig<?>>>() {},
+            "polls");
+    
+    /** Creates a new, empty {@link Builder}. */
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    /**
+     * Fluent builder for {@link SshFeed}. {@link #build()} both creates and starts the feed.
+     */
+    public static class Builder {
+        private EntityLocal entity;
+        private boolean onlyIfServiceUp = false;
+        private Supplier<SshMachineLocation> machine;
+        // default period, applied to any poll config whose own period is negative
+        private Duration period = Duration.of(500, TimeUnit.MILLISECONDS);
+        private List<SshPollConfig<?>> polls = Lists.newArrayList();
+        private boolean execAsCommand = false;
+        private String uniqueTag;
+        // set by build(); read by finalize() to warn about builders that were never built
+        private volatile boolean built;
+        
+        /** Sets the entity the feed is attached to; required, {@link #build()} fails with a NullPointerException without it. */
+        public Builder entity(EntityLocal val) {
+            this.entity = val;
+            return this;
+        }
+        /** Shorthand for {@code onlyIfServiceUp(true)}. */
+        public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
+        /** Sets the value stored under the feed's {@code ONLY_IF_SERVICE_UP} config key. */
+        public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { 
+            this.onlyIfServiceUp = onlyIfServiceUp; 
+            return this; 
+        }
+        /** optional, to force a machine; otherwise it is inferred from the entity */
+        public Builder machine(SshMachineLocation val) { return machine(Suppliers.ofInstance(val)); }
+        /** optional, to force a machine; otherwise it is inferred from the entity */
+        public Builder machine(Supplier<SshMachineLocation> val) {
+            this.machine = val;
+            return this;
+        }
+        /** Sets the default period, used for polls that do not specify their own (i.e. whose period is negative). */
+        public Builder period(Duration period) {
+            this.period = period;
+            return this;
+        }
+        /** As {@link #period(Duration)}, with the value given in milliseconds. */
+        public Builder period(long millis) {
+            return period(Duration.of(millis, TimeUnit.MILLISECONDS));
+        }
+        /** As {@link #period(Duration)}, with the value given in the specified units. */
+        public Builder period(long val, TimeUnit units) {
+            return period(Duration.of(val, units));
+        }
+        /** Adds a poll; the feed works on a copy of the config taken when it is constructed. */
+        public Builder poll(SshPollConfig<?> config) {
+            polls.add(config);
+            return this;
+        }
+        /** Runs polls via {@code execCommands} rather than {@code execScript}. */
+        public Builder execAsCommand() {
+            execAsCommand = true;
+            return this;
+        }
+        /** Runs polls via {@code execScript}; this is the default. */
+        public Builder execAsScript() {
+            execAsCommand = false;
+            return this;
+        }
+        /** Sets the unique tag passed to {@code initUniqueTag} when the feed is constructed. */
+        public Builder uniqueTag(String uniqueTag) {
+            this.uniqueTag = uniqueTag;
+            return this;
+        }
+        /**
+         * Creates the feed, attaches it to the entity and starts it.
+         *
+         * @throws NullPointerException if no entity was set (thrown after the feed object has been constructed)
+         */
+        public SshFeed build() {
+            // flagged before anything can fail, so finalize() stays quiet even if construction throws
+            built = true;
+            SshFeed result = new SshFeed(this);
+            result.setEntity(checkNotNull(entity, "entity"));
+            result.start();
+            return result;
+        }
+        /** Logs a warning if this builder is collected without {@link #build()} ever having been called. */
+        @Override
+        protected void finalize() {
+            if (!built) log.warn("SshFeed.Builder created, but build() never called");
+        }
+    }
+    
+    /**
+     * Key identifying what a poll executes: its command supplier and its environment supplier.
+     * Equality and hash code are delegated to the supplier objects themselves (not to the values they supply).
+     * NOTE(review): {@code SshPollConfig.getEnvSupplier()} returns a new anonymous Supplier on each call,
+     * so identifiers built from distinct configs appear never to compare equal - confirm whether grouping is intended.
+     */
+    private static class SshPollIdentifier {
+        final Supplier<String> command;
+        final Supplier<Map<String, String>> env;
+
+        private SshPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) {
+            this.command = checkNotNull(command, "command");
+            this.env = checkNotNull(env, "env");
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(command, env);
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof SshPollIdentifier)) {
+                return false;
+            }
+            SshPollIdentifier o = (SshPollIdentifier) other;
+            return Objects.equal(command, o.command) &&
+                    Objects.equal(env, o.env);
+        }
+    }
+    
+    /** @deprecated since 0.7.0, use static convenience on {@link Locations} */
+    @Deprecated
+    public static SshMachineLocation getMachineOfEntity(Entity entity) {
+        // returns null (via orNull) when no unique ssh machine is found among the entity's locations
+        return Machines.findUniqueSshMachineLocation(entity.getLocations()).orNull();
+    }
+
+    /**
+     * For rebind; do not call directly; use builder
+     */
+    public SshFeed() {
+    }
+    
+    /**
+     * Copies the builder's settings into this feed's config.
+     * Each poll config is copied, and a copy with a negative period is given the builder's default period.
+     *
+     * @throws NullPointerException if any poll has no command supplier
+     */
+    protected SshFeed(final Builder builder) {
+        setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
+        setConfig(MACHINE, builder.machine != null ? builder.machine : null);
+        setConfig(EXEC_AS_COMMAND, builder.execAsCommand);
+        
+        SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create();
+        for (SshPollConfig<?> config : builder.polls) {
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            SshPollConfig<?> configCopy = new SshPollConfig(config);
+            if (configCopy.getPeriod() < 0) configCopy.period(builder.period);
+            // the key uses the suppliers of the original config, not of the copy
+            polls.put(new SshPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy);
+        }
+        setConfig(POLLS, polls);
+        initUniqueTag(builder.uniqueTag, polls.values());
+    }
+
+    /**
+     * Resolves the machine to poll: the configured {@link #MACHINE} supplier if present,
+     * otherwise the unique ssh machine among the entity's locations.
+     * NOTE(review): the fallback calls {@code get()} on the lookup result, which presumably fails when no unique machine exists - confirm.
+     */
+    protected SshMachineLocation getMachine() {
+        Supplier<SshMachineLocation> supplier = getConfig(MACHINE);
+        if (supplier != null) {
+            return supplier.get();
+        } else {
+            return Locations.findUniqueSshMachineLocation(entity.getLocations()).get();
+        }
+    }
+    
+    /**
+     * Schedules one repeating task per poll identifier. Each task executes the command once
+     * and hands the result to a handler for every config registered under that identifier.
+     */
+    @Override
+    protected void preStart() {
+        SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = getConfig(POLLS);
+        
+        for (final SshPollIdentifier pollInfo : polls.keySet()) {
+            Set<SshPollConfig<?>> configs = polls.get(pollInfo);
+            // starting value is also the period used if no config has a positive period
+            long minPeriod = Integer.MAX_VALUE;
+            Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
+
+            for (SshPollConfig<?> config : configs) {
+                handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
+                // the shared task runs at the shortest positive period among its configs
+                if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+            }
+            
+            getPoller().scheduleAtFixedRate(
+                    new Callable<SshPollValue>() {
+                        public SshPollValue call() throws Exception {
+                            // command and env are resolved from their suppliers on every execution
+                            return exec(pollInfo.command.get(), pollInfo.env.get());
+                        }}, 
+                    new DelegatingPollHandler<SshPollValue>(handlers),
+                    minPeriod);
+        }
+    }
+    
+    /** Narrows the superclass poller to the value type this feed produces. */
+    @SuppressWarnings("unchecked")
+    protected Poller<SshPollValue> getPoller() {
+        return (Poller<SshPollValue>) super.getPoller();
+    }
+    
+    /**
+     * Runs the command once on the machine from {@link #getMachine()}, capturing stdout and stderr.
+     *
+     * @param command the command to run
+     * @param env environment variables passed to the execution
+     * @return the machine, exit status and captured output of this execution
+     */
+    private SshPollValue exec(String command, Map<String,String> env) throws IOException {
+        SshMachineLocation machine = getMachine();
+        Boolean execAsCommand = getConfig(EXEC_AS_COMMAND);
+        if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env});
+        ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+        ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+
+        int exitStatus;
+        ConfigBag flags = ConfigBag.newInstance()
+            .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true)
+            .configure(SshTool.PROP_OUT_STREAM, stdout)
+            .configure(SshTool.PROP_ERR_STREAM, stderr);
+        if (Boolean.TRUE.equals(execAsCommand)) {
+            exitStatus = machine.execCommands(flags.getAllConfig(),
+                    "ssh-feed", ImmutableList.of(command), env);
+        } else {
+            exitStatus = machine.execScript(flags.getAllConfig(),
+                    "ssh-feed", ImmutableList.of(command), env);
+        }
+
+        // NOTE(review): new String(byte[]) decodes with the JVM's default charset, which may not match the remote output's encoding
+        return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
new file mode 100644
index 0000000..8fec87f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * Configuration of a single ssh poll: the command to run, the environment to run it with,
+ * and (via the superclass) the sensor and handlers the result feeds.
+ */
+public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<T>> {
+
+    /** Supplies the command to run; may be null until one of the {@code command} methods is called. */
+    private Supplier<String> commandSupplier;
+    /** Environment suppliers in the order they were added; on key clashes later suppliers win. */
+    private List<Supplier<Map<String,String>>> dynamicEnvironmentSupplier = MutableList.of();
+
+    /** Default success check: the poll value is non-null and its exit status is zero. */
+    public static final Predicate<SshPollValue> DEFAULT_SUCCESS = new Predicate<SshPollValue>() {
+        @Override
+        public boolean apply(@Nullable SshPollValue input) {
+            return input != null && input.getExitStatus() == 0;
+        }};
+
+    /** Creates a config for the given sensor, using {@link #DEFAULT_SUCCESS} as the success check. */
+    public SshPollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+        super.checkSuccess(DEFAULT_SUCCESS);
+    }
+
+    /**
+     * Copy constructor. The copy shares the other config's command supplier and gets
+     * its own list containing the other config's environment suppliers.
+     */
+    public SshPollConfig(SshPollConfig<T> other) {
+        super(other);
+        commandSupplier = other.commandSupplier;
+        // Previously the env suppliers were not copied, so a copy silently lost its environment.
+        // The field initializer has already given this instance a fresh, empty list.
+        if (other.dynamicEnvironmentSupplier != null) {
+            dynamicEnvironmentSupplier.addAll(other.dynamicEnvironmentSupplier);
+        }
+    }
+    
+    /** @deprecated since 0.7.0; use {@link #getCommandSupplier()} and resolve just-in-time */
+    @Deprecated
+    public String getCommand() {
+        return getCommandSupplier().get();
+    }
+    /** Returns the supplier of the command, or null if no command has been set. */
+    public Supplier<String> getCommandSupplier() {
+        return commandSupplier;
+    }
+    
+    /** @deprecated since 0.7.0; use {@link #getEnvSupplier()} and resolve just-in-time */
+    @Deprecated
+    public Map<String, String> getEnv() {
+        return getEnvSupplier().get();
+    }
+    /**
+     * Returns a supplier which, each time it is invoked, evaluates every registered environment
+     * supplier in order and merges the results into a new map. Null suppliers and null maps are skipped.
+     * A new supplier object is returned on every call.
+     */
+    public Supplier<Map<String,String>> getEnvSupplier() {
+        return new Supplier<Map<String,String>>() {
+            @Override
+            public Map<String, String> get() {
+                Map<String,String> result = MutableMap.of();
+                for (Supplier<Map<String, String>> envS: dynamicEnvironmentSupplier) {
+                    if (envS!=null) {
+                        Map<String, String> envM = envS.get();
+                        if (envM!=null) {
+                            mergeEnvMaps(envM, result);
+                        }
+                    }
+                }
+                return result;
+            }
+        };
+    }
+    
+    /** Copies all entries of {@code supplied} into {@code target}, overwriting existing keys; a null {@code supplied} is ignored. */
+    protected void mergeEnvMaps(Map<String,String> supplied, Map<String,String> target) {
+        if (supplied==null) return;
+        // as the value is a string there is no need to look at deep merge behaviour
+        target.putAll(supplied);
+    }
+
+    /** Sets a fixed command. */
+    public SshPollConfig<T> command(String val) { return command(Suppliers.ofInstance(val)); }
+    /** Sets a supplier for the command, allowing it to be computed on each execution. */
+    public SshPollConfig<T> command(Supplier<String> val) {
+        this.commandSupplier = val;
+        return this;
+    }
+
+    /** add the given env param; sequence is as per {@link #env(Supplier)} */
+    public SshPollConfig<T> env(String key, String val) {
+        return env(Collections.singletonMap(key, val));
+    }
+    
+    /** add the given env params; sequence is as per {@link #env(Supplier)}.
+     * behaviour is undefined if the map supplied here is subsequently changed.
+     * <p>
+     * if a map's contents might change, use {@link #env(Supplier)}.
+     * a null map is ignored. */
+    public SshPollConfig<T> env(Map<String,String> val) {
+        if (val==null) return this;
+        return env(Suppliers.ofInstance(val));
+    }
+
+    /** 
+     * adds the given dynamic supplier of environment variables.
+     * <p>
+     * use of a supplier allows env vars to be computed on each execution,
+     * for example to take the most recent sensor values.
+     * <p>
+     * in the case of multiple map suppliers, static maps, or static {@link #env(String, String)} 
+     * key value pairs, the order in which they are specified here is the order
+     * in which they are computed and applied. 
+     *
+     * @throws NullPointerException if the supplier is null
+     **/
+    public SshPollConfig<T> env(Supplier<Map<String,String>> val) {
+        Preconditions.checkNotNull(val);
+        dynamicEnvironmentSupplier.add(val);
+        return this;
+    }
+
+    @Override protected String toStringBaseName() { return "ssh"; }
+    /** Resolves the command supplier to describe the poll source; null when no command is set. */
+    @Override protected Object toStringPollSource() {
+        if (getCommandSupplier()==null) return null;
+        String command = getCommandSupplier().get();
+        return command;
+    }
+    /** Adds the currently resolved command and environment to the superclass's equality fields. */
+    @Override protected MutableList<Object> equalsFields() { 
+        return super.equalsFields()
+            .appendIfNotNull(getCommandSupplier()!=null ? getCommandSupplier().get() : null)
+            .appendIfNotNull(getEnvSupplier()!=null ? getEnvSupplier().get() : null); 
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java
new file mode 100644
index 0000000..af0a8a6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+
+/**
+ * Immutable result of one ssh poll: the machine polled, the exit status, and the captured output streams.
+ */
+public class SshPollValue {
+
+    private final SshMachineLocation machine;
+    private final int exitStatus;
+    private final String stdout;
+    private final String stderr;
+
+    /** Stores the given values as-is; no validation or copying is performed. */
+    public SshPollValue(SshMachineLocation machine, int exitStatus, String stdout, String stderr) {
+        this.machine = machine;
+        this.exitStatus = exitStatus;
+        this.stdout = stdout;
+        this.stderr = stderr;
+    }
+    
+    /** The machine supplied at construction; SshFeed passes the machine the command was executed on. */
+    public SshMachineLocation getMachine() {
+        return machine;
+    }
+
+    /**
+     * Command exit status, as supplied at construction.
+     * NOTE(review): an earlier comment said "-1 if error is set", but this class has no error field - confirm which callers use -1.
+     */
+    public int getExitStatus() {
+        return exitStatus;
+    }
+
+    /** Command standard output; may be null if no content available. */
+    @Nullable
+    public String getStdout() {
+        return stdout;
+    }
+
+    /** Command standard error; may be null if no content available. */
+    @Nullable
+    public String getStderr() {
+        return stderr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java
new file mode 100644
index 0000000..370c3ce
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicates;
+
+public class SshValueFunctions {
+
+    public static Function<SshPollValue, Integer> exitStatus() {
+        return new Function<SshPollValue, Integer>() {
+            @Override public Integer apply(SshPollValue input) {
+                return input.getExitStatus();
+            }
+        };
+    }
+
+    public static Function<SshPollValue, String> stdout() {
+        return new Function<SshPollValue, String>() {
+            @Override public String apply(SshPollValue input) {
+                return input.getStdout();
+            }
+        };
+    }
+    
+    public static Function<SshPollValue, String> stderr() {
+        return new Function<SshPollValue, String>() {
+            @Override public String apply(SshPollValue input) {
+                return input.getStderr();
+            }
+        };
+    }
+    
+    public static Function<SshPollValue, Boolean> exitStatusEquals(final int expected) {
+        return chain(SshValueFunctions.exitStatus(), Functions.forPredicate(Predicates.equalTo(expected)));
+    }
+
+    // TODO Do we want these chain methods? Does guava have them already? Duplicated in HttpValueFunctions.
+    public static <A,B,C> Function<A,C> chain(final Function<A,? extends B> f1, final Function<B,C> f2) {
+        return new Function<A,C>() {
+            @Override public C apply(@Nullable A input) {
+                return f2.apply(f1.apply(input));
+            }
+        };
+    }
+    
+    public static <A,B,C,D> Function<A,D> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,D> f3) {
+        return new Function<A,D>() {
+            @Override public D apply(@Nullable A input) {
+                return f3.apply(f2.apply(f1.apply(input)));
+            }
+        };
+    }
+}