You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ge...@apache.org on 2017/03/03 12:01:34 UTC

[5/6] brooklyn-server git commit: Extract AbstractCommandFeed from SshFeed

Extract AbstractCommandFeed from SshFeed

- Added windows.CmdFeed


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/c564cacc
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/c564cacc
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/c564cacc

Branch: refs/heads/master
Commit: c564caccf8185ce6f71cd72db5425db3176030a2
Parents: 73c4558
Author: Valentin Aitken <bo...@gmail.com>
Authored: Thu Feb 16 18:03:50 2017 +0200
Committer: Valentin Aitken <bo...@gmail.com>
Committed: Fri Mar 3 10:03:39 2017 +0200

----------------------------------------------------------------------
 .../core/sensor/ssh/SshCommandSensor.java       |   6 +-
 .../brooklyn/feed/AbstractCommandFeed.java      | 150 +++++------
 .../apache/brooklyn/feed/CommandPollConfig.java |  51 ++--
 .../org/apache/brooklyn/feed/ssh/SshFeed.java   | 134 ++++++++++
 .../apache/brooklyn/feed/ssh/SshPollConfig.java |  41 +++
 .../deserializingClassRenames.properties        |   3 +
 .../feed/ssh/SshFeedIntegrationTest.java        |  20 +-
 .../apache/brooklyn/feed/windows/CmdFeed.java   |  81 ++++++
 .../feed/windows/WinRmFeedIntegrationTest.java  | 262 +++++++++++++++++++
 9 files changed, 610 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
index eca50f7..fc93d74 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.feed.ssh.SshFeed;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +42,7 @@ import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
-import org.apache.brooklyn.feed.ssh.SshFeed;
+import org.apache.brooklyn.feed.AbstractCommandFeed;
 import org.apache.brooklyn.feed.ssh.SshPollConfig;
 import org.apache.brooklyn.feed.ssh.SshValueFunctions;
 import org.apache.brooklyn.util.collections.MutableMap;
@@ -119,7 +121,7 @@ public final class SshCommandSensor<T> extends AddSensor<T> {
             }
         };
 
-        SshPollConfig<T> pollConfig = new SshPollConfig<T>(sensor)
+        CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor)
                 .period(period)
                 .env(envSupplier)
                 .command(commandSupplier)

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
index f21d8da..95fe7ac 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
@@ -16,11 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.brooklyn.feed.ssh;
+package org.apache.brooklyn.feed;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.location.MachineLocation;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.feed.AbstractFeed;
@@ -37,30 +37,26 @@ import org.apache.brooklyn.core.feed.AttributePollHandler;
 import org.apache.brooklyn.core.feed.DelegatingPollHandler;
 import org.apache.brooklyn.core.feed.Poller;
 import org.apache.brooklyn.core.location.Locations;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.core.internal.ssh.SshTool;
 import org.apache.brooklyn.util.time.Duration;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import com.google.common.reflect.TypeToken;
 
 /**
- * Provides a feed of attribute values, by polling over ssh.
+ * Provides a feed of attribute values, by polling over Command Line Shell Interface.
  * 
  * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
  * <pre>
  * {@code
- * private SshFeed feed;
+ * private AbstractCommandFeed feed;
  * 
  * //@Override
  * protected void connectSensors() {
@@ -69,7 +65,7 @@ import com.google.common.reflect.TypeToken;
  *   feed = SshFeed.builder()
  *       .entity(this)
  *       .machine(mySshMachineLachine)
- *       .poll(new SshPollConfig<Boolean>(SERVICE_UP)
+ *       .poll(new CommandPollConfig<Boolean>(SERVICE_UP)
  *           .command("rabbitmqctl -q status")
  *           .onSuccess(new Function<SshPollValue, Boolean>() {
  *               public Boolean apply(SshPollValue input) {
@@ -86,98 +82,97 @@ import com.google.common.reflect.TypeToken;
  * }
  * </pre>
  * 
- * @author aled
  */
