You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:14 UTC
[26/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to
o.a.b.feed and o.a.b.core.feed
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
new file mode 100644
index 0000000..e8cfc9f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.util.core.http.HttpTool;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.brooklyn.util.core.http.HttpTool.HttpClientBuilder;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by polling over http.
+ *
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private HttpFeed feed;
+ *
+ * //@Override
+ * protected void connectSensors() {
+ * super.connectSensors();
+ *
+ * feed = HttpFeed.builder()
+ * .entity(this)
+ * .period(200)
+ * .baseUri(String.format("http://%s:%s/management/subsystem/web/connector/http/read-resource", host, port))
+ * .baseUriVars(ImmutableMap.of("include-runtime","true"))
+ * .poll(new HttpPollConfig<Boolean>(SERVICE_UP)
+ * .onSuccess(HttpValueFunctions.responseCodeEquals(200))
+ * .onError(Functions.constant(false)))
+ * .poll(new HttpPollConfig<Integer>(REQUEST_COUNT)
+ * .onSuccess(HttpValueFunctions.jsonContents("requestCount", Integer.class)))
+ * .build();
+ * }
+ *
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ * super.disconnectSensors();
+ * if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ * <p>
+ *
+ * This also supports giving a Supplier for the URL
+ * (e.g. {@link Entities#attributeSupplier(org.apache.brooklyn.api.entity.Entity, org.apache.brooklyn.api.sensor.AttributeSensor)})
+ * from a sensor. Note however that if a Supplier-based sensor is *https*,
+ * https-specific initialization may not occur if the URL is not available at start time,
+ * and it may report errors if that sensor is not available.
+ * Some guidance for controlling enablement of a feed based on availability of a sensor
+ * can be seen in HttpLatencyDetector (in brooklyn-policy).
+ *
+ * @author aled
+ */
+public class HttpFeed extends AbstractFeed {
+
+ public static final Logger log = LoggerFactory.getLogger(HttpFeed.class);
+
+ @SuppressWarnings("serial")
+ public static final ConfigKey<SetMultimap<HttpPollIdentifier, HttpPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+ new TypeToken<SetMultimap<HttpPollIdentifier, HttpPollConfig<?>>>() {},
+ "polls");
+
+ /**
+ * Creates a new, empty {@link Builder} for configuring and building an {@link HttpFeed}.
+ *
+ * @return a fresh builder with default settings
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Fluent builder for {@link HttpFeed}.
+ * <p>
+ * Collects the target entity, the base URI (either a fixed {@link URI} or a {@link Supplier},
+ * never both), shared query variables and headers, optional credentials and the individual
+ * {@link HttpPollConfig polls}. {@link #build()} creates the feed and starts it.
+ * The default poll period is 500 milliseconds.
+ */
+ public static class Builder {
+ private EntityLocal entity;
+ private boolean onlyIfServiceUp = false;
+ private Supplier<URI> baseUriProvider;
+ private Duration period = Duration.millis(500);
+ private List<HttpPollConfig<?>> polls = Lists.newArrayList();
+ private URI baseUri;
+ private Map<String, String> baseUriVars = Maps.newLinkedHashMap();
+ private Map<String, String> headers = Maps.newLinkedHashMap();
+ private boolean suspended = false;
+ private Credentials credentials;
+ private String uniqueTag;
+ // set by build(); read by finalize() to warn about builders that were never built
+ private volatile boolean built;
+
+ /** Sets the entity the feed is attached to; required, {@link #build()} fails if it is null. */
+ public Builder entity(EntityLocal val) {
+ this.entity = val;
+ return this;
+ }
+ /** Shorthand for {@code onlyIfServiceUp(true)}. */
+ public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
+ /** Sets the value stored under the feed's ONLY_IF_SERVICE_UP config key. */
+ public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
+ this.onlyIfServiceUp = onlyIfServiceUp;
+ return this;
+ }
+ /**
+ * Sets a supplier that provides the base URI.
+ *
+ * @throws IllegalStateException if a fixed base URI has already been set and {@code val} is non-null
+ */
+ public Builder baseUri(Supplier<URI> val) {
+ if (baseUri!=null && val!=null)
+ throw new IllegalStateException("Builder cannot take both a URI and a URI Provider");
+ this.baseUriProvider = val;
+ return this;
+ }
+ /**
+ * Sets a fixed base URI.
+ *
+ * @throws IllegalStateException if a URI supplier has already been set and {@code val} is non-null
+ */
+ public Builder baseUri(URI val) {
+ if (baseUriProvider!=null && val!=null)
+ throw new IllegalStateException("Builder cannot take both a URI and a URI Provider");
+ this.baseUri = val;
+ return this;
+ }
+ /** Sets a fixed base URI from a {@link URL}, converted via its string form. */
+ public Builder baseUrl(URL val) {
+ return baseUri(URI.create(val.toString()));
+ }
+ /** Sets a fixed base URI parsed from the given string with {@link URI#create(String)}. */
+ public Builder baseUri(String val) {
+ return baseUri(URI.create(val));
+ }
+ /** Adds all the given entries to the query variables shared by every poll. */
+ public Builder baseUriVars(Map<String,String> vals) {
+ baseUriVars.putAll(vals);
+ return this;
+ }
+ /** Adds a single query variable shared by every poll. */
+ public Builder baseUriVar(String key, String val) {
+ baseUriVars.put(key, val);
+ return this;
+ }
+ /** Adds all the given entries to the headers shared by every poll. */
+ public Builder headers(Map<String,String> vals) {
+ headers.putAll(vals);
+ return this;
+ }
+ /** Adds a single header shared by every poll. */
+ public Builder header(String key, String val) {
+ headers.put(key, val);
+ return this;
+ }
+ /** Sets the default period, applied to polls whose own period is negative. */
+ public Builder period(Duration duration) {
+ this.period = duration;
+ return this;
+ }
+ /** Sets the default period in milliseconds. */
+ public Builder period(long millis) {
+ return period(millis, TimeUnit.MILLISECONDS);
+ }
+ /** Sets the default period in the given time unit. */
+ public Builder period(long val, TimeUnit units) {
+ return period(Duration.of(val, units));
+ }
+ /** Registers a poll; disabled polls are skipped when the feed is constructed. */
+ public Builder poll(HttpPollConfig<?> config) {
+ polls.add(config);
+ return this;
+ }
+ /** Shorthand for {@code suspended(true)}. */
+ public Builder suspended() {
+ return suspended(true);
+ }
+ /** If true, {@link #build()} calls {@code suspend()} on the feed before starting it. */
+ public Builder suspended(boolean startsSuspended) {
+ this.suspended = startsSuspended;
+ return this;
+ }
+ /** Sets username/password credentials, replacing any previously set. */
+ public Builder credentials(String username, String password) {
+ this.credentials = new UsernamePasswordCredentials(username, password);
+ return this;
+ }
+ /** Like {@link #credentials(String, String)} but a no-op unless both arguments are non-null. */
+ public Builder credentialsIfNotNull(String username, String password) {
+ if (username != null && password != null) {
+ this.credentials = new UsernamePasswordCredentials(username, password);
+ }
+ return this;
+ }
+ /** Sets the unique tag passed to the feed's {@code initUniqueTag}. */
+ public Builder uniqueTag(String uniqueTag) {
+ this.uniqueTag = uniqueTag;
+ return this;
+ }
+ /**
+ * Creates the feed, attaches it to the entity, optionally suspends it, and starts it.
+ * Note the feed is constructed before the entity is null-checked.
+ *
+ * @return the started feed
+ * @throws NullPointerException if no entity was supplied
+ */
+ public HttpFeed build() {
+ // marked before construction, so a failing build does not also trigger the finalize() warning
+ built = true;
+ HttpFeed result = new HttpFeed(this);
+ result.setEntity(checkNotNull(entity, "entity"));
+ if (suspended) result.suspend();
+ result.start();
+ return result;
+ }
+ /** Logs a warning if this builder is finalized without {@link #build()} ever having been called. */
+ @Override
+ protected void finalize() {
+ if (!built) log.warn("HttpFeed.Builder created, but build() never called");
+ }
+ }
+
+ private static class HttpPollIdentifier {
+ final String method;
+ final Supplier<URI> uriProvider;
+ final Map<String,String> headers;
+ final byte[] body;
+ final Optional<Credentials> credentials;
+ final Duration connectionTimeout;
+ final Duration socketTimeout;
+ private HttpPollIdentifier(String method, Supplier<URI> uriProvider, Map<String, String> headers, byte[] body,
+ Optional<Credentials> credentials, Duration connectionTimeout, Duration socketTimeout) {
+ this.method = checkNotNull(method, "method").toLowerCase();
+ this.uriProvider = checkNotNull(uriProvider, "uriProvider");
+ this.headers = checkNotNull(headers, "headers");
+ this.body = body;
+ this.credentials = checkNotNull(credentials, "credentials");
+ this.connectionTimeout = connectionTimeout;
+ this.socketTimeout = socketTimeout;
+
+ if (!(this.method.equals("get") || this.method.equals("post"))) {
+ throw new IllegalArgumentException("Unsupported HTTP method (only supports GET and POST): "+method);
+ }
+ if (body != null && method.equalsIgnoreCase("get")) {
+ throw new IllegalArgumentException("Must not set body for http GET method");
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(method, uriProvider, headers, body, credentials);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof HttpPollIdentifier)) {
+ return false;
+ }
+ HttpPollIdentifier o = (HttpPollIdentifier) other;
+ return Objects.equal(method, o.method) &&
+ Objects.equal(uriProvider, o.uriProvider) &&
+ Objects.equal(headers, o.headers) &&
+ Objects.equal(body, o.body) &&
+ Objects.equal(credentials, o.credentials);
+ }
+ }
+
+ /**
+ * No-arg constructor, present for rebind.
+ * Do not call directly; use {@link #builder()} instead.
+ */
+ public HttpFeed() {
+ }
+
+ /**
+  * Creates a feed from the builder's settings.
+  * <p>
+  * Every enabled poll is copied (inheriting the builder's period when its own is negative) and
+  * grouped under an {@link HttpPollIdentifier} describing the request it needs. The grouped
+  * polls are stored under {@link #POLLS}.
+  *
+  * @param builder the populated builder
+  * @throws IllegalStateException if, while processing an enabled poll, both a base URI and a
+  *         URI supplier are set, or URI variables are given without a fixed base URI
+  * @throws NullPointerException if an enabled poll has neither a base URI nor a URI supplier
+  */
+ protected HttpFeed(Builder builder) {
+     setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
+     Map<String,String> sharedHeaders = ImmutableMap.copyOf(checkNotNull(builder.headers, "headers"));
+     // the same credentials apply to every poll, so wrap them once up front
+     Optional<Credentials> sharedCredentials = Optional.fromNullable(builder.credentials);
+
+     SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> pollsByRequest = HashMultimap.<HttpPollIdentifier,HttpPollConfig<?>>create();
+     for (HttpPollConfig<?> original : builder.polls) {
+         if (!original.isEnabled()) {
+             continue;
+         }
+         @SuppressWarnings({ "unchecked", "rawtypes" })
+         HttpPollConfig<?> copy = new HttpPollConfig(original);
+         if (copy.getPeriod() < 0) {
+             copy.period(builder.period);
+         }
+         String method = original.getMethod();
+         Map<String,String> headers = original.buildHeaders(sharedHeaders);
+         byte[] body = original.getBody();
+         Duration connectionTimeout = original.getConnectionTimeout();
+         Duration socketTimeout = original.getSocketTimeout();
+
+         Supplier<URI> uriSupplier;
+         if (builder.baseUri == null) {
+             if (!builder.baseUriVars.isEmpty()) {
+                 throw new IllegalStateException("Not permitted to supply URI vars when using a URI provider; pass the vars to the provider instead");
+             }
+             uriSupplier = builder.baseUriProvider;
+         } else {
+             if (builder.baseUriProvider != null) {
+                 throw new IllegalStateException("Not permitted to supply baseUri and baseUriProvider");
+             }
+             Map<String,String> sharedVars = ImmutableMap.copyOf(checkNotNull(builder.baseUriVars, "baseUriVars"));
+             URI fixedUri = original.buildUri(builder.baseUri, sharedVars);
+             uriSupplier = Suppliers.ofInstance(fixedUri);
+         }
+         checkNotNull(uriSupplier);
+
+         HttpPollIdentifier request = new HttpPollIdentifier(
+                 method, uriSupplier, headers, body, sharedCredentials, connectionTimeout, socketTimeout);
+         pollsByRequest.put(request, copy);
+     }
+     setConfig(POLLS, pollsByRequest);
+     initUniqueTag(builder.uniqueTag, pollsByRequest.values());
+ }
+
+ /**
+  * Schedules one polling job per distinct request held in {@link #POLLS}.
+  * Each job's response is fanned out to a handler for every poll sharing that request, and the
+  * job runs at the smallest positive period among those polls.
+  */
+ @Override
+ protected void preStart() {
+     SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> pollsByRequest = getConfig(POLLS);
+
+     for (final HttpPollIdentifier request : pollsByRequest.keySet()) {
+         // Though HttpClients are thread safe and can take advantage of connection pooling
+         // and authentication caching, the httpcomponents documentation says:
+         //    "While HttpClient instances are thread safe and can be shared between multiple
+         //     threads of execution, it is highly recommended that each thread maintains its
+         //     own dedicated instance of HttpContext."
+         //  http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
+         final HttpClient client = createHttpClient(request);
+
+         Set<HttpPollConfig<?>> configsForRequest = pollsByRequest.get(request);
+         long shortestPeriod = Integer.MAX_VALUE;
+         Set<AttributePollHandler<? super HttpToolResponse>> handlers = Sets.newLinkedHashSet();
+         for (HttpPollConfig<?> config : configsForRequest) {
+             handlers.add(new AttributePollHandler<HttpToolResponse>(config, entity, this));
+             if (config.getPeriod() > 0) {
+                 shortestPeriod = Math.min(shortestPeriod, config.getPeriod());
+             }
+         }
+
+         Callable<HttpToolResponse> job = newHttpPollJob(client, request);
+         DelegatingPollHandler<HttpToolResponse> fanOut = new DelegatingPollHandler<HttpToolResponse>(handlers);
+         getPoller().scheduleAtFixedRate(job, fanOut, shortestPeriod);
+     }
+ }
+
+ /**
+  * Creates the job that performs the HTTP call for the given request, chosen by its method.
+  *
+  * @throws IllegalStateException if the method is not one of get, post or head
+  */
+ private Callable<HttpToolResponse> newHttpPollJob(final HttpClient httpClient, final HttpPollIdentifier pollInfo) {
+     if (pollInfo.method.equals("get")) {
+         return new Callable<HttpToolResponse>() {
+             public HttpToolResponse call() throws Exception {
+                 if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo);
+                 return HttpTool.httpGet(httpClient, pollInfo.uriProvider.get(), pollInfo.headers);
+             }};
+     }
+     if (pollInfo.method.equals("post")) {
+         return new Callable<HttpToolResponse>() {
+             public HttpToolResponse call() throws Exception {
+                 if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo);
+                 return HttpTool.httpPost(httpClient, pollInfo.uriProvider.get(), pollInfo.headers, pollInfo.body);
+             }};
+     }
+     if (pollInfo.method.equals("head")) {
+         return new Callable<HttpToolResponse>() {
+             public HttpToolResponse call() throws Exception {
+                 if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo);
+                 return HttpTool.httpHead(httpClient, pollInfo.uriProvider.get(), pollInfo.headers);
+             }};
+     }
+     throw new IllegalStateException("Unexpected http method: "+pollInfo.method);
+ }
+
+ /**
+  * Builds the HTTP client used for all polls sharing the given identifier.
+  * The client trusts all certificates and follows redirects laxly. The target URI and the
+  * credentials are applied only when the URI supplier currently yields a non-null URI, and
+  * timeouts only when they were configured.
+  */
+ // TODO Should we really trustAll for https? Make configurable?
+ private HttpClient createHttpClient(HttpPollIdentifier pollIdentifier) {
+     URI target = pollIdentifier.uriProvider.get();
+     HttpClientBuilder clientBuilder = HttpTool.httpClientBuilder()
+             .trustAll()
+             .laxRedirect(true);
+     if (target != null) {
+         clientBuilder.uri(target);
+         clientBuilder.credential(pollIdentifier.credentials);
+     }
+     Duration connectionTimeout = pollIdentifier.connectionTimeout;
+     if (connectionTimeout != null) {
+         clientBuilder.connectionTimeout(connectionTimeout);
+     }
+     Duration socketTimeout = pollIdentifier.socketTimeout;
+     if (socketTimeout != null) {
+         clientBuilder.socketTimeout(socketTimeout);
+     }
+     return clientBuilder.build();
+ }
+
+ /**
+ * Returns the superclass's poller, narrowed to {@link HttpToolResponse}.
+ * The cast is unchecked.
+ */
+ @SuppressWarnings("unchecked")
+ protected Poller<HttpToolResponse> getPoller() {
+ return (Poller<HttpToolResponse>) super.getPoller();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java
new file mode 100644
index 0000000..e019293
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.net.URI;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.http.HttpTool;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+
+public class HttpPollConfig<T> extends PollConfig<HttpToolResponse, T, HttpPollConfig<T>> {
+
+ private String method = "GET";
+ private String suburl = "";
+ private Map<String, String> vars = ImmutableMap.<String,String>of();
+ private Map<String, String> headers = ImmutableMap.<String,String>of();
+ private byte[] body;
+ private Duration connectionTimeout;
+ private Duration socketTimeout;
+
+ /**
+ * Default success check, installed by the sensor constructor: true for a non-null response
+ * whose response code lies between 200 and 399 inclusive.
+ */
+ public static final Predicate<HttpToolResponse> DEFAULT_SUCCESS = new Predicate<HttpToolResponse>() {
+ @Override
+ public boolean apply(@Nullable HttpToolResponse input) {
+ return input != null && input.getResponseCode() >= 200 && input.getResponseCode() <= 399;
+ }};
+
+ /** Static factory equivalent to {@code new HttpPollConfig<T>(sensor)}. */
+ public static <T> HttpPollConfig<T> forSensor(AttributeSensor<T> sensor) {
+ return new HttpPollConfig<T>(sensor);
+ }
+
+ /**
+ * Creates a poll config bound to {@link PollConfig#NO_SENSOR} rather than a specific sensor.
+ * NOTE(review): presumably for handlers that publish to several sensors themselves; confirm
+ * against {@link PollConfig}.
+ */
+ public static HttpPollConfig<Void> forMultiple() {
+ return new HttpPollConfig<Void>(PollConfig.NO_SENSOR);
+ }
+
+ /**
+ * Creates a poll config for the given sensor, with {@link #DEFAULT_SUCCESS} as its success check.
+ *
+ * @param sensor the sensor this poll is for
+ */
+ public HttpPollConfig(AttributeSensor<T> sensor) {
+ super(sensor);
+ super.checkSuccess(DEFAULT_SUCCESS);
+ }
+
+ /**
+  * Copy constructor: duplicates every HTTP-specific setting of {@code other} in addition to
+  * whatever the superclass copy constructor copies.
+  * The maps and the body array are shared by reference, not cloned.
+  *
+  * @param other the config to copy
+  */
+ public HttpPollConfig(HttpPollConfig<T> other) {
+     super(other);
+     suburl = other.suburl;
+     vars = other.vars;
+     method = other.method;
+     headers = other.headers;
+     // these three were previously dropped by the copy, so the copied config lost its
+     // request body and timeouts and no longer matched the original in equalsFields()
+     body = other.body;
+     connectionTimeout = other.connectionTimeout;
+     socketTimeout = other.socketTimeout;
+ }
+
+ /** Returns the sub-URL that {@link #buildUri} appends to the base URI; defaults to the empty string. */
+ public String getSuburl() {
+ return suburl;
+ }
+
+ /** Returns the poll-specific query variables; defaults to an empty map. */
+ public Map<String, String> getVars() {
+ return vars;
+ }
+
+ /** Returns the configured connection timeout, or null if none was set. */
+ public Duration getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /** Returns the configured socket timeout, or null if none was set. */
+ public Duration getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ /** Returns the HTTP method name exactly as set; defaults to "GET". */
+ public String getMethod() {
+ return method;
+ }
+
+ /** Returns the request body (the stored array itself, not a copy), or null if none was set. */
+ public byte[] getBody() {
+ return body;
+ }
+
+ /** Sets the HTTP method name; returns this config for chaining. */
+ public HttpPollConfig<T> method(String val) {
+ this.method = val; return this;
+ }
+
+ /** Sets the sub-URL appended to the base URI; returns this config for chaining. */
+ public HttpPollConfig<T> suburl(String val) {
+ this.suburl = val; return this;
+ }
+
+ /** Replaces the poll-specific query variables (the map is stored by reference); returns this config. */
+ public HttpPollConfig<T> vars(Map<String,String> val) {
+ this.vars = val; return this;
+ }
+
+ /** Replaces the poll-specific headers (the map is stored by reference); returns this config. */
+ public HttpPollConfig<T> headers(Map<String,String> val) {
+ this.headers = val; return this;
+ }
+
+ /** Sets the request body (the array is stored by reference); returns this config. */
+ public HttpPollConfig<T> body(byte[] val) {
+ this.body = val; return this;
+ }
+ /** Sets the connection timeout; null leaves it unset. Returns this config for chaining. */
+ public HttpPollConfig<T> connectionTimeout(Duration val) {
+ this.connectionTimeout = val;
+ return this;
+ }
+ /** Sets the socket timeout; null leaves it unset. Returns this config for chaining. */
+ public HttpPollConfig<T> socketTimeout(Duration val) {
+ this.socketTimeout = val;
+ return this;
+ }
+ /**
+  * Builds the full request URI: the base URI followed by this poll's sub-URL, with the base
+  * variables and this poll's variables appended as URL-encoded query parameters.
+  * When the same key is in both maps, this poll's value is applied last.
+  * <p>
+  * If the base URI or sub-URL already contains a query string, the parameters are appended
+  * to it with {@code &} instead of starting a second {@code ?} section.
+  *
+  * @param baseUri the base URI; null is treated as empty
+  * @param baseUriVars variables shared by all polls; may be null or empty
+  * @return the combined URI
+  * @throws IllegalArgumentException if the combined string is not a valid URI
+  */
+ public URI buildUri(URI baseUri, Map<String,String> baseUriVars) {
+     String uri = (baseUri != null ? baseUri.toString() : "") + (suburl != null ? suburl : "");
+     Map<String,String> allvars = concat(baseUriVars, vars);
+
+     if (allvars != null && allvars.size() > 0) {
+         String separator;
+         if (uri.indexOf('?') < 0) {
+             separator = "?";
+         } else if (uri.endsWith("?") || uri.endsWith("&")) {
+             // query section already open and awaiting a parameter
+             separator = "";
+         } else {
+             separator = "&";
+         }
+         uri += separator + HttpTool.encodeUrlParams(allvars);
+     }
+
+     return URI.create(uri);
+ }
+
+ /**
+ * Builds the headers for this poll: the given base headers followed by this poll's own
+ * headers, which are put last.
+ *
+ * @param baseHeaders headers shared by all polls of the feed
+ * @return a new map containing both sets of headers
+ */
+ public Map<String, String> buildHeaders(Map<String, String> baseHeaders) {
+ return MutableMap.<String,String>builder()
+ .putAll(baseHeaders)
+ .putAll(headers)
+ .build();
+ }
+
+ /**
+  * Merges two maps, with entries of {@code map2} put after those of {@code map1}.
+  * If either map is null or empty the other is returned as-is (possibly null, and not copied).
+  */
+ @SuppressWarnings("unchecked")
+ private <K,V> Map<K,V> concat(Map<? extends K,? extends V> map1, Map<? extends K,? extends V> map2) {
+     boolean firstHasEntries = map1 != null && !map1.isEmpty();
+     if (!firstHasEntries) {
+         return (Map<K,V>) map2;
+     }
+     boolean secondHasEntries = map2 != null && !map2.isEmpty();
+     if (!secondHasEntries) {
+         return (Map<K,V>) map1;
+     }
+
+     // TODO Not using Immutable builder, because that fails if duplicates in map1 and map2
+     return MutableMap.<K,V>builder()
+             .putAll(map1)
+             .putAll(map2)
+             .build();
+ }
+
+ /** Returns the fixed base name "http". */
+ @Override protected String toStringBaseName() { return "http"; }
+ /** Returns the sub-URL as the poll source. */
+ @Override protected String toStringPollSource() { return suburl; }
+ /**
+ * Extends the superclass's equality fields with the method, variables, headers, body and
+ * timeouts, each appended only when non-null.
+ * NOTE(review): {@code body} is a byte array, so if the fields are compared with
+ * {@code equals} it will match by identity rather than content; confirm how the
+ * superclass compares this list.
+ */
+ @Override
+ protected MutableList<Object> equalsFields() {
+ return super.equalsFields().appendIfNotNull(method).appendIfNotNull(vars).appendIfNotNull(headers)
+ .appendIfNotNull(body).appendIfNotNull(connectionTimeout).appendIfNotNull(socketTimeout);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java
new file mode 100644
index 0000000..5414cc4
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+
+/**
+ * Transitional interface declaring accessors for an HTTP response: response code, reason
+ * phrase, timing values, headers and content.
+ *
+ * @deprecated since 0.7.0, use {@link HttpToolResponse}.
+ * The old {@link HttpPollValue} concrete class has been renamed {@link HttpToolResponse}
+ * because it has nothing specific to polls. This is now just a transitional interface.
+ */
+@Deprecated
+public interface HttpPollValue {
+
+ public int getResponseCode();
+ public String getReasonPhrase();
+ public long getStartTime();
+ public long getLatencyFullContent();
+ public long getLatencyFirstResponse();
+ public Map<String, List<String>> getHeaderLists();
+ public byte[] getContent();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java
new file mode 100644
index 0000000..10c3b3e
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.net.URI;
+
+import org.apache.brooklyn.util.core.http.HttpTool;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * @deprecated since 0.7; use {@link HttpTool}
+ */
+@Deprecated
+public class HttpPolls {
+
+ public static HttpToolResponse executeSimpleGet(URI uri) {
+ return HttpTool.httpGet(new DefaultHttpClient(), uri, ImmutableMap.<String,String>of());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java
new file mode 100644
index 0000000..75dab74
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.util.List;
+
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.brooklyn.util.guava.Functionals;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Lists;
+import com.google.gson.JsonElement;
+
+/**
+ * Static factories for {@link Function}s that extract values from an {@link HttpToolResponse}:
+ * response code, body text, JSON content, latency and header presence.
+ * <p>
+ * The private {@code *Legacy} methods hold anonymous classes kept only so that previously
+ * persisted state can still be deserialized.
+ * NOTE(review): anonymous classes are presumably matched by their compiler-assigned names, so
+ * adding, removing or reordering anonymous classes here may break that; verify before changing.
+ */
+public class HttpValueFunctions {
+
+ private HttpValueFunctions() {} // instead use static utility methods
+
+ /** Returns a function yielding the response code of a response. */
+ public static Function<HttpToolResponse, Integer> responseCode() {
+ return new ResponseCode();
+ }
+
+ /** @deprecated since 0.7.0; only here for deserialization of persisted state */
+ private static Function<HttpToolResponse, Integer> responseCodeLegacy() {
+ return new Function<HttpToolResponse, Integer>() {
+ @Override public Integer apply(HttpToolResponse input) {
+ return input.getResponseCode();
+ }
+ };
+ }
+
+ /** Named replacement for the anonymous class in {@link #responseCodeLegacy()}. */
+ private static class ResponseCode implements Function<HttpToolResponse, Integer> {
+ @Override public Integer apply(HttpToolResponse input) {
+ return input.getResponseCode();
+ }
+ }
+
+ /** Returns a function that is true when the response code equals {@code expected}. */
+ public static Function<HttpToolResponse, Boolean> responseCodeEquals(final int expected) {
+ return Functionals.chain(HttpValueFunctions.responseCode(), Functions.forPredicate(Predicates.equalTo(expected)));
+ }
+
+ /** Returns a function that is true when the response code is any one of {@code expected}. */
+ public static Function<HttpToolResponse, Boolean> responseCodeEquals(final int... expected) {
+ List<Integer> expectedList = Lists.newArrayList();
+ for (int e : expected) {
+ expectedList.add((Integer)e);
+ }
+ return Functionals.chain(HttpValueFunctions.responseCode(), Functions.forPredicate(Predicates.in(expectedList)));
+ }
+
+ /** Returns a function yielding the response content as a string. */
+ public static Function<HttpToolResponse, String> stringContentsFunction() {
+ return new StringContents();
+ }
+
+ /** @deprecated since 0.7.0; only here for deserialization of persisted state */
+ private static Function<HttpToolResponse, String> stringContentsFunctionLegacy() {
+ return new Function<HttpToolResponse, String>() {
+ @Override public String apply(HttpToolResponse input) {
+ return input.getContentAsString();
+ }
+ };
+ }
+
+ /** Named replacement for the anonymous class in {@link #stringContentsFunctionLegacy()}. */
+ private static class StringContents implements Function<HttpToolResponse, String> {
+ @Override public String apply(HttpToolResponse input) {
+ return input.getContentAsString();
+ }
+ }
+
+ /** Returns a function that parses the response's string content as JSON. */
+ public static Function<HttpToolResponse, JsonElement> jsonContents() {
+ return Functionals.chain(stringContentsFunction(), JsonFunctions.asJson());
+ }
+
+ /** Single-element form of {@link #jsonContents(String[], Class)}. */
+ public static <T> Function<HttpToolResponse, T> jsonContents(String element, Class<T> expected) {
+ return jsonContents(new String[] {element}, expected);
+ }
+
+ /**
+ * Returns a function that parses the response as JSON, walks the given elements via
+ * {@link JsonFunctions#walk}, and casts the result to {@code expected} via {@link JsonFunctions#cast}.
+ */
+ public static <T> Function<HttpToolResponse, T> jsonContents(String[] elements, Class<T> expected) {
+ return Functionals.chain(jsonContents(), JsonFunctions.walk(elements), JsonFunctions.cast(expected));
+ }
+
+ /**
+ * Returns a function that parses the response as JSON and applies {@link JsonFunctions#getPath}
+ * with the given path. NOTE(review): the path is presumably a JsonPath expression; confirm in JsonFunctions.
+ */
+ public static <T> Function<HttpToolResponse, T> jsonContentsFromPath(String path){
+ return Functionals.chain(jsonContents(), JsonFunctions.<T>getPath(path));
+ }
+
+ /** Returns a function yielding the response's full-content latency. */
+ public static Function<HttpToolResponse, Long> latency() {
+ return new Latency();
+ }
+
+ /** @deprecated since 0.7.0; only here for deserialization of persisted state */
+ private static Function<HttpToolResponse, Long> latencyLegacy() {
+ return new Function<HttpToolResponse, Long>() {
+ public Long apply(HttpToolResponse input) {
+ return input.getLatencyFullContent();
+ }
+ };
+ }
+
+ /** Named replacement for the anonymous class in {@link #latencyLegacy()}. */
+ private static class Latency implements Function<HttpToolResponse, Long> {
+ public Long apply(HttpToolResponse input) {
+ return input.getLatencyFullContent();
+ }
+ };
+
+ /** Returns a function that is true when the response has at least one value for {@code header}. */
+ public static Function<HttpToolResponse, Boolean> containsHeader(String header) {
+ return new ContainsHeader(header);
+ }
+
+ /** Tests whether a response carries a non-empty value list for a fixed header name. */
+ private static class ContainsHeader implements Function<HttpToolResponse, Boolean> {
+ private final String header;
+
+ public ContainsHeader(String header) {
+ this.header = header;
+ }
+ @Override
+ public Boolean apply(HttpToolResponse input) {
+ // looks the header up by the exact key given; any case-insensitivity depends on the map returned
+ List<String> actual = input.getHeaderLists().get(header);
+ return actual != null && actual.size() > 0;
+ }
+ }
+
+
+ /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function)} */ @Deprecated
+ public static <A,B,C> Function<A,C> chain(final Function<A,? extends B> f1, final Function<B,C> f2) {
+ return Functionals.chain(f1, f2);
+ }
+
+ /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function, Function)} */ @Deprecated
+ public static <A,B,C,D> Function<A,D> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,D> f3) {
+ return Functionals.chain(f1, f2, f3);
+ }
+
+ /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function, Function, Function)} */ @Deprecated
+ public static <A,B,C,D,E> Function<A,E> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,? extends D> f3, final Function<D,E> f4) {
+ return Functionals.chain(f1, f2, f3, f4);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java
new file mode 100644
index 0000000..a3e04cd
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.guava.MaybeFunctions;
+
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.gson.*;
+import com.jayway.jsonpath.JsonPath;
+
+public class JsonFunctions {
+
+ private JsonFunctions() {} // instead use static utility methods
+
+ /** Returns a function which parses its string input with a new {@link JsonParser}. */
+ public static Function<String, JsonElement> asJson() {
+     return new Function<String, JsonElement>() {
+         @Override
+         public JsonElement apply(String input) {
+             JsonParser parser = new JsonParser();
+             return parser.parse(input);
+         }
+     };
+ }
+
+ /**
+  * Returns a function which treats its input as a {@link JsonArray} and applies {@code func}
+  * to every entry, collecting the results in order.
+  *
+  * @param func the conversion applied to each array entry
+  */
+ public static <T> Function<JsonElement, List<T>> forEach(final Function<JsonElement, T> func) {
+     return new Function<JsonElement, List<T>>() {
+         @Override
+         public List<T> apply(JsonElement input) {
+             // plain cast: a non-array input fails with ClassCastException
+             JsonArray entries = (JsonArray) input;
+             List<T> converted = Lists.newArrayList();
+             for (JsonElement entry : entries) {
+                 converted.add(func.apply(entry));
+             }
+             return converted;
+         }
+     };
+ }
+
+
+ /**
+  * As {@link #walk(Iterable)}, taking a single string consisting of a dot separated path.
+  *
+  * @param elementOrDotSeparatedElements one key, or several keys separated by '.'
+  */
+ public static Function<JsonElement, JsonElement> walk(String elementOrDotSeparatedElements) {
+     Iterable<String> path = Splitter.on('.').split(elementOrDotSeparatedElements);
+     return walk(path);
+ }
+
+ /**
+  * As {@link #walk(Iterable)}, taking a series of strings; each string is used as one key
+  * (dot separators are not respected here).
+  */
+ public static Function<JsonElement, JsonElement> walk(final String... elements) {
+     List<String> path = Arrays.asList(elements);
+     return walk(path);
+ }
+
+ /**
+  * Returns a function which traverses the supplied path of keys through nested JSON objects
+  * (maps of maps of maps...) and returns the element found at the end of the path.
+  * <p>
+  * Every element that is stepped through is read with {@link JsonElement#getAsJsonObject()}.
+  *
+  * @param elements the keys to follow, outermost first
+  * @throws NoSuchElementException (from the returned function) if any key on the path is not present
+  */
+ public static Function<JsonElement, JsonElement> walk(final Iterable<String> elements) {
+     // could do this instead, pointing at Maybe for this, and for walkN, but it's slightly less efficient
+     // return Functionals.chain(MaybeFunctions.<JsonElement>wrap(), walkM(elements), MaybeFunctions.<JsonElement>get());
+
+     return new Function<JsonElement, JsonElement>() {
+         @Override
+         public JsonElement apply(JsonElement input) {
+             JsonElement curr = input;
+             for (String element : elements) {
+                 JsonObject jo = curr.getAsJsonObject();
+                 curr = jo.get(element);
+                 if (curr == null) {
+                     throw new NoSuchElementException("No element '" + element + "' in JSON, when walking " + elements);
+                 }
+             }
+             return curr;
+         }
+     };
+ }
+
+
+ /**
+  * As {@link #walk(String)}, but the returned function yields null instead of throwing
+  * when a key on the dot separated path is not found.
+  * <p>
+  * NOTE(review): the parameter is annotated {@code @Nullable} but is handed directly to
+  * {@code Splitter.split} - confirm whether a null path is really supported.
+  */
+ public static Function<JsonElement, JsonElement> walkN(@Nullable String elements) {
+     Iterable<String> path = Splitter.on('.').split(elements);
+     return walkN(path);
+ }
+
+ /**
+  * As {@link #walk(String...)}, but the returned function yields null instead of throwing
+  * when a key is not found.
+  */
+ public static Function<JsonElement, JsonElement> walkN(final String... elements) {
+     List<String> path = Arrays.asList(elements);
+     return walkN(path);
+ }
+
+ /**
+  * As {@link #walk(Iterable)}, but the returned function stops and yields null as soon as
+  * the current element is null, instead of throwing.
+  *
+  * @param elements the keys to follow, outermost first
+  */
+ public static Function<JsonElement, JsonElement> walkN(final Iterable<String> elements) {
+     return new Function<JsonElement, JsonElement>() {
+         @Override
+         public JsonElement apply(JsonElement input) {
+             JsonElement node = input;
+             for (String key : elements) {
+                 if (node == null) {
+                     break;
+                 }
+                 node = node.getAsJsonObject().get(key);
+             }
+             return node;
+         }
+     };
+ }
+
+ /**
+  * As {@link #walkM(Iterable)}, taking a single string consisting of a dot separated path.
+  * <p>
+  * NOTE(review): the parameter is annotated {@code @Nullable} but is handed directly to
+  * {@code Splitter.split} - confirm whether a null path is really supported.
+  */
+ public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(@Nullable String elements) {
+     Iterable<String> path = Splitter.on('.').split(elements);
+     return walkM(path);
+ }
+
+ /**
+  * As {@link #walkM(Iterable)}, taking a series of strings; each string is used as one key.
+  */
+ public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(final String... elements) {
+     List<String> path = Arrays.asList(elements);
+     return walkM(path);
+ }
+
+ /**
+  * As {@link #walk(Iterable)} but working with objects which {@link Maybe} contain {@link JsonElement},
+  * simply preserving a {@link Maybe#absent()} object if additional walks are requested upon it
+  * (cf jquery).
+  * <p>
+  * When a key is missing the returned function yields an absent {@link Maybe} carrying a
+  * message that names the missing key and the full path.
+  *
+  * @param elements the keys to follow, outermost first
+  */
+ public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(final Iterable<String> elements) {
+     return new Function<Maybe<JsonElement>, Maybe<JsonElement>>() {
+         @Override
+         public Maybe<JsonElement> apply(Maybe<JsonElement> input) {
+             Maybe<JsonElement> curr = input;
+             for (String element : elements) {
+                 if (curr.isAbsent()) {
+                     return curr;
+                 }
+                 JsonObject jo = curr.get().getAsJsonObject();
+                 JsonElement currO = jo.get(element);
+                 if (currO == null) {
+                     return Maybe.absent("No element '" + element + "' in JSON, when walking " + elements);
+                 }
+                 curr = Maybe.of(currO);
+             }
+             return curr;
+         }
+     };
+ }
+
+ /**
+  * Returns a function which renders the element with {@code toString()} and evaluates the given
+  * {@link com.jayway.jsonpath.JsonPath} expression against that text; the result is returned
+  * through an unchecked cast to {@code T}.
+  *
+  * @param path the full JsonPath expression
+  */
+ public static <T> Function<JsonElement,T> getPath(final String path) {
+     return new Function<JsonElement, T>() {
+         @SuppressWarnings("unchecked")
+         @Override
+         public T apply(JsonElement input) {
+             Object found = JsonPath.read(input.toString(), path);
+             return (T) found;
+         }
+     };
+ }
+
+ /**
+  * Returns a function converting a {@link JsonElement} to the given type.
+  * <p>
+  * A {@code null} input or a JSON null yields {@code null}. Booleans, characters, the numeric
+  * primitives and their wrappers, {@link BigDecimal}, {@link BigInteger}, other {@link Number}
+  * types and {@link String} are read with the matching {@code getAsXxx()} accessor. Array types
+  * are filled element by element from a JSON array; this includes arrays with a primitive
+  * component type such as {@code int[]}.
+  *
+  * @param expected the type to convert to
+  * @throws IllegalArgumentException (from the returned function) if {@code expected} is not one of the handled types
+  */
+ @SuppressWarnings("unchecked")
+ public static <T> Function<JsonElement, T> cast(final Class<T> expected) {
+     return new Function<JsonElement, T>() {
+         @Override
+         public T apply(JsonElement input) {
+             if (input == null || input.isJsonNull()) {
+                 return (T) null;
+             } else if (expected == boolean.class || expected == Boolean.class) {
+                 return (T) (Boolean) input.getAsBoolean();
+             } else if (expected == char.class || expected == Character.class) {
+                 return (T) (Character) input.getAsCharacter();
+             } else if (expected == byte.class || expected == Byte.class) {
+                 return (T) (Byte) input.getAsByte();
+             } else if (expected == short.class || expected == Short.class) {
+                 return (T) (Short) input.getAsShort();
+             } else if (expected == int.class || expected == Integer.class) {
+                 return (T) (Integer) input.getAsInt();
+             } else if (expected == long.class || expected == Long.class) {
+                 return (T) (Long) input.getAsLong();
+             } else if (expected == float.class || expected == Float.class) {
+                 return (T) (Float) input.getAsFloat();
+             } else if (expected == double.class || expected == Double.class) {
+                 return (T) (Double) input.getAsDouble();
+             } else if (expected == BigDecimal.class) {
+                 return (T) input.getAsBigDecimal();
+             } else if (expected == BigInteger.class) {
+                 return (T) input.getAsBigInteger();
+             } else if (Number.class.isAssignableFrom(expected)) {
+                 // TODO Will result in a class-cast if it's an unexpected sub-type of Number not handled above
+                 return (T) input.getAsNumber();
+             } else if (expected == String.class) {
+                 return (T) input.getAsString();
+             } else if (expected.isArray()) {
+                 JsonArray array = input.getAsJsonArray();
+                 Class<?> componentType = expected.getComponentType();
+                 if (JsonElement.class.isAssignableFrom(componentType)) {
+                     JsonElement[] result = new JsonElement[array.size()];
+                     for (int i = 0; i < array.size(); i++) {
+                         result[i] = array.get(i);
+                     }
+                     return (T) result;
+                 } else if (componentType.isPrimitive()) {
+                     // Array.newInstance yields e.g. an int[] here, which cannot be viewed as Object[];
+                     // Array.set stores each boxed value, unwrapping it to the primitive.
+                     Object result = Array.newInstance(componentType, array.size());
+                     Function<JsonElement, ?> elementCast = cast(componentType);
+                     for (int i = 0; i < array.size(); i++) {
+                         Array.set(result, i, elementCast.apply(array.get(i)));
+                     }
+                     return (T) result;
+                 } else {
+                     Object[] result = (Object[]) Array.newInstance(componentType, array.size());
+                     Function<JsonElement, ?> elementCast = cast(componentType);
+                     for (int i = 0; i < array.size(); i++) {
+                         result[i] = elementCast.apply(array.get(i));
+                     }
+                     return (T) result;
+                 }
+             } else {
+                 throw new IllegalArgumentException("Cannot cast json element to type "+expected);
+             }
+         }
+     };
+ }
+
+ /**
+ * Returns a function chaining {@code MaybeFunctions.get()} with {@link #cast(Class)}, so that
+ * it accepts a {@link Maybe} of a {@link JsonElement} and produces a value of the expected type.
+ * NOTE(review): presumably an absent input fails inside {@code MaybeFunctions.get()} - confirm.
+ *
+ * @param expected the type to convert to
+ */
+ public static <T> Function<Maybe<JsonElement>, T> castM(final Class<T> expected) {
+ return Functionals.chain(MaybeFunctions.<JsonElement>get(), cast(expected));
+ }
+
+ /**
+  * Returns a function which yields {@code defaultValue} for an absent input and otherwise
+  * converts the contained element with {@link #cast(Class)}.
+  *
+  * @param expected the type to convert to
+  * @param defaultValue the value returned when the input is absent
+  */
+ public static <T> Function<Maybe<JsonElement>, T> castM(final Class<T> expected, final T defaultValue) {
+     return new Function<Maybe<JsonElement>, T>() {
+         @Override
+         public T apply(Maybe<JsonElement> input) {
+             return input.isAbsent() ? defaultValue : cast(expected).apply(input.get());
+         }
+     };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
new file mode 100644
index 0000000..caeb21a
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.shell;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.ssh.SshFeed;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.core.task.system.internal.SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by executing shell commands (on the local machine where
+ * this instance of brooklyn is running). Useful e.g. for paas tools such as Cloud Foundry vmc
+ * which operate against a remote target.
+ *
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private ShellFeed feed;
+ *
+ * //@Override
+ * protected void connectSensors() {
+ * super.connectSensors();
+ *
+ * feed = ShellFeed.builder()
+ * .entity(this)
+ * .machine(mySshMachineLachine)
+ * .poll(new ShellPollConfig<Long>(DISK_USAGE)
+ * .command("df -P | grep /dev")
+ * .failOnNonZeroResultCode(true)
+ * .onSuccess(new Function<SshPollValue, Long>() {
+ * public Long apply(SshPollValue input) {
+ * String[] parts = input.getStdout().split("[ \\t]+");
+ * return Long.parseLong(parts[2]);
+ * }}))
+ * .build();
+ * }
+ *
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ * super.disconnectSensors();
+ * if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ *
+ * @see SshFeed (to run on remote machines)
+ * @see FunctionFeed (for arbitrary functions)
+ *
+ * @author aled
+ */
+public class ShellFeed extends AbstractFeed {
+
+ public static final Logger log = LoggerFactory.getLogger(ShellFeed.class);
+
+ @SuppressWarnings("serial")
+ private static final ConfigKey<SetMultimap<ShellPollIdentifier, ShellPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+ new TypeToken<SetMultimap<ShellPollIdentifier, ShellPollConfig<?>>>() {},
+ "polls");
+
+ /** Returns a new, empty {@link Builder}. */
+ public static Builder builder() {
+     Builder fresh = new Builder();
+     return fresh;
+ }
+
+ /**
+  * Fluent builder for {@link ShellFeed}. {@link #build()} creates the feed, attaches the
+  * entity and starts it. The default period is 500 milliseconds.
+  */
+ public static class Builder {
+     private EntityLocal entity;
+     private long period = 500;
+     private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
+     private List<ShellPollConfig<?>> polls = Lists.newArrayList();
+     private String uniqueTag;
+     private volatile boolean built;
+
+     /** Sets the entity the feed is attached to; required by {@link #build()}. */
+     public Builder entity(EntityLocal val) {
+         entity = val;
+         return this;
+     }
+
+     /** Sets the builder-level period in milliseconds. */
+     public Builder period(long millis) {
+         return period(millis, TimeUnit.MILLISECONDS);
+     }
+
+     /** Sets the builder-level period, applied to poll configs whose own period is negative. */
+     public Builder period(long val, TimeUnit units) {
+         period = val;
+         periodUnits = units;
+         return this;
+     }
+
+     /** Adds a poll configuration. */
+     public Builder poll(ShellPollConfig<?> config) {
+         polls.add(config);
+         return this;
+     }
+
+     /** Sets the unique tag passed on to the feed. */
+     public Builder uniqueTag(String uniqueTag) {
+         this.uniqueTag = uniqueTag;
+         return this;
+     }
+
+     /**
+      * Creates the feed from this builder, sets its entity (which must not be null) and starts it.
+      * The builder is marked as built before anything else happens.
+      */
+     public ShellFeed build() {
+         built = true;
+         ShellFeed feed = new ShellFeed(this);
+         feed.setEntity(checkNotNull(entity, "entity"));
+         feed.start();
+         return feed;
+     }
+
+     /** Logs a warning when the builder is finalized without {@link #build()} having been called. */
+     @Override
+     protected void finalize() {
+         if (built) {
+             return;
+         }
+         log.warn("ShellFeed.Builder created, but build() never called");
+     }
+ }
+
+ /**
+  * Key under which poll configs are grouped. Equality and hash code use the command, environment,
+  * directory, input and timeout; {@code context} is stored but takes no part in either.
+  * NOTE(review): presumably context is left out so that polls for different sensors running the
+  * same command share one key - confirm.
+  */
+ private static class ShellPollIdentifier {
+     final String command;
+     final Map<String, String> env;
+     final File dir;
+     final String input;
+     final String context;
+     final long timeout;
+
+     private ShellPollIdentifier(String command, Map<String, String> env, File dir, String input, String context, long timeout) {
+         this.command = checkNotNull(command, "command");
+         this.env = checkNotNull(env, "env");
+         this.dir = dir;
+         this.input = input;
+         this.context = checkNotNull(context, "context");
+         this.timeout = timeout;
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hashCode(command, env, dir, input, timeout);
+     }
+
+     @Override
+     public boolean equals(Object other) {
+         if (other instanceof ShellPollIdentifier) {
+             ShellPollIdentifier that = (ShellPollIdentifier) other;
+             return Objects.equal(command, that.command)
+                     && Objects.equal(env, that.env)
+                     && Objects.equal(dir, that.dir)
+                     && Objects.equal(input, that.input)
+                     && timeout == that.timeout;
+         }
+         return false;
+     }
+ }
+
+ /**
+ * For rebind; do not call directly; use builder
+ */
+ public ShellFeed() {
+ }
+
+ /**
+  * Creates a feed from the builder's settings. Every enabled poll config is copied, the copy is
+  * given the builder's period when its own period is negative, and it is stored under an identifier
+  * built from the config's command, environment, directory, input, sensor name and timeout.
+  */
+ protected ShellFeed(Builder builder) {
+     super();
+
+     SetMultimap<ShellPollIdentifier, ShellPollConfig<?>> pollsByIdentifier = HashMultimap.<ShellPollIdentifier,ShellPollConfig<?>>create();
+     for (ShellPollConfig<?> config : builder.polls) {
+         if (config.isEnabled()) {
+             @SuppressWarnings({ "unchecked", "rawtypes" })
+             ShellPollConfig<?> configCopy = new ShellPollConfig(config);
+             if (configCopy.getPeriod() < 0) {
+                 configCopy.period(builder.period, builder.periodUnits);
+             }
+             ShellPollIdentifier identifier = new ShellPollIdentifier(
+                     config.getCommand(), config.getEnv(), config.getDir(), config.getInput(),
+                     config.getSensor().getName(), config.getTimeout());
+             pollsByIdentifier.put(identifier, configCopy);
+         }
+     }
+     setConfig(POLLS, pollsByIdentifier);
+     initUniqueTag(builder.uniqueTag, pollsByIdentifier.values());
+ }
+
+ /**
+ * Schedules one poll per distinct {@link ShellPollIdentifier} found in the {@code POLLS} config.
+ * All poll configs sharing an identifier are served by a single scheduled task whose result is
+ * passed to one {@link AttributePollHandler} per config.
+ */
+ @Override
+ protected void preStart() {
+ SetMultimap<ShellPollIdentifier, ShellPollConfig<?>> polls = getConfig(POLLS);
+
+ for (final ShellPollIdentifier pollInfo : polls.keySet()) {
+ Set<ShellPollConfig<?>> configs = polls.get(pollInfo);
+ // starts at Integer.MAX_VALUE and is lowered to the smallest positive period among the configs
+ long minPeriod = Integer.MAX_VALUE;
+ Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
+
+ for (ShellPollConfig<?> config : configs) {
+ handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
+ if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+ }
+
+ final ProcessTaskFactory<?> taskFactory = newTaskFactory(pollInfo.command, pollInfo.env, pollInfo.dir,
+ pollInfo.input, pollInfo.context, pollInfo.timeout);
+ final ExecutionContext executionContext = ((EntityInternal) entity).getManagementSupport().getExecutionContext();
+
+ getPoller().scheduleAtFixedRate(
+ new Callable<SshPollValue>() {
+ @Override public SshPollValue call() throws Exception {
+ // a new task is created, submitted and waited for on every poll
+ ProcessTaskWrapper<?> taskWrapper = taskFactory.newTask();
+ executionContext.submit(taskWrapper);
+ taskWrapper.block();
+ // a null exit code is reported as -1; no machine is attached to the poll value
+ Optional<Integer> exitCode = Optional.fromNullable(taskWrapper.getExitCode());
+ return new SshPollValue(null, exitCode.or(-1), taskWrapper.getStdout(), taskWrapper.getStderr());
+ }},
+ new DelegatingPollHandler<SshPollValue>(handlers),
+ minPeriod);
+ }
+ }
+
+ /** Returns the superclass poller, narrowed with an unchecked cast to {@link SshPollValue} results. */
+ @SuppressWarnings("unchecked")
+ protected Poller<SshPollValue> getPoller() {
+     Poller<SshPollValue> typedPoller = (Poller<SshPollValue>) super.getPoller();
+     return typedPoller;
+ }
+
+ /**
+ * Creates the task factory that executes the given command (using `bash -l -c $command`, so as to have a good path set).
+ *
+ * @param command The command to execute
+ * @param env Environment variable settings, in format name=value
+ * @param dir Working directory, or null to inherit from current process
+ * @param input Input to send to the command (if not null).
+ * NOTE(review): this parameter is not referenced in the method body, so no input is currently passed on.
+ * @param summary Summary set on the created task factory
+ * @param timeout Timeout in milliseconds; not currently supported - a positive value only produces a warning in the log
+ */
+ protected ProcessTaskFactory<?> newTaskFactory(final String command, Map<String,String> env, File dir, String input, final String summary, final long timeout) {
+ // FIXME Add generic timeout() support to task ExecutionManager
+ if (timeout > 0) {
+ log.warn("Timeout ({}ms) not currently supported for ShellFeed {}", timeout, this);
+ }
+
+ return new ConcreteSystemProcessTaskFactory<Object>(command)
+ .environmentVariables(env)
+ .loginShell(true)
+ .directory(dir)
+ .runAsCommand()
+ .summary(summary);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java
new file mode 100644
index 0000000..e1147c3
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.shell;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.util.collections.MutableList;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+
+public class ShellPollConfig<T> extends PollConfig<SshPollValue, T, ShellPollConfig<T>> {
+
+ private String command;
+ private Map<String,String> env = Maps.newLinkedHashMap();
+ private long timeout = -1;
+ private File dir;
+ private String input;
+
+ /** Default success check: the poll value is non-null and its exit status is zero. */
+ public static final Predicate<SshPollValue> DEFAULT_SUCCESS = new Predicate<SshPollValue>() {
+     @Override
+     public boolean apply(@Nullable SshPollValue input) {
+         if (input == null) {
+             return false;
+         }
+         return input.getExitStatus() == 0;
+     }
+ };
+
+ /**
+  * Creates a poll config for the given sensor, with {@link #DEFAULT_SUCCESS} installed as the success check.
+  *
+  * @param sensor the sensor this poll feeds
+  */
+ public ShellPollConfig(AttributeSensor<T> sensor) {
+     super(sensor);
+     // invoked through super so that the superclass implementation is the one that runs
+     super.checkSuccess(DEFAULT_SUCCESS);
+ }
+
+ /**
+  * Copy constructor: copies the command, environment, timeout, directory and input of {@code other}.
+  * <p>
+  * The environment map is copied rather than shared, so later {@link #env(String, String)} calls
+  * on either instance do not affect the other.
+  *
+  * @param other the config to copy
+  */
+ public ShellPollConfig(ShellPollConfig<T> other) {
+     super(other);
+     command = other.command;
+     env = Maps.newLinkedHashMap(other.env);
+     timeout = other.timeout;
+     dir = other.dir;
+     input = other.input;
+ }
+
+ /** Returns the configured command; null until {@link #command(String)} has been called. */
+ public String getCommand() {
+ return command;
+ }
+
+ /** Returns the environment map held by this config (the map itself, not a copy). */
+ public Map<String, String> getEnv() {
+ return env;
+ }
+
+ /** Returns the configured directory, or null if none was set. */
+ public File getDir() {
+ return dir;
+ }
+
+ /** Returns the configured input, or null if none was set. */
+ public String getInput() {
+ return input;
+ }
+
+ /** Returns the timeout in milliseconds; -1 if none was set. */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /** Sets the command to run. */
+ public ShellPollConfig<T> command(String val) {
+     command = val;
+     return this;
+ }
+
+ /** Adds one environment variable; neither key nor value may be null. */
+ public ShellPollConfig<T> env(String key, String val) {
+     String checkedKey = checkNotNull(key, "key");
+     String checkedVal = checkNotNull(val, "val");
+     env.put(checkedKey, checkedVal);
+     return this;
+ }
+
+ /** Adds every entry of the given map (which must not be null) via {@link #env(String, String)}. */
+ public ShellPollConfig<T> env(Map<String,String> val) {
+     Map<String, String> entries = checkNotNull(val, "map");
+     for (Map.Entry<String, String> entry : entries.entrySet()) {
+         env(entry.getKey(), entry.getValue());
+     }
+     return this;
+ }
+
+ /** Sets the directory. */
+ public ShellPollConfig<T> dir(File val) {
+     dir = val;
+     return this;
+ }
+
+ /** Sets the input. */
+ public ShellPollConfig<T> input(String val) {
+     input = val;
+     return this;
+ }
+
+ /** Sets the timeout in milliseconds. */
+ public ShellPollConfig<T> timeout(long timeout) {
+     return timeout(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /** Sets the timeout; the value is stored converted to milliseconds. */
+ public ShellPollConfig<T> timeout(long timeout, TimeUnit units) {
+     long millis = units.toMillis(timeout);
+     this.timeout = millis;
+     return this;
+ }
+
+ /** Base name used in the string form: {@code "shell"}. */
+ @Override
+ protected String toStringBaseName() {
+     return "shell";
+ }
+
+ /** The poll source shown in the string form is the command. */
+ @Override
+ protected String toStringPollSource() {
+     return command;
+ }
+
+ /** Extends the superclass equality fields with the command, when it is not null. */
+ @Override
+ protected MutableList<Object> equalsFields() {
+     return super.equalsFields().appendIfNotNull(command);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
new file mode 100644
index 0000000..8663137
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.core.location.Locations;
+import org.apache.brooklyn.core.location.Machines;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.internal.ssh.SshTool;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by polling over ssh.
+ *
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private SshFeed feed;
+ *
+ * //@Override
+ * protected void connectSensors() {
+ * super.connectSensors();
+ *
+ * feed = SshFeed.builder()
+ * .entity(this)
+ * .machine(mySshMachineLocation)
+ * .poll(new SshPollConfig<Boolean>(SERVICE_UP)
+ * .command("rabbitmqctl -q status")
+ * .onSuccess(new Function<SshPollValue, Boolean>() {
+ * public Boolean apply(SshPollValue input) {
+ * return (input.getExitStatus() == 0);
+ * }}))
+ * .build();
+ * }
+ *
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ * super.disconnectSensors();
+ * if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ *
+ * @author aled
+ */
+public class SshFeed extends AbstractFeed {
+
+ public static final Logger log = LoggerFactory.getLogger(SshFeed.class);
+
+ @SuppressWarnings("serial")
+ public static final ConfigKey<Supplier<SshMachineLocation>> MACHINE = ConfigKeys.newConfigKey(
+ new TypeToken<Supplier<SshMachineLocation>>() {},
+ "machine");
+
+ public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand");
+
+ @SuppressWarnings("serial")
+ public static final ConfigKey<SetMultimap<SshPollIdentifier, SshPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+ new TypeToken<SetMultimap<SshPollIdentifier, SshPollConfig<?>>>() {},
+ "polls");
+
+ /** Returns a new, empty {@link Builder}. */
+ public static Builder builder() {
+     Builder fresh = new Builder();
+     return fresh;
+ }
+
+ /**
+  * Fluent builder for {@link SshFeed}. {@link #build()} creates the feed, attaches the entity
+  * and starts it. Defaults: a 500 millisecond period, execution as a script, no forced machine.
+  */
+ public static class Builder {
+     private EntityLocal entity;
+     private boolean onlyIfServiceUp = false;
+     private Supplier<SshMachineLocation> machine;
+     private Duration period = Duration.of(500, TimeUnit.MILLISECONDS);
+     private List<SshPollConfig<?>> polls = Lists.newArrayList();
+     private boolean execAsCommand = false;
+     private String uniqueTag;
+     private volatile boolean built;
+
+     /** Sets the entity the feed is attached to; required by {@link #build()}. */
+     public Builder entity(EntityLocal val) {
+         entity = val;
+         return this;
+     }
+
+     /** Shorthand for {@code onlyIfServiceUp(true)}. */
+     public Builder onlyIfServiceUp() {
+         return onlyIfServiceUp(true);
+     }
+
+     /** Sets the value stored under the feed's ONLY_IF_SERVICE_UP config. */
+     public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
+         this.onlyIfServiceUp = onlyIfServiceUp;
+         return this;
+     }
+
+     /** optional, to force a machine; otherwise it is inferred from the entity */
+     public Builder machine(SshMachineLocation val) {
+         return machine(Suppliers.ofInstance(val));
+     }
+
+     /** optional, to force a machine; otherwise it is inferred from the entity */
+     public Builder machine(Supplier<SshMachineLocation> val) {
+         machine = val;
+         return this;
+     }
+
+     /** Sets the builder-level period, applied to poll configs whose own period is negative. */
+     public Builder period(Duration period) {
+         this.period = period;
+         return this;
+     }
+
+     /** Sets the builder-level period in milliseconds. */
+     public Builder period(long millis) {
+         return period(Duration.of(millis, TimeUnit.MILLISECONDS));
+     }
+
+     /** Sets the builder-level period in the given units. */
+     public Builder period(long val, TimeUnit units) {
+         return period(Duration.of(val, units));
+     }
+
+     /** Adds a poll configuration. */
+     public Builder poll(SshPollConfig<?> config) {
+         polls.add(config);
+         return this;
+     }
+
+     /** Selects execution as a command (sets the flag stored under EXEC_AS_COMMAND to true). */
+     public Builder execAsCommand() {
+         execAsCommand = true;
+         return this;
+     }
+
+     /** Selects execution as a script (sets the flag stored under EXEC_AS_COMMAND to false). */
+     public Builder execAsScript() {
+         execAsCommand = false;
+         return this;
+     }
+
+     /** Sets the unique tag passed on to the feed. */
+     public Builder uniqueTag(String uniqueTag) {
+         this.uniqueTag = uniqueTag;
+         return this;
+     }
+
+     /**
+      * Creates the feed from this builder, sets its entity (which must not be null) and starts it.
+      * The builder is marked as built before anything else happens.
+      */
+     public SshFeed build() {
+         built = true;
+         SshFeed feed = new SshFeed(this);
+         feed.setEntity(checkNotNull(entity, "entity"));
+         feed.start();
+         return feed;
+     }
+
+     /** Logs a warning when the builder is finalized without {@link #build()} having been called. */
+     @Override
+     protected void finalize() {
+         if (built) {
+             return;
+         }
+         log.warn("SshFeed.Builder created, but build() never called");
+     }
+ }
+
+ /**
+  * Key under which poll configs are grouped: the pair of command supplier and environment supplier.
+  * Equality and hash code are delegated to the two supplier objects.
+  */
+ private static class SshPollIdentifier {
+     final Supplier<String> command;
+     final Supplier<Map<String, String>> env;
+
+     private SshPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) {
+         this.command = checkNotNull(command, "command");
+         this.env = checkNotNull(env, "env");
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hashCode(command, env);
+     }
+
+     @Override
+     public boolean equals(Object other) {
+         if (other instanceof SshPollIdentifier) {
+             SshPollIdentifier that = (SshPollIdentifier) other;
+             return Objects.equal(command, that.command) && Objects.equal(env, that.env);
+         }
+         return false;
+     }
+ }
+
+ /**
+ * Looks up the unique ssh machine location among the entity's locations, returning null when
+ * the lookup result is absent.
+ * NOTE(review): the deprecation note points to {@link Locations} while this body calls
+ * {@code Machines} - confirm which class is the intended replacement.
+ *
+ * @deprecated since 0.7.0, use static convenience on {@link Locations}
+ */
+ @Deprecated
+ public static SshMachineLocation getMachineOfEntity(Entity entity) {
+ return Machines.findUniqueSshMachineLocation(entity.getLocations()).orNull();
+ }
+
+ /**
+ * For rebind; do not call directly; use builder
+ */
+ public SshFeed() {
+ }
+
+ /**
+  * Creates a feed from the builder's settings. The service-up flag, machine supplier (possibly null)
+  * and exec-as-command flag are stored as config. Every poll config is copied, the copy is given the
+  * builder's period when its own period is negative, and it is stored under an identifier made of the
+  * config's command supplier and environment supplier.
+  */
+ protected SshFeed(final Builder builder) {
+     setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
+     setConfig(MACHINE, builder.machine);
+     setConfig(EXEC_AS_COMMAND, builder.execAsCommand);
+
+     SetMultimap<SshPollIdentifier, SshPollConfig<?>> pollsByIdentifier = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create();
+     for (SshPollConfig<?> config : builder.polls) {
+         @SuppressWarnings({ "unchecked", "rawtypes" })
+         SshPollConfig<?> configCopy = new SshPollConfig(config);
+         if (configCopy.getPeriod() < 0) {
+             configCopy.period(builder.period);
+         }
+         SshPollIdentifier identifier = new SshPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier());
+         pollsByIdentifier.put(identifier, configCopy);
+     }
+     setConfig(POLLS, pollsByIdentifier);
+     initUniqueTag(builder.uniqueTag, pollsByIdentifier.values());
+ }
+
+ /**
+  * Returns the machine to poll: the value of the configured {@code MACHINE} supplier when one is set,
+  * otherwise the unique ssh machine location found among the entity's locations.
+  */
+ protected SshMachineLocation getMachine() {
+     Supplier<SshMachineLocation> supplier = getConfig(MACHINE);
+     if (supplier == null) {
+         return Locations.findUniqueSshMachineLocation(entity.getLocations()).get();
+     }
+     return supplier.get();
+ }
+
+ /**
+ * Schedules one poll per distinct {@link SshPollIdentifier} found in the {@code POLLS} config.
+ * All poll configs sharing an identifier are served by a single scheduled task whose result is
+ * passed to one {@link AttributePollHandler} per config.
+ */
+ @Override
+ protected void preStart() {
+ SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = getConfig(POLLS);
+
+ for (final SshPollIdentifier pollInfo : polls.keySet()) {
+ Set<SshPollConfig<?>> configs = polls.get(pollInfo);
+ // starts at Integer.MAX_VALUE and is lowered to the smallest positive period among the configs
+ long minPeriod = Integer.MAX_VALUE;
+ Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
+
+ for (SshPollConfig<?> config : configs) {
+ handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
+ if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+ }
+
+ getPoller().scheduleAtFixedRate(
+ new Callable<SshPollValue>() {
+ public SshPollValue call() throws Exception {
+ // the command and environment suppliers are evaluated afresh on every poll
+ return exec(pollInfo.command.get(), pollInfo.env.get());
+ }},
+ new DelegatingPollHandler<SshPollValue>(handlers),
+ minPeriod);
+ }
+ }
+
+ /** Returns the superclass poller, narrowed with an unchecked cast to {@link SshPollValue} results. */
+ @SuppressWarnings("unchecked")
+ protected Poller<SshPollValue> getPoller() {
+     Poller<SshPollValue> typedPoller = (Poller<SshPollValue>) super.getPoller();
+     return typedPoller;
+ }
+
+ /**
+  * Runs the command on the machine returned by {@link #getMachine()}, capturing stdout and stderr in memory.
+  * The command is run with {@code execCommands} when the {@code EXEC_AS_COMMAND} config is true,
+  * and with {@code execScript} otherwise.
+  *
+  * @param command the command to run
+  * @param env environment passed to the execution
+  * @return the machine, exit status and captured output; the captured bytes are decoded with the
+  *         platform default charset
+  */
+ private SshPollValue exec(String command, Map<String,String> env) throws IOException {
+     SshMachineLocation machine = getMachine();
+     Boolean execAsCommand = getConfig(EXEC_AS_COMMAND);
+     if (log.isTraceEnabled()) {
+         log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env});
+     }
+     ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+     ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+
+     ConfigBag flags = ConfigBag.newInstance()
+             .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true)
+             .configure(SshTool.PROP_OUT_STREAM, stdout)
+             .configure(SshTool.PROP_ERR_STREAM, stderr);
+     List<String> commands = ImmutableList.of(command);
+     int exitStatus = Boolean.TRUE.equals(execAsCommand)
+             ? machine.execCommands(flags.getAllConfig(), "ssh-feed", commands, env)
+             : machine.execScript(flags.getAllConfig(), "ssh-feed", commands, env);
+
+     return new SshPollValue(machine, exitStatus, stdout.toString(), stderr.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
new file mode 100644
index 0000000..8fec87f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * Poll configuration for an ssh poll: holds the command to run and the environment variables
+ * to run it with, both as suppliers so that they can be resolved each time they are needed.
+ * <p>
+ * A newly constructed config uses {@link #DEFAULT_SUCCESS} as its success check.
+ *
+ * @param <T> the value type of the sensor this poll is configured for
+ */
+public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<T>> {
+
+    /** Supplier of the command to run; null if no command has been set. */
+    private Supplier<String> commandSupplier;
+
+    /** Environment suppliers, in the order in which they were added via the {@code env(...)} methods. */
+    private List<Supplier<Map<String,String>>> dynamicEnvironmentSupplier = MutableList.of();
+
+    /** Success check accepting a non-null poll value whose exit status is zero. */
+    public static final Predicate<SshPollValue> DEFAULT_SUCCESS = new Predicate<SshPollValue>() {
+        @Override
+        public boolean apply(@Nullable SshPollValue input) {
+            return input != null && input.getExitStatus() == 0;
+        }};
+
+    /** Creates a config for the given sensor, with {@link #DEFAULT_SUCCESS} as the success check. */
+    public SshPollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+        super.checkSuccess(DEFAULT_SUCCESS);
+    }
+
+    /**
+     * Creates a copy of {@code other}, including its command supplier and its environment suppliers.
+     */
+    public SshPollConfig(SshPollConfig<T> other) {
+        super(other);
+        commandSupplier = other.commandSupplier;
+        // Copy the suppliers into this instance's own list: the copy keeps the environment of the
+        // original, but later env(...) calls on either instance do not affect the other.
+        dynamicEnvironmentSupplier.addAll(other.dynamicEnvironmentSupplier);
+    }
+
+    /** @deprecated since 0.7.0; use {@link #getCommandSupplier()} and resolve just-in-time */
+    @Deprecated
+    public String getCommand() {
+        return getCommandSupplier().get();
+    }
+
+    /** Returns the supplier of the command, or null if no command has been set. */
+    public Supplier<String> getCommandSupplier() {
+        return commandSupplier;
+    }
+
+    /** @deprecated since 0.7.0; use {@link #getEnvSupplier()} and resolve just-in-time */
+    @Deprecated
+    public Map<String, String> getEnv() {
+        return getEnvSupplier().get();
+    }
+
+    /**
+     * Returns a supplier which, on each call, evaluates the registered environment suppliers in
+     * the order they were added and merges their maps into a new map using
+     * {@link #mergeEnvMaps(Map, Map)}. Null suppliers and null maps are skipped.
+     */
+    public Supplier<Map<String,String>> getEnvSupplier() {
+        return new Supplier<Map<String,String>>() {
+            @Override
+            public Map<String, String> get() {
+                Map<String,String> result = MutableMap.of();
+                for (Supplier<Map<String, String>> envS: dynamicEnvironmentSupplier) {
+                    if (envS!=null) {
+                        Map<String, String> envM = envS.get();
+                        if (envM!=null) {
+                            mergeEnvMaps(envM, result);
+                        }
+                    }
+                }
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Merges {@code supplied} into {@code target}; entries in {@code supplied} replace entries
+     * already in {@code target} with the same key. Does nothing if {@code supplied} is null.
+     */
+    protected void mergeEnvMaps(Map<String,String> supplied, Map<String,String> target) {
+        if (supplied==null) return;
+        // as the value is a string there is no need to look at deep merge behaviour
+        target.putAll(supplied);
+    }
+
+    /** Sets a fixed command; returns this config for chaining. */
+    public SshPollConfig<T> command(String val) { return command(Suppliers.ofInstance(val)); }
+
+    /** Sets the supplier of the command; returns this config for chaining. */
+    public SshPollConfig<T> command(Supplier<String> val) {
+        this.commandSupplier = val;
+        return this;
+    }
+
+    /** add the given env param; sequence is as per {@link #env(Supplier)} */
+    public SshPollConfig<T> env(String key, String val) {
+        return env(Collections.singletonMap(key, val));
+    }
+
+    /** add the given env params; sequence is as per {@link #env(Supplier)}.
+     * behaviour is undefined if the map supplied here is subsequently changed.
+     * <p>
+     * if a map's contents might change, use {@link #env(Supplier)}.
+     * <p>
+     * a null map is ignored. */
+    public SshPollConfig<T> env(Map<String,String> val) {
+        if (val==null) return this;
+        return env(Suppliers.ofInstance(val));
+    }
+
+    /**
+     * adds the given dynamic supplier of environment variables.
+     * <p>
+     * use of a supplier allows env vars to be computed on each execution,
+     * for example to take the most recent sensor values.
+     * <p>
+     * in the case of multiple map suppliers, static maps, or static {@link #env(String, String)}
+     * key value pairs, the order in which they are specified here is the order
+     * in which they are computed and applied.
+     *
+     * @throws NullPointerException if {@code val} is null
+     **/
+    public SshPollConfig<T> env(Supplier<Map<String,String>> val) {
+        Preconditions.checkNotNull(val);
+        dynamicEnvironmentSupplier.add(val);
+        return this;
+    }
+
+    @Override protected String toStringBaseName() { return "ssh"; }
+
+    /** Returns the currently supplied command, or null if no command supplier has been set. */
+    @Override protected Object toStringPollSource() {
+        Supplier<String> supplier = getCommandSupplier();
+        return supplier==null ? null : supplier.get();
+    }
+
+    /** Extends the superclass fields with the currently supplied command and environment, where non-null. */
+    @Override protected MutableList<Object> equalsFields() {
+        return super.equalsFields()
+                .appendIfNotNull(getCommandSupplier()!=null ? getCommandSupplier().get() : null)
+                .appendIfNotNull(getEnvSupplier()!=null ? getEnvSupplier().get() : null);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java
new file mode 100644
index 0000000..af0a8a6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+
+/**
+ * Immutable holder for the outcome of one ssh poll: the machine involved, the exit status,
+ * and the captured standard output and standard error.
+ */
+public class SshPollValue {
+
+    private final SshMachineLocation machine;
+    private final int exitStatus;
+    private final String stdout;
+    private final String stderr;
+
+    /**
+     * Stores the given values as-is; no validation is performed.
+     *
+     * @param machine the machine associated with the command
+     * @param exitStatus the exit status of the command
+     * @param stdout the captured standard output, possibly null
+     * @param stderr the captured standard error, possibly null
+     */
+    public SshPollValue(SshMachineLocation machine, int exitStatus, String stdout, String stderr) {
+        this.stderr = stderr;
+        this.stdout = stdout;
+        this.exitStatus = exitStatus;
+        this.machine = machine;
+    }
+
+    /** The machine the command will run on. */
+    public SshMachineLocation getMachine() {
+        return this.machine;
+    }
+
+    /**
+     * Exit status of the command.
+     * <p>
+     * NOTE(review): the earlier comment said this is -1 "if error is set", but this class
+     * holds no error field - confirm whether -1 is ever supplied by callers.
+     */
+    public int getExitStatus() {
+        return this.exitStatus;
+    }
+
+    /** Standard output of the command; null when no content is available. */
+    @Nullable
+    public String getStdout() {
+        return this.stdout;
+    }
+
+    /** Standard error of the command; null when no content is available. */
+    @Nullable
+    public String getStderr() {
+        return this.stderr;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java
new file mode 100644
index 0000000..370c3ce
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicates;
+
+/**
+ * Factory methods for Guava {@link Function}s that extract values from an {@link SshPollValue},
+ * plus helpers for chaining functions together.
+ */
+public class SshValueFunctions {
+
+    /** Returns a function yielding the exit status of its (non-null) input. */
+    public static Function<SshPollValue, Integer> exitStatus() {
+        return new Function<SshPollValue, Integer>() {
+            @Override public Integer apply(SshPollValue input) {
+                return input.getExitStatus();
+            }
+        };
+    }
+
+    /** Returns a function yielding the standard output of its (non-null) input. */
+    public static Function<SshPollValue, String> stdout() {
+        return new Function<SshPollValue, String>() {
+            @Override public String apply(SshPollValue input) {
+                return input.getStdout();
+            }
+        };
+    }
+
+    /** Returns a function yielding the standard error of its (non-null) input. */
+    public static Function<SshPollValue, String> stderr() {
+        return new Function<SshPollValue, String>() {
+            @Override public String apply(SshPollValue input) {
+                return input.getStderr();
+            }
+        };
+    }
+
+    /** Returns a function yielding whether the exit status of its input equals {@code expected}. */
+    public static Function<SshPollValue, Boolean> exitStatusEquals(final int expected) {
+        return chain(SshValueFunctions.exitStatus(), Functions.forPredicate(Predicates.equalTo(expected)));
+    }
+
+    /**
+     * Returns a function applying {@code f1} and then {@code f2} to the result.
+     * <p>
+     * Delegates to Guava's {@link Functions#compose(Function, Function)}, which takes its
+     * arguments in the opposite order and rejects null functions when the chain is built.
+     * An earlier note on this method records that it is duplicated in HttpValueFunctions.
+     */
+    public static <A,B,C> Function<A,C> chain(final Function<A,? extends B> f1, final Function<B,C> f2) {
+        return Functions.compose(f2, f1);
+    }
+
+    /**
+     * Returns a function applying {@code f1}, then {@code f2}, then {@code f3}.
+     * <p>
+     * Built from two applications of {@link #chain(Function, Function)}.
+     */
+    public static <A,B,C,D> Function<A,D> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,D> f3) {
+        return chain(f1, chain(f2, f3));
+    }
+}