-public class SshFeed extends AbstractFeed {
+public abstract class AbstractCommandFeed extends AbstractFeed {
 
-    public static final Logger log = LoggerFactory.getLogger(SshFeed.class);
+    public static final Logger log = LoggerFactory.getLogger(AbstractCommandFeed.class);
     
     @SuppressWarnings("serial")
-    public static final ConfigKey<Supplier<SshMachineLocation>> MACHINE = ConfigKeys.newConfigKey(
-            new TypeToken<Supplier<SshMachineLocation>>() {},
+    public static final ConfigKey<Supplier<MachineLocation>> MACHINE = ConfigKeys.newConfigKey(
+            new TypeToken<Supplier<MachineLocation>>() {},
             "machine");
-    
+
     public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand");
     
     @SuppressWarnings("serial")
-    public static final ConfigKey<SetMultimap<SshPollIdentifier, SshPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
-            new TypeToken<SetMultimap<SshPollIdentifier, SshPollConfig<?>>>() {},
+    public static final ConfigKey<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+            new TypeToken<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>>() {},
             "polls");
     
-    public static Builder builder() {
-        return new Builder();
-    }
-    
-    public static class Builder {
+    public static abstract class Builder<T extends AbstractCommandFeed, B extends Builder<T, B>> {
         private Entity entity;
         private boolean onlyIfServiceUp = false;
-        private Supplier<SshMachineLocation> machine;
+        private Supplier<MachineLocation> machine;
         private Duration period = Duration.of(500, TimeUnit.MILLISECONDS);
-        private List<SshPollConfig<?>> polls = Lists.newArrayList();
         private boolean execAsCommand = false;
         private String uniqueTag;
         private volatile boolean built;
         
-        public Builder entity(Entity val) {
+        public B entity(Entity val) {
             this.entity = val;
-            return this;
+            return self();
         }
-        public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
-        public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { 
+        public B onlyIfServiceUp() { return onlyIfServiceUp(true); }
+        public B onlyIfServiceUp(boolean onlyIfServiceUp) {
             this.onlyIfServiceUp = onlyIfServiceUp; 
-            return this; 
+            return self();
         }
         /** optional, to force a machine; otherwise it is inferred from the entity */
-        public Builder machine(SshMachineLocation val) { return machine(Suppliers.ofInstance(val)); }
+        public B machine(MachineLocation val) { return machine(Suppliers.ofInstance(val)); }
         /** optional, to force a machine; otherwise it is inferred from the entity */
-        public Builder machine(Supplier<SshMachineLocation> val) {
+        public B machine(Supplier<MachineLocation> val) {
             this.machine = val;
-            return this;
+            return self();
         }
-        public Builder period(Duration period) {
+        public B period(Duration period) {
             this.period = period;
-            return this;
+            return self();
         }
-        public Builder period(long millis) {
+        public B period(long millis) {
             return period(Duration.of(millis, TimeUnit.MILLISECONDS));
         }
-        public Builder period(long val, TimeUnit units) {
+        public B period(long val, TimeUnit units) {
             return period(Duration.of(val, units));
         }
-        public Builder poll(SshPollConfig<?> config) {
-            polls.add(config);
-            return this;
-        }
-        public Builder execAsCommand() {
+        public abstract B poll(CommandPollConfig<?> config);
+        public abstract List<CommandPollConfig<?>> getPolls();
+
+        public B execAsCommand() {
             execAsCommand = true;
-            return this;
+            return self();
         }
-        public Builder execAsScript() {
+        public B execAsScript() {
             execAsCommand = false;
-            return this;
+            return self();
         }
-        public Builder uniqueTag(String uniqueTag) {
+        public B uniqueTag(String uniqueTag) {
             this.uniqueTag = uniqueTag;
-            return this;
+            return self();
         }
-        public SshFeed build() {
+        
+        protected abstract B self();
+        
+        protected abstract T instantiateFeed();
+
+        public T build() {
             built = true;
-            SshFeed result = new SshFeed(this);
+            T result = instantiateFeed();
             result.setEntity(checkNotNull((EntityLocal)entity, "entity"));
             result.start();
             return result;
         }
+
         @Override
         protected void finalize() {
             if (!built) log.warn("SshFeed.Builder created, but build() never called");
         }
     }
     
-    private static class SshPollIdentifier {
+    private static class CommandPollIdentifier {
         final Supplier<String> command;
         final Supplier<Map<String, String>> env;
 
-        private SshPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) {
+        private CommandPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) {
             this.command = checkNotNull(command, "command");
             this.env = checkNotNull(env, "env");
         }
@@ -189,10 +184,10 @@ public class SshFeed extends AbstractFeed {
         
         @Override
         public boolean equals(Object other) {
-            if (!(other instanceof SshPollIdentifier)) {
+            if (!(other instanceof CommandPollIdentifier)) {
                 return false;
             }
-            SshPollIdentifier o = (SshPollIdentifier) other;
+            CommandPollIdentifier o = (CommandPollIdentifier) other;
             return Objects.equal(command, o.command) &&
                     Objects.equal(env, o.env);
         }
@@ -201,44 +196,44 @@ public class SshFeed extends AbstractFeed {
     /**
      * For rebind; do not call directly; use builder
      */
-    public SshFeed() {
+    public AbstractCommandFeed() {
     }
 
-    protected SshFeed(final Builder builder) {
+    protected AbstractCommandFeed(final Builder builder) {
         config().set(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
         config().set(MACHINE, builder.machine);
         config().set(EXEC_AS_COMMAND, builder.execAsCommand);
         
-        SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create();
-        for (SshPollConfig<?> config : builder.polls) {
+        SetMultimap<CommandPollIdentifier, CommandPollConfig<?>> polls = HashMultimap.<CommandPollIdentifier,CommandPollConfig<?>>create();
+        for (CommandPollConfig<?> config : (List<CommandPollConfig<?>>)builder.getPolls()) {
             @SuppressWarnings({ "unchecked", "rawtypes" })
-            SshPollConfig<?> configCopy = new SshPollConfig(config);
+            CommandPollConfig<?> configCopy = new CommandPollConfig(config);
             if (configCopy.getPeriod() < 0) configCopy.period(builder.period);
-            polls.put(new SshPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy);
+            polls.put(new CommandPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy);
         }
         config().set(POLLS, polls);
         initUniqueTag(builder.uniqueTag, polls.values());
     }
 
-    protected SshMachineLocation getMachine() {
-        Supplier<SshMachineLocation> supplier = config().get(MACHINE);
+    protected MachineLocation getMachine() {
+        Supplier<MachineLocation> supplier = config().get(MACHINE);
         if (supplier != null) {
             return supplier.get();
         } else {
-            return Locations.findUniqueSshMachineLocation(entity.getLocations()).get();
+            return Locations.findUniqueMachineLocation(entity.getLocations()).get();
         }
     }
     
     @Override
     protected void preStart() {
-        SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = config().get(POLLS);
+        SetMultimap<CommandPollIdentifier, CommandPollConfig<?>> polls = config().get(POLLS);
         
-        for (final SshPollIdentifier pollInfo : polls.keySet()) {
-            Set<SshPollConfig<?>> configs = polls.get(pollInfo);
+        for (final CommandPollIdentifier pollInfo : polls.keySet()) {
+            Set<CommandPollConfig<?>> configs = polls.get(pollInfo);
             long minPeriod = Integer.MAX_VALUE;
             Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
 
-            for (SshPollConfig<?> config : configs) {
+            for (CommandPollConfig<?> config : configs) {
                 handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
                 if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
             }
@@ -260,26 +255,5 @@ public class SshFeed extends AbstractFeed {
         return (Poller<SshPollValue>) super.getPoller();
     }
     
-    private SshPollValue exec(String command, Map<String,String> env) throws IOException {
-        SshMachineLocation machine = getMachine();
-        Boolean execAsCommand = config().get(EXEC_AS_COMMAND);
-        if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env});
-        ByteArrayOutputStream stdout = new ByteArrayOutputStream();
-        ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
-        int exitStatus;
-        ConfigBag flags = ConfigBag.newInstanceExtending(config().getBag())
-            .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true)
-            .configure(SshTool.PROP_OUT_STREAM, stdout)
-            .configure(SshTool.PROP_ERR_STREAM, stderr);
-        if (Boolean.TRUE.equals(execAsCommand)) {
-            exitStatus = machine.execCommands(flags.getAllConfig(),
-                    "ssh-feed", ImmutableList.of(command), env);
-        } else {
-            exitStatus = machine.execScript(flags.getAllConfig(),
-                    "ssh-feed", ImmutableList.of(command), env);
-        }
-
-        return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray()));
-    }
+    protected abstract SshPollValue exec(String command, Map<String,String> env) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java
index 1df98e9..f8c5b81 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.brooklyn.feed.ssh;
+package org.apache.brooklyn.feed;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -34,10 +34,11 @@ import com.google.common.base.Suppliers;
 
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 
-public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<T>> {
+public class CommandPollConfig<T> extends PollConfig<SshPollValue, T, CommandPollConfig<T>> {
 
     private Supplier<String> commandSupplier;
     private List<Supplier<Map<String,String>>> dynamicEnvironmentSupplier = MutableList.of();
@@ -48,20 +49,20 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
             return input != null && input.getExitStatus() == 0;
         }};
 
-    public static <T> SshPollConfig<T> forSensor(AttributeSensor<T> sensor) {
-        return new SshPollConfig<T>(sensor);
+    public static <T> CommandPollConfig<T> forSensor(AttributeSensor<T> sensor) {
+        return new CommandPollConfig<T>(sensor);
     }
 
-    public static SshPollConfig<Void> forMultiple() {
-        return new SshPollConfig<Void>(PollConfig.NO_SENSOR);
+    public static CommandPollConfig<Void> forMultiple() {
+        return new CommandPollConfig<Void>(PollConfig.NO_SENSOR);
     }
 
-    public SshPollConfig(AttributeSensor<T> sensor) {
+    public CommandPollConfig(AttributeSensor<T> sensor) {
         super(sensor);
         super.checkSuccess(DEFAULT_SUCCESS);
     }
 
-    public SshPollConfig(SshPollConfig<T> other) {
+    public CommandPollConfig(CommandPollConfig<T> other) {
         super(other);
         commandSupplier = other.commandSupplier;
     }
@@ -83,29 +84,7 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
     
     @SuppressWarnings("unused")
     public Supplier<Map<String,String>> getEnvSupplier() {
-        if (true) return new CombiningEnvSupplier(dynamicEnvironmentSupplier);
-        
-        // TODO Kept in case it's persisted; new code will not use this.
-        return new Supplier<Map<String,String>>() {
-            @Override
-            public Map<String, String> get() {
-                Map<String,String> result = MutableMap.of();
-                for (Supplier<Map<String, String>> envS: dynamicEnvironmentSupplier) {
-                    if (envS!=null) {
-                        Map<String, String> envM = envS.get();
-                        if (envM!=null) {
-                            mergeEnvMaps(envM, result);
-                        }
-                    }
-                }
-                return result;
-            }
-            private void mergeEnvMaps(Map<String,String> supplied, Map<String,String> target) {
-                if (supplied==null) return;
-                // as the value is a string there is no need to look at deep merge behaviour
-                target.putAll(supplied);
-            }
-        };
+        return new CombiningEnvSupplier(dynamicEnvironmentSupplier);
     }
     
     private static class CombiningEnvSupplier implements Supplier<Map<String,String>> {
@@ -149,14 +128,14 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
         }
     }
 
-    public SshPollConfig<T> command(String val) { return command(Suppliers.ofInstance(val)); }
-    public SshPollConfig<T> command(Supplier<String> val) {
+    public CommandPollConfig<T> command(String val) { return command(Suppliers.ofInstance(val)); }
+    public CommandPollConfig<T> command(Supplier<String> val) {
         this.commandSupplier = val;
         return this;
     }
 
     /** add the given env param; sequence is as per {@link #env(Supplier)} */
-    public SshPollConfig<T> env(String key, String val) {
+    public CommandPollConfig<T> env(String key, String val) {
         return env(Collections.singletonMap(key, val));
     }
     
@@ -164,7 +143,7 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
      * behaviour is undefined if the map supplied here is subsequently changed.
      * <p>
      * if a map's contents might change, use {@link #env(Supplier)} */
-    public SshPollConfig<T> env(Map<String,String> val) {
+    public CommandPollConfig<T> env(Map<String,String> val) {
         if (val==null) return this;
         return env(Suppliers.ofInstance(val));
     }
@@ -179,7 +158,7 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
      * key value pairs, the order in which they are specified here is the order
      * in which they are computed and applied. 
      **/
-    public SshPollConfig<T> env(Supplier<Map<String,String>> val) {
+    public CommandPollConfig<T> env(Supplier<Map<String,String>> val) {
         Preconditions.checkNotNull(val);
         dynamicEnvironmentSupplier.add(val);
         return this;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
new file mode 100644
index 0000000..7c365c4
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.internal.ssh.SshTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides a feed of attribute values, by polling over ssh.
+ * 
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private SshFeed feed;
+ * 
+ * //@Override
+ * protected void connectSensors() {
+ *   super.connectSensors();
+ *   
+ *   feed = SshFeed.builder()
+ *       .entity(this)
+ *       .machine(mySshMachineLachine)
+ *       .poll(new SshPollConfig<Boolean>(SERVICE_UP)
+ *           .command("rabbitmqctl -q status")
+ *           .onSuccess(new Function<SshPollValue, Boolean>() {
+ *               public Boolean apply(SshPollValue input) {
+ *                 return (input.getExitStatus() == 0);
+ *               }}))
+ *       .build();
+ * }
+ * 
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ *   super.disconnectSensors();
+ *   if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ * 
+ * @author aled
+ */
+public class SshFeed extends org.apache.brooklyn.feed.AbstractCommandFeed {
+    public static final Logger log = LoggerFactory.getLogger(SshFeed.class);
+
+    public static class Builder extends org.apache.brooklyn.feed.AbstractCommandFeed.Builder<SshFeed, Builder> {
+        private List<CommandPollConfig<?>> polls = Lists.newArrayList();
+
+        @Override
+        public Builder poll(CommandPollConfig<?> config) {
+            polls.add(config);
+            return self();
+        }
+
+        @Override
+        public List<CommandPollConfig<?>> getPolls() {
+            return polls;
+        }
+
+        @Override
+        protected Builder self() {
+           return this;
+        }
+   
+        @Override
+        protected SshFeed instantiateFeed() {
+           return new SshFeed(this);
+        }
+     }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    /**
+     * For rebind; do not call directly; use builder
+     */
+    public SshFeed() {
+    }
+
+    public SshFeed(final Builder builder) {
+        super(builder);
+    }
+
+    @Override
+    protected SshPollValue exec(String command, Map<String,String> env) throws IOException {
+        SshMachineLocation machine = (SshMachineLocation)getMachine();
+        Boolean execAsCommand = config().get(EXEC_AS_COMMAND);
+        if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env});
+        ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+        ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+
+        int exitStatus;
+        ConfigBag flags = ConfigBag.newInstanceExtending(config().getBag())
+            .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true)
+            .configure(SshTool.PROP_OUT_STREAM, stdout)
+            .configure(SshTool.PROP_ERR_STREAM, stderr);
+        if (Boolean.TRUE.equals(execAsCommand)) {
+            exitStatus = machine.execCommands(flags.getAllConfig(),
+                    "ssh-feed", ImmutableList.of(command), env);
+        } else {
+            exitStatus = machine.execScript(flags.getAllConfig(),
+                    "ssh-feed", ImmutableList.of(command), env);
+        }
+
+        return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
new file mode 100644
index 0000000..859d30f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.feed.CommandPollConfig;
+
+/**
+ * @deprecated since 0.11.0. Please use {@link CommandPollConfig}.
+ */
+@Deprecated
+public class SshPollConfig<T> extends CommandPollConfig<T> {
+    public SshPollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+    }
+
+    public SshPollConfig(CommandPollConfig<T> other) {
+        super(other);
+    }
+
+    @Override
+    public SshPollConfig<T> self() {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties b/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties
index 07b18d6..0e664cf 100644
--- a/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties
+++ b/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties
@@ -1451,3 +1451,6 @@ brooklyn.networking.vclouddirector.FakeSSLSocketFactory
 brooklyn.networking.vclouddirector.NatPredicates                                 : brooklyn.networking.vclouddirector.nat.NatPredicates
 brooklyn.networking.vclouddirector.NatService                                    : brooklyn.networking.vclouddirector.nat.NatService
 brooklyn.networking.vclouddirector.PortForwardingConfig                          : brooklyn.networking.vclouddirector.nat.PortForwardingConfig
+
+org.apache.brooklyn.feed.ssh.SshFeed$SshPollIdentifier : org.apache.brooklyn.feed.AbstractCommandFeed$CommandPollIdentifier
+org.apache.brooklyn.feed.ssh.SshPollConfig$2 : org.apache.brooklyn.feed.CommandPollConfig$CombiningEnvSupplier
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java
index 416ab9d..781ab95 100644
--- a/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java
@@ -32,11 +32,7 @@ import org.apache.brooklyn.core.entity.EntityInternal.FeedSupport;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
-import org.apache.brooklyn.feed.ssh.SshFeed;
-import org.apache.brooklyn.feed.ssh.SshFeedIntegrationTest;
-import org.apache.brooklyn.feed.ssh.SshPollConfig;
-import org.apache.brooklyn.feed.ssh.SshPollValue;
-import org.apache.brooklyn.feed.ssh.SshValueFunctions;
+import org.apache.brooklyn.feed.CommandPollConfig;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.stream.Streams;
@@ -96,7 +92,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
         
         feed = SshFeed.builder()
                 .entity(entity2)
-                .poll(new SshPollConfig<String>(SENSOR_STRING)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
                         .command("echo hello")
                         .onSuccess(SshValueFunctions.stdout()))
                 .build();
@@ -126,7 +122,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
         feed = SshFeed.builder()
                 .entity(entity)
                 .machine(machine)
-                .poll(new SshPollConfig<Integer>(SENSOR_INT)
+                .poll(new CommandPollConfig<Integer>(SENSOR_INT)
                         .command("exit 123")
                         .checkSuccess(Predicates.alwaysTrue())
                         .onSuccess(SshValueFunctions.exitStatus()))
@@ -140,7 +136,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
         feed = SshFeed.builder()
                 .entity(entity)
                 .machine(machine)
-                .poll(new SshPollConfig<String>(SENSOR_STRING)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
                         .command("echo hello")
                         .onSuccess(SshValueFunctions.stdout()))
                 .build();
@@ -156,7 +152,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
         feed = SshFeed.builder()
                 .entity(entity)
                 .machine(machine)
-                .poll(new SshPollConfig<String>(SENSOR_STRING)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
                         .command(cmd)
                         .onFailure(SshValueFunctions.stderr()))
                 .build();
@@ -169,7 +165,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
         feed = SshFeed.builder()
                 .entity(entity)
                 .machine(machine)
-                .poll(new SshPollConfig<String>(SENSOR_STRING)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
                         .command("exit 123")
                         .onFailure(new Function<SshPollValue, String>() {
                             @Override
@@ -191,7 +187,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
                     SshFeed.builder()
                         .entity(entity)
                         .onlyIfServiceUp()
-                        .poll(new SshPollConfig<String>(SENSOR_STRING)
+                        .poll(new CommandPollConfig<String>(SENSOR_STRING)
                             .command("echo hello")
                             .onSuccess(SshValueFunctions.stdout()))
                         .build();
@@ -226,7 +222,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
         
         feed = SshFeed.builder()
                 .entity(entity2)
-                .poll(new SshPollConfig<String>(SENSOR_STRING)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
                         .env(envSupplier)
                         .command(cmdSupplier)
                         .onSuccess(SshValueFunctions.stdout()))

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
----------------------------------------------------------------------
diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
new file mode 100644
index 0000000..94d451a
--- /dev/null
+++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.windows;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.brooklyn.feed.AbstractCommandFeed;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.location.winrm.WinRmMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.internal.winrm.WinRmToolResponse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class CmdFeed extends AbstractCommandFeed {
+    public static class Builder extends org.apache.brooklyn.feed.AbstractCommandFeed.Builder<CmdFeed, CmdFeed.Builder> {
+        private List<CommandPollConfig<?>> polls = Lists.newArrayList();
+
+        @Override
+        public CmdFeed.Builder poll(CommandPollConfig<?> config) {
+            polls.add(config);
+            return self();
+        }
+
+        @Override
+        public List<CommandPollConfig<?>> getPolls() {
+            return polls;
+        }
+
+        @Override
+        protected CmdFeed.Builder self() {
+            return this;
+        }
+
+        @Override
+        protected CmdFeed instantiateFeed() {
+            return new CmdFeed(this);
+        }
+    }
+
+    public static CmdFeed.Builder builder() {
+        return new CmdFeed.Builder();
+    }
+
+    protected CmdFeed(final Builder builder) {
+        super(builder);
+    }
+    @Override
+    protected SshPollValue exec(String command, Map<String,String> env) throws IOException {
+        WinRmMachineLocation machine = (WinRmMachineLocation)getMachine();
+        if (log.isTraceEnabled()) log.trace("WinRm polling for {}, executing {} with env {}", new Object[] {machine, command, env});
+
+        WinRmToolResponse winRmToolResponse;
+        int exitStatus;
+        ConfigBag flags = ConfigBag.newInstanceExtending(config().getBag());
+        winRmToolResponse = machine.executeCommand(flags.getAllConfig(),
+                ImmutableList.of(command));
+        exitStatus = winRmToolResponse.getStatusCode();
+
+        return new SshPollValue(null, exitStatus, winRmToolResponse.getStdOut(), winRmToolResponse.getStdErr());
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/software/winrm/src/test/java/org/apache/brooklyn/feed/windows/WinRmFeedIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/winrm/src/test/java/org/apache/brooklyn/feed/windows/WinRmFeedIntegrationTest.java b/software/winrm/src/test/java/org/apache/brooklyn/feed/windows/WinRmFeedIntegrationTest.java
new file mode 100644
index 0000000..afaedb4
--- /dev/null
+++ b/software/winrm/src/test/java/org/apache/brooklyn/feed/windows/WinRmFeedIntegrationTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.windows;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.brooklyn.api.entity.EntityInitializer;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.MachineProvisioningLocation;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.EntityInternal.FeedSupport;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.feed.ssh.SshValueFunctions;
+import org.apache.brooklyn.location.winrm.WinRmMachineLocation;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.StringFunctions;
+import org.apache.brooklyn.util.text.StringPredicates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test is almost identical to {@link org.apache.brooklyn.feed.ssh.SshFeedIntegrationTest}.
+ * To launch the test I put in ~/.brooklyn/brooklyn.properties
+ *   brooklyn.location.named.WindowsLiveTest=byon:(hosts=192.168.1.2,osFamily=windows,user=winUser,password=p0ssw0rd)
+ */
+public class WinRmFeedIntegrationTest extends BrooklynAppLiveTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(WinRmFeedIntegrationTest.class);
+
+    private static final String LOCATION_SPEC = "named:WindowsLiveTest";
+    
+    final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString", "");
+    final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor("aLong", "");
+
+    private WinRmMachineLocation machine;
+    private TestEntity entity;
+    private CmdFeed feed;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        MachineProvisioningLocation<?> provisioningLocation = (MachineProvisioningLocation<?>)
+                mgmt.getLocationRegistry().getLocationManaged(LOCATION_SPEC);
+        machine = (WinRmMachineLocation)provisioningLocation.obtain(ImmutableMap.of());
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        app.start(ImmutableList.of(machine));
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        if (feed != null) feed.stop();
+        super.tearDown();
+    }
+    
+    /** this is one of the most common pattern */
+    @Test(groups="Integration")
+    public void testReturnsStdoutAndInfersMachine() throws Exception {
+        final TestEntity entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+            // inject the machine location, because the app was started with a provisioning location
+            // and TestEntity doesn't provision
+            .location(machine));
+        
+        feed = CmdFeed.builder()
+                .entity(entity2)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
+                        .command("echo hello")
+                        .onSuccess(SshValueFunctions.stdout()))
+                .build();
+        
+        EntityAsserts.assertAttributeEventuallyNonNull(entity2, SENSOR_STRING);
+        String val = entity2.getAttribute(SENSOR_STRING);
+        Assert.assertTrue(val.contains("hello"), "val="+val);
+        Assert.assertEquals(val.trim(), "hello");
+    }
+
+    @Test(groups="Integration")
+    public void testFeedDeDupe() throws Exception {
+        testReturnsStdoutAndInfersMachine();
+        entity.addFeed(feed);
+        log.info("Feed 0 is: "+feed);
+        
+        testReturnsStdoutAndInfersMachine();
+        log.info("Feed 1 is: "+feed);
+        entity.addFeed(feed);
+                
+        FeedSupport feeds = ((EntityInternal)entity).feeds();
+        Assert.assertEquals(feeds.getFeeds().size(), 1, "Wrong feed count: "+feeds.getFeeds());
+    }
+    
+    @Test(groups="Integration")
+    public void testReturnsSshExitStatus() throws Exception {
+        feed = CmdFeed.builder()
+                .entity(entity)
+                .machine(machine)
+                .poll(new CommandPollConfig<Integer>(SENSOR_INT)
+                        .command("exit 123")
+                        .checkSuccess(Predicates.alwaysTrue())
+                        .onSuccess(SshValueFunctions.exitStatus()))
+                .build();
+
+        EntityAsserts.assertAttributeEqualsEventually(entity, SENSOR_INT, 123);
+    }
+    
+    @Test(groups="Integration")
+    public void testReturnsStdout() throws Exception {
+        feed = CmdFeed.builder()
+                .entity(entity)
+                .machine(machine)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
+                        .command("echo hello")
+                        .onSuccess(SshValueFunctions.stdout()))
+                .build();
+        
+        EntityAsserts.assertAttributeEventually(entity, SENSOR_STRING, 
+            Predicates.compose(Predicates.equalTo("hello"), StringFunctions.trim()));
+    }
+
+    @Test(groups="Integration")
+    public void testReturnsStderr() throws Exception {
+        final String cmd = "thiscommanddoesnotexist";
+        
+        feed = CmdFeed.builder()
+                .entity(entity)
+                .machine(machine)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
+                        .command(cmd)
+                        .onFailure(SshValueFunctions.stderr()))
+                .build();
+        
+        EntityAsserts.assertAttributeEventually(entity, SENSOR_STRING, StringPredicates.containsLiteral(cmd));
+    }
+    
+    @Test(groups="Integration")
+    public void testFailsOnNonZero() throws Exception {
+        feed = CmdFeed.builder()
+                .entity(entity)
+                .machine(machine)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
+                        .command("exit 123")
+                        .onFailure(new Function<SshPollValue, String>() {
+                            @Override
+                            public String apply(SshPollValue input) {
+                                return "Exit status " + input.getExitStatus();
+                            }}))
+                .build();
+        
+        EntityAsserts.assertAttributeEventually(entity, SENSOR_STRING, StringPredicates.containsLiteral("Exit status 123"));
+    }
+    
+    @Test(groups="Integration")
+    public void testAddedEarly() throws Exception {
+        final TestEntity entity2 = app.addChild(EntitySpec.create(TestEntity.class)
+            .location(machine)
+            .addInitializer(new EntityInitializer() {
+                @Override
+                public void apply(EntityLocal entity) {
+                    CmdFeed.builder()
+                        .entity(entity)
+                        .onlyIfServiceUp()
+                        .poll(new CommandPollConfig<String>(SENSOR_STRING)
+                            .command("echo hello")
+                            .onSuccess(SshValueFunctions.stdout()))
+                        .build();
+                }
+            }));
+
+        // TODO would be nice to hook in and assert no errors
+        EntityAsserts.assertAttributeEqualsContinually(entity2, SENSOR_STRING, null);
+
+        entity2.sensors().set(Attributes.SERVICE_UP, true);
+        EntityAsserts.assertAttributeEventually(entity2, SENSOR_STRING, StringPredicates.containsLiteral("hello"));
+    }
+
+    
+    @Test(groups="Integration")
+    public void testDynamicEnvAndCommandSupplier() throws Exception {
+        final TestEntity entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class).location(machine));
+        
+        final AtomicInteger count = new AtomicInteger();
+        Supplier<Map<String, String>> envSupplier = new Supplier<Map<String,String>>() {
+            @Override
+            public Map<String, String> get() {
+                return MutableMap.of("COUNT", ""+count.incrementAndGet());
+            }
+        };
+        Supplier<String> cmdSupplier = new Supplier<String>() {
+            @Override
+            public String get() {
+                return "echo count-"+count.incrementAndGet()+"-%COUNT%";
+            }
+        };
+        
+        feed = CmdFeed.builder()
+                .entity(entity2)
+                .poll(new CommandPollConfig<String>(SENSOR_STRING)
+                        .env(envSupplier)
+                        .command(cmdSupplier)
+                        .onSuccess(SshValueFunctions.stdout()))
+                .build();
+        
+        EntityAsserts.assertAttributeEventuallyNonNull(entity2, SENSOR_STRING);
+        final String val1 = assertDifferentOneInOutput(entity2);
+        
+        EntityAsserts.assertAttributeEventually(entity2, SENSOR_STRING, Predicates.not(Predicates.equalTo(val1)));        
+        final String val2 = assertDifferentOneInOutput(entity2);
+        log.info("vals from dynamic sensors are: "+val1.trim()+" and "+val2.trim());
+    }
+
+    private String assertDifferentOneInOutput(final TestEntity entity2) {
+        String val = entity2.getAttribute(SENSOR_STRING);
+        Assert.assertTrue(val.startsWith("count"), "val="+val);
+        try {
+            String[] fields = val.trim().split("-");
+            int field1 = Integer.parseInt(fields[1]); 
+            int field2 = Integer.parseInt(fields[2]);
+            Assert.assertEquals(Math.abs(field2-field1), 1, "expected difference of 1");
+        } catch (Throwable t) {
+            Exceptions.propagateIfFatal(t);
+            Assert.fail("Wrong output from sensor, got '"+val.trim()+"', giving error: "+t);
+        }
+        return val;
+    }
+
+}