You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ge...@apache.org on 2017/03/03 12:01:34 UTC
[5/6] brooklyn-server git commit: Extract AbstractCommandFeed from
SshFeed
Extract AbstractCommandFeed from SshFeed
- Added windows.CmdFeed
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/c564cacc
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/c564cacc
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/c564cacc
Branch: refs/heads/master
Commit: c564caccf8185ce6f71cd72db5425db3176030a2
Parents: 73c4558
Author: Valentin Aitken <bo...@gmail.com>
Authored: Thu Feb 16 18:03:50 2017 +0200
Committer: Valentin Aitken <bo...@gmail.com>
Committed: Fri Mar 3 10:03:39 2017 +0200
----------------------------------------------------------------------
.../core/sensor/ssh/SshCommandSensor.java | 6 +-
.../brooklyn/feed/AbstractCommandFeed.java | 150 +++++------
.../apache/brooklyn/feed/CommandPollConfig.java | 51 ++--
.../org/apache/brooklyn/feed/ssh/SshFeed.java | 134 ++++++++++
.../apache/brooklyn/feed/ssh/SshPollConfig.java | 41 +++
.../deserializingClassRenames.properties | 3 +
.../feed/ssh/SshFeedIntegrationTest.java | 20 +-
.../apache/brooklyn/feed/windows/CmdFeed.java | 81 ++++++
.../feed/windows/WinRmFeedIntegrationTest.java | 262 +++++++++++++++++++
9 files changed, 610 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
index eca50f7..fc93d74 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
@@ -22,6 +22,8 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.feed.ssh.SshFeed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +42,7 @@ import org.apache.brooklyn.core.effector.AddSensor;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
-import org.apache.brooklyn.feed.ssh.SshFeed;
+import org.apache.brooklyn.feed.AbstractCommandFeed;
import org.apache.brooklyn.feed.ssh.SshPollConfig;
import org.apache.brooklyn.feed.ssh.SshValueFunctions;
import org.apache.brooklyn.util.collections.MutableMap;
@@ -119,7 +121,7 @@ public final class SshCommandSensor<T> extends AddSensor<T> {
}
};
- SshPollConfig<T> pollConfig = new SshPollConfig<T>(sensor)
+ CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor)
.period(period)
.env(envSupplier)
.command(commandSupplier)
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
index f21d8da..95fe7ac 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
@@ -16,11 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.brooklyn.feed.ssh;
+package org.apache.brooklyn.feed;
import static com.google.common.base.Preconditions.checkNotNull;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.location.MachineLocation;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.feed.AbstractFeed;
@@ -37,30 +37,26 @@ import org.apache.brooklyn.core.feed.AttributePollHandler;
import org.apache.brooklyn.core.feed.DelegatingPollHandler;
import org.apache.brooklyn.core.feed.Poller;
import org.apache.brooklyn.core.location.Locations;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.core.internal.ssh.SshTool;
import org.apache.brooklyn.util.time.Duration;
import com.google.common.base.Objects;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
/**
- * Provides a feed of attribute values, by polling over ssh.
+ * Provides a feed of attribute values, by polling over Command Line Shell Interface.
*
* Example usage (e.g. in an entity that extends SoftwareProcessImpl):
* <pre>
* {@code
- * private SshFeed feed;
+ * private AbstractCommandFeed feed;
*
* //@Override
* protected void connectSensors() {
@@ -69,7 +65,7 @@ import com.google.common.reflect.TypeToken;
* feed = SshFeed.builder()
* .entity(this)
* .machine(mySshMachineLachine)
- * .poll(new SshPollConfig<Boolean>(SERVICE_UP)
+ * .poll(new CommandPollConfig<Boolean>(SERVICE_UP)
* .command("rabbitmqctl -q status")
* .onSuccess(new Function<SshPollValue, Boolean>() {
* public Boolean apply(SshPollValue input) {
@@ -86,98 +82,97 @@ import com.google.common.reflect.TypeToken;
* }
* </pre>
*
- * @author aled
*/
-public class SshFeed extends AbstractFeed {
+public abstract class AbstractCommandFeed extends AbstractFeed {
- public static final Logger log = LoggerFactory.getLogger(SshFeed.class);
+ public static final Logger log = LoggerFactory.getLogger(AbstractCommandFeed.class);
@SuppressWarnings("serial")
- public static final ConfigKey<Supplier<SshMachineLocation>> MACHINE = ConfigKeys.newConfigKey(
- new TypeToken<Supplier<SshMachineLocation>>() {},
+ public static final ConfigKey<Supplier<MachineLocation>> MACHINE = ConfigKeys.newConfigKey(
+ new TypeToken<Supplier<MachineLocation>>() {},
"machine");
-
+
public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand");
@SuppressWarnings("serial")
- public static final ConfigKey<SetMultimap<SshPollIdentifier, SshPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
- new TypeToken<SetMultimap<SshPollIdentifier, SshPollConfig<?>>>() {},
+ public static final ConfigKey<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+ new TypeToken<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>>() {},
"polls");
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
+ public static abstract class Builder<T extends AbstractCommandFeed, B extends Builder<T, B>> {
private Entity entity;
private boolean onlyIfServiceUp = false;
- private Supplier<SshMachineLocation> machine;
+ private Supplier<MachineLocation> machine;
private Duration period = Duration.of(500, TimeUnit.MILLISECONDS);
- private List<SshPollConfig<?>> polls = Lists.newArrayList();
private boolean execAsCommand = false;
private String uniqueTag;
private volatile boolean built;
- public Builder entity(Entity val) {
+ public B entity(Entity val) {
this.entity = val;
- return this;
+ return self();
}
- public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
- public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
+ public B onlyIfServiceUp() { return onlyIfServiceUp(true); }
+ public B onlyIfServiceUp(boolean onlyIfServiceUp) {
this.onlyIfServiceUp = onlyIfServiceUp;
- return this;
+ return self();
}
/** optional, to force a machine; otherwise it is inferred from the entity */
- public Builder machine(SshMachineLocation val) { return machine(Suppliers.ofInstance(val)); }
+ public B machine(MachineLocation val) { return machine(Suppliers.ofInstance(val)); }
/** optional, to force a machine; otherwise it is inferred from the entity */
- public Builder machine(Supplier<SshMachineLocation> val) {
+ public B machine(Supplier<MachineLocation> val) {
this.machine = val;
- return this;
+ return self();
}
- public Builder period(Duration period) {
+ public B period(Duration period) {
this.period = period;
- return this;
+ return self();
}
- public Builder period(long millis) {
+ public B period(long millis) {
return period(Duration.of(millis, TimeUnit.MILLISECONDS));
}
- public Builder period(long val, TimeUnit units) {
+ public B period(long val, TimeUnit units) {
return period(Duration.of(val, units));
}
- public Builder poll(SshPollConfig<?> config) {
- polls.add(config);
- return this;
- }
- public Builder execAsCommand() {
+ public abstract B poll(CommandPollConfig<?> config);
+ public abstract List<CommandPollConfig<?>> getPolls();
+
+ public B execAsCommand() {
execAsCommand = true;
- return this;
+ return self();
}
- public Builder execAsScript() {
+ public B execAsScript() {
execAsCommand = false;
- return this;
+ return self();
}
- public Builder uniqueTag(String uniqueTag) {
+ public B uniqueTag(String uniqueTag) {
this.uniqueTag = uniqueTag;
- return this;
+ return self();
}
- public SshFeed build() {
+
+ protected abstract B self();
+
+ protected abstract T instantiateFeed();
+
+ public T build() {
built = true;
- SshFeed result = new SshFeed(this);
+ T result = instantiateFeed();
result.setEntity(checkNotNull((EntityLocal)entity, "entity"));
result.start();
return result;
}
+
@Override
protected void finalize() {
if (!built) log.warn("SshFeed.Builder created, but build() never called");
}
}
- private static class SshPollIdentifier {
+ private static class CommandPollIdentifier {
final Supplier<String> command;
final Supplier<Map<String, String>> env;
- private SshPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) {
+ private CommandPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) {
this.command = checkNotNull(command, "command");
this.env = checkNotNull(env, "env");
}
@@ -189,10 +184,10 @@ public class SshFeed extends AbstractFeed {
@Override
public boolean equals(Object other) {
- if (!(other instanceof SshPollIdentifier)) {
+ if (!(other instanceof CommandPollIdentifier)) {
return false;
}
- SshPollIdentifier o = (SshPollIdentifier) other;
+ CommandPollIdentifier o = (CommandPollIdentifier) other;
return Objects.equal(command, o.command) &&
Objects.equal(env, o.env);
}
@@ -201,44 +196,44 @@ public class SshFeed extends AbstractFeed {
/**
* For rebind; do not call directly; use builder
*/
- public SshFeed() {
+ public AbstractCommandFeed() {
}
- protected SshFeed(final Builder builder) {
+ protected AbstractCommandFeed(final Builder builder) {
config().set(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
config().set(MACHINE, builder.machine);
config().set(EXEC_AS_COMMAND, builder.execAsCommand);
- SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create();
- for (SshPollConfig<?> config : builder.polls) {
+ SetMultimap<CommandPollIdentifier, CommandPollConfig<?>> polls = HashMultimap.<CommandPollIdentifier,CommandPollConfig<?>>create();
+ for (CommandPollConfig<?> config : (List<CommandPollConfig<?>>)builder.getPolls()) {
@SuppressWarnings({ "unchecked", "rawtypes" })
- SshPollConfig<?> configCopy = new SshPollConfig(config);
+ CommandPollConfig<?> configCopy = new CommandPollConfig(config);
if (configCopy.getPeriod() < 0) configCopy.period(builder.period);
- polls.put(new SshPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy);
+ polls.put(new CommandPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy);
}
config().set(POLLS, polls);
initUniqueTag(builder.uniqueTag, polls.values());
}
- protected SshMachineLocation getMachine() {
- Supplier<SshMachineLocation> supplier = config().get(MACHINE);
+ protected MachineLocation getMachine() {
+ Supplier<MachineLocation> supplier = config().get(MACHINE);
if (supplier != null) {
return supplier.get();
} else {
- return Locations.findUniqueSshMachineLocation(entity.getLocations()).get();
+ return Locations.findUniqueMachineLocation(entity.getLocations()).get();
}
}
@Override
protected void preStart() {
- SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = config().get(POLLS);
+ SetMultimap<CommandPollIdentifier, CommandPollConfig<?>> polls = config().get(POLLS);
- for (final SshPollIdentifier pollInfo : polls.keySet()) {
- Set<SshPollConfig<?>> configs = polls.get(pollInfo);
+ for (final CommandPollIdentifier pollInfo : polls.keySet()) {
+ Set<CommandPollConfig<?>> configs = polls.get(pollInfo);
long minPeriod = Integer.MAX_VALUE;
Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
- for (SshPollConfig<?> config : configs) {
+ for (CommandPollConfig<?> config : configs) {
handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
}
@@ -260,26 +255,5 @@ public class SshFeed extends AbstractFeed {
return (Poller<SshPollValue>) super.getPoller();
}
- private SshPollValue exec(String command, Map<String,String> env) throws IOException {
- SshMachineLocation machine = getMachine();
- Boolean execAsCommand = config().get(EXEC_AS_COMMAND);
- if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env});
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
- int exitStatus;
- ConfigBag flags = ConfigBag.newInstanceExtending(config().getBag())
- .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true)
- .configure(SshTool.PROP_OUT_STREAM, stdout)
- .configure(SshTool.PROP_ERR_STREAM, stderr);
- if (Boolean.TRUE.equals(execAsCommand)) {
- exitStatus = machine.execCommands(flags.getAllConfig(),
- "ssh-feed", ImmutableList.of(command), env);
- } else {
- exitStatus = machine.execScript(flags.getAllConfig(),
- "ssh-feed", ImmutableList.of(command), env);
- }
-
- return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray()));
- }
+ protected abstract SshPollValue exec(String command, Map<String,String> env) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java
index 1df98e9..f8c5b81 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/CommandPollConfig.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.brooklyn.feed.ssh;
+package org.apache.brooklyn.feed;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -34,10 +34,11 @@ import com.google.common.base.Suppliers;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
-public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<T>> {
+public class CommandPollConfig<T> extends PollConfig<SshPollValue, T, CommandPollConfig<T>> {
private Supplier<String> commandSupplier;
private List<Supplier<Map<String,String>>> dynamicEnvironmentSupplier = MutableList.of();
@@ -48,20 +49,20 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
return input != null && input.getExitStatus() == 0;
}};
- public static <T> SshPollConfig<T> forSensor(AttributeSensor<T> sensor) {
- return new SshPollConfig<T>(sensor);
+ public static <T> CommandPollConfig<T> forSensor(AttributeSensor<T> sensor) {
+ return new CommandPollConfig<T>(sensor);
}
- public static SshPollConfig<Void> forMultiple() {
- return new SshPollConfig<Void>(PollConfig.NO_SENSOR);
+ public static CommandPollConfig<Void> forMultiple() {
+ return new CommandPollConfig<Void>(PollConfig.NO_SENSOR);
}
- public SshPollConfig(AttributeSensor<T> sensor) {
+ public CommandPollConfig(AttributeSensor<T> sensor) {
super(sensor);
super.checkSuccess(DEFAULT_SUCCESS);
}
- public SshPollConfig(SshPollConfig<T> other) {
+ public CommandPollConfig(CommandPollConfig<T> other) {
super(other);
commandSupplier = other.commandSupplier;
}
@@ -83,29 +84,7 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
@SuppressWarnings("unused")
public Supplier<Map<String,String>> getEnvSupplier() {
- if (true) return new CombiningEnvSupplier(dynamicEnvironmentSupplier);
-
- // TODO Kept in case it's persisted; new code will not use this.
- return new Supplier<Map<String,String>>() {
- @Override
- public Map<String, String> get() {
- Map<String,String> result = MutableMap.of();
- for (Supplier<Map<String, String>> envS: dynamicEnvironmentSupplier) {
- if (envS!=null) {
- Map<String, String> envM = envS.get();
- if (envM!=null) {
- mergeEnvMaps(envM, result);
- }
- }
- }
- return result;
- }
- private void mergeEnvMaps(Map<String,String> supplied, Map<String,String> target) {
- if (supplied==null) return;
- // as the value is a string there is no need to look at deep merge behaviour
- target.putAll(supplied);
- }
- };
+ return new CombiningEnvSupplier(dynamicEnvironmentSupplier);
}
private static class CombiningEnvSupplier implements Supplier<Map<String,String>> {
@@ -149,14 +128,14 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
}
}
- public SshPollConfig<T> command(String val) { return command(Suppliers.ofInstance(val)); }
- public SshPollConfig<T> command(Supplier<String> val) {
+ public CommandPollConfig<T> command(String val) { return command(Suppliers.ofInstance(val)); }
+ public CommandPollConfig<T> command(Supplier<String> val) {
this.commandSupplier = val;
return this;
}
/** add the given env param; sequence is as per {@link #env(Supplier)} */
- public SshPollConfig<T> env(String key, String val) {
+ public CommandPollConfig<T> env(String key, String val) {
return env(Collections.singletonMap(key, val));
}
@@ -164,7 +143,7 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
* behaviour is undefined if the map supplied here is subsequently changed.
* <p>
* if a map's contents might change, use {@link #env(Supplier)} */
- public SshPollConfig<T> env(Map<String,String> val) {
+ public CommandPollConfig<T> env(Map<String,String> val) {
if (val==null) return this;
return env(Suppliers.ofInstance(val));
}
@@ -179,7 +158,7 @@ public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<
* key value pairs, the order in which they are specified here is the order
* in which they are computed and applied.
**/
- public SshPollConfig<T> env(Supplier<Map<String,String>> val) {
+ public CommandPollConfig<T> env(Supplier<Map<String,String>> val) {
Preconditions.checkNotNull(val);
dynamicEnvironmentSupplier.add(val);
return this;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
new file mode 100644
index 0000000..7c365c4
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.internal.ssh.SshTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides a feed of attribute values, by polling over ssh.
+ *
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private SshFeed feed;
+ *
+ * //@Override
+ * protected void connectSensors() {
+ * super.connectSensors();
+ *
+ * feed = SshFeed.builder()
+ * .entity(this)
+ * .machine(mySshMachineLachine)
+ * .poll(new SshPollConfig<Boolean>(SERVICE_UP)
+ * .command("rabbitmqctl -q status")
+ * .onSuccess(new Function<SshPollValue, Boolean>() {
+ * public Boolean apply(SshPollValue input) {
+ * return (input.getExitStatus() == 0);
+ * }}))
+ * .build();
+ * }
+ *
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ * super.disconnectSensors();
+ * if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ *
+ * @author aled
+ */
+public class SshFeed extends org.apache.brooklyn.feed.AbstractCommandFeed {
+ public static final Logger log = LoggerFactory.getLogger(SshFeed.class);
+
+ public static class Builder extends org.apache.brooklyn.feed.AbstractCommandFeed.Builder<SshFeed, Builder> {
+ private List<CommandPollConfig<?>> polls = Lists.newArrayList();
+
+ @Override
+ public Builder poll(CommandPollConfig<?> config) {
+ polls.add(config);
+ return self();
+ }
+
+ @Override
+ public List<CommandPollConfig<?>> getPolls() {
+ return polls;
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ protected SshFeed instantiateFeed() {
+ return new SshFeed(this);
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * For rebind; do not call directly; use builder
+ */
+ public SshFeed() {
+ }
+
+ public SshFeed(final Builder builder) {
+ super(builder);
+ }
+
+ @Override
+ protected SshPollValue exec(String command, Map<String,String> env) throws IOException {
+ SshMachineLocation machine = (SshMachineLocation)getMachine();
+ Boolean execAsCommand = config().get(EXEC_AS_COMMAND);
+ if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env});
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+
+ int exitStatus;
+ ConfigBag flags = ConfigBag.newInstanceExtending(config().getBag())
+ .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true)
+ .configure(SshTool.PROP_OUT_STREAM, stdout)
+ .configure(SshTool.PROP_ERR_STREAM, stderr);
+ if (Boolean.TRUE.equals(execAsCommand)) {
+ exitStatus = machine.execCommands(flags.getAllConfig(),
+ "ssh-feed", ImmutableList.of(command), env);
+ } else {
+ exitStatus = machine.execScript(flags.getAllConfig(),
+ "ssh-feed", ImmutableList.of(command), env);
+ }
+
+ return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
new file mode 100644
index 0000000..859d30f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.feed.CommandPollConfig;
+
+/**
+ * @deprecated since 0.11.0. Please use {@link CommandPollConfig}.
+ */
+@Deprecated
+public class SshPollConfig<T> extends CommandPollConfig<T> {
+ public SshPollConfig(AttributeSensor<T> sensor) {
+ super(sensor);
+ }
+
+ public SshPollConfig(CommandPollConfig<T> other) {
+ super(other);
+ }
+
+ @Override
+ public SshPollConfig<T> self() {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties b/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties
index 07b18d6..0e664cf 100644
--- a/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties
+++ b/core/src/main/resources/org/apache/brooklyn/core/mgmt/persist/deserializingClassRenames.properties
@@ -1451,3 +1451,6 @@ brooklyn.networking.vclouddirector.FakeSSLSocketFactory
brooklyn.networking.vclouddirector.NatPredicates : brooklyn.networking.vclouddirector.nat.NatPredicates
brooklyn.networking.vclouddirector.NatService : brooklyn.networking.vclouddirector.nat.NatService
brooklyn.networking.vclouddirector.PortForwardingConfig : brooklyn.networking.vclouddirector.nat.PortForwardingConfig
+
+org.apache.brooklyn.feed.ssh.SshFeed$SshPollIdentifier : org.apache.brooklyn.feed.AbstractCommandFeed$CommandPollIdentifier
+org.apache.brooklyn.feed.ssh.SshPollConfig$2 : org.apache.brooklyn.feed.CommandPollConfig$CombiningEnvSupplier
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java
index 416ab9d..781ab95 100644
--- a/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/feed/ssh/SshFeedIntegrationTest.java
@@ -32,11 +32,7 @@ import org.apache.brooklyn.core.entity.EntityInternal.FeedSupport;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
-import org.apache.brooklyn.feed.ssh.SshFeed;
-import org.apache.brooklyn.feed.ssh.SshFeedIntegrationTest;
-import org.apache.brooklyn.feed.ssh.SshPollConfig;
-import org.apache.brooklyn.feed.ssh.SshPollValue;
-import org.apache.brooklyn.feed.ssh.SshValueFunctions;
+import org.apache.brooklyn.feed.CommandPollConfig;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.stream.Streams;
@@ -96,7 +92,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
feed = SshFeed.builder()
.entity(entity2)
- .poll(new SshPollConfig<String>(SENSOR_STRING)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
.command("echo hello")
.onSuccess(SshValueFunctions.stdout()))
.build();
@@ -126,7 +122,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
feed = SshFeed.builder()
.entity(entity)
.machine(machine)
- .poll(new SshPollConfig<Integer>(SENSOR_INT)
+ .poll(new CommandPollConfig<Integer>(SENSOR_INT)
.command("exit 123")
.checkSuccess(Predicates.alwaysTrue())
.onSuccess(SshValueFunctions.exitStatus()))
@@ -140,7 +136,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
feed = SshFeed.builder()
.entity(entity)
.machine(machine)
- .poll(new SshPollConfig<String>(SENSOR_STRING)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
.command("echo hello")
.onSuccess(SshValueFunctions.stdout()))
.build();
@@ -156,7 +152,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
feed = SshFeed.builder()
.entity(entity)
.machine(machine)
- .poll(new SshPollConfig<String>(SENSOR_STRING)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
.command(cmd)
.onFailure(SshValueFunctions.stderr()))
.build();
@@ -169,7 +165,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
feed = SshFeed.builder()
.entity(entity)
.machine(machine)
- .poll(new SshPollConfig<String>(SENSOR_STRING)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
.command("exit 123")
.onFailure(new Function<SshPollValue, String>() {
@Override
@@ -191,7 +187,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
SshFeed.builder()
.entity(entity)
.onlyIfServiceUp()
- .poll(new SshPollConfig<String>(SENSOR_STRING)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
.command("echo hello")
.onSuccess(SshValueFunctions.stdout()))
.build();
@@ -226,7 +222,7 @@ public class SshFeedIntegrationTest extends BrooklynAppUnitTestSupport {
feed = SshFeed.builder()
.entity(entity2)
- .poll(new SshPollConfig<String>(SENSOR_STRING)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
.env(envSupplier)
.command(cmdSupplier)
.onSuccess(SshValueFunctions.stdout()))
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
----------------------------------------------------------------------
diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
new file mode 100644
index 0000000..94d451a
--- /dev/null
+++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.windows;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.brooklyn.feed.AbstractCommandFeed;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.location.winrm.WinRmMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.internal.winrm.WinRmToolResponse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class CmdFeed extends AbstractCommandFeed {
+ public static class Builder extends org.apache.brooklyn.feed.AbstractCommandFeed.Builder<CmdFeed, CmdFeed.Builder> {
+ private List<CommandPollConfig<?>> polls = Lists.newArrayList();
+
+ @Override
+ public CmdFeed.Builder poll(CommandPollConfig<?> config) {
+ polls.add(config);
+ return self();
+ }
+
+ @Override
+ public List<CommandPollConfig<?>> getPolls() {
+ return polls;
+ }
+
+ @Override
+ protected CmdFeed.Builder self() {
+ return this;
+ }
+
+ @Override
+ protected CmdFeed instantiateFeed() {
+ return new CmdFeed(this);
+ }
+ }
+
+ public static CmdFeed.Builder builder() {
+ return new CmdFeed.Builder();
+ }
+
+ protected CmdFeed(final Builder builder) {
+ super(builder);
+ }
+ @Override
+ protected SshPollValue exec(String command, Map<String,String> env) throws IOException {
+ WinRmMachineLocation machine = (WinRmMachineLocation)getMachine();
+ if (log.isTraceEnabled()) log.trace("WinRm polling for {}, executing {} with env {}", new Object[] {machine, command, env});
+
+ WinRmToolResponse winRmToolResponse;
+ int exitStatus;
+ ConfigBag flags = ConfigBag.newInstanceExtending(config().getBag());
+ winRmToolResponse = machine.executeCommand(flags.getAllConfig(),
+ ImmutableList.of(command));
+ exitStatus = winRmToolResponse.getStatusCode();
+
+ return new SshPollValue(null, exitStatus, winRmToolResponse.getStdOut(), winRmToolResponse.getStdErr());
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c564cacc/software/winrm/src/test/java/org/apache/brooklyn/feed/windows/WinRmFeedIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/winrm/src/test/java/org/apache/brooklyn/feed/windows/WinRmFeedIntegrationTest.java b/software/winrm/src/test/java/org/apache/brooklyn/feed/windows/WinRmFeedIntegrationTest.java
new file mode 100644
index 0000000..afaedb4
--- /dev/null
+++ b/software/winrm/src/test/java/org/apache/brooklyn/feed/windows/WinRmFeedIntegrationTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.windows;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.brooklyn.api.entity.EntityInitializer;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.MachineProvisioningLocation;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.EntityInternal.FeedSupport;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.feed.CommandPollConfig;
+import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.feed.ssh.SshValueFunctions;
+import org.apache.brooklyn.location.winrm.WinRmMachineLocation;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.StringFunctions;
+import org.apache.brooklyn.util.text.StringPredicates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test is almost identical to {@link org.apache.brooklyn.feed.ssh.SshFeedIntegrationTest}.
+ * To launch the test I put in ~/.brooklyn/brooklyn.properties
+ * brooklyn.location.named.WindowsLiveTest=byon:(hosts=192.168.1.2,osFamily=windows,user=winUser,password=p0ssw0rd)
+ */
+public class WinRmFeedIntegrationTest extends BrooklynAppLiveTestSupport {
+
+ private static final Logger log = LoggerFactory.getLogger(WinRmFeedIntegrationTest.class);
+
+ private static final String LOCATION_SPEC = "named:WindowsLiveTest";
+
+ final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString", "");
+ final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor("aLong", "");
+
+ private WinRmMachineLocation machine;
+ private TestEntity entity;
+ private CmdFeed feed;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ MachineProvisioningLocation<?> provisioningLocation = (MachineProvisioningLocation<?>)
+ mgmt.getLocationRegistry().getLocationManaged(LOCATION_SPEC);
+ machine = (WinRmMachineLocation)provisioningLocation.obtain(ImmutableMap.of());
+ entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ app.start(ImmutableList.of(machine));
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ if (feed != null) feed.stop();
+ super.tearDown();
+ }
+
+ /** this is one of the most common pattern */
+ @Test(groups="Integration")
+ public void testReturnsStdoutAndInfersMachine() throws Exception {
+ final TestEntity entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ // inject the machine location, because the app was started with a provisioning location
+ // and TestEntity doesn't provision
+ .location(machine));
+
+ feed = CmdFeed.builder()
+ .entity(entity2)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
+ .command("echo hello")
+ .onSuccess(SshValueFunctions.stdout()))
+ .build();
+
+ EntityAsserts.assertAttributeEventuallyNonNull(entity2, SENSOR_STRING);
+ String val = entity2.getAttribute(SENSOR_STRING);
+ Assert.assertTrue(val.contains("hello"), "val="+val);
+ Assert.assertEquals(val.trim(), "hello");
+ }
+
+ @Test(groups="Integration")
+ public void testFeedDeDupe() throws Exception {
+ testReturnsStdoutAndInfersMachine();
+ entity.addFeed(feed);
+ log.info("Feed 0 is: "+feed);
+
+ testReturnsStdoutAndInfersMachine();
+ log.info("Feed 1 is: "+feed);
+ entity.addFeed(feed);
+
+ FeedSupport feeds = ((EntityInternal)entity).feeds();
+ Assert.assertEquals(feeds.getFeeds().size(), 1, "Wrong feed count: "+feeds.getFeeds());
+ }
+
+ @Test(groups="Integration")
+ public void testReturnsSshExitStatus() throws Exception {
+ feed = CmdFeed.builder()
+ .entity(entity)
+ .machine(machine)
+ .poll(new CommandPollConfig<Integer>(SENSOR_INT)
+ .command("exit 123")
+ .checkSuccess(Predicates.alwaysTrue())
+ .onSuccess(SshValueFunctions.exitStatus()))
+ .build();
+
+ EntityAsserts.assertAttributeEqualsEventually(entity, SENSOR_INT, 123);
+ }
+
+ @Test(groups="Integration")
+ public void testReturnsStdout() throws Exception {
+ feed = CmdFeed.builder()
+ .entity(entity)
+ .machine(machine)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
+ .command("echo hello")
+ .onSuccess(SshValueFunctions.stdout()))
+ .build();
+
+ EntityAsserts.assertAttributeEventually(entity, SENSOR_STRING,
+ Predicates.compose(Predicates.equalTo("hello"), StringFunctions.trim()));
+ }
+
+ @Test(groups="Integration")
+ public void testReturnsStderr() throws Exception {
+ final String cmd = "thiscommanddoesnotexist";
+
+ feed = CmdFeed.builder()
+ .entity(entity)
+ .machine(machine)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
+ .command(cmd)
+ .onFailure(SshValueFunctions.stderr()))
+ .build();
+
+ EntityAsserts.assertAttributeEventually(entity, SENSOR_STRING, StringPredicates.containsLiteral(cmd));
+ }
+
+ @Test(groups="Integration")
+ public void testFailsOnNonZero() throws Exception {
+ feed = CmdFeed.builder()
+ .entity(entity)
+ .machine(machine)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
+ .command("exit 123")
+ .onFailure(new Function<SshPollValue, String>() {
+ @Override
+ public String apply(SshPollValue input) {
+ return "Exit status " + input.getExitStatus();
+ }}))
+ .build();
+
+ EntityAsserts.assertAttributeEventually(entity, SENSOR_STRING, StringPredicates.containsLiteral("Exit status 123"));
+ }
+
+ @Test(groups="Integration")
+ public void testAddedEarly() throws Exception {
+ final TestEntity entity2 = app.addChild(EntitySpec.create(TestEntity.class)
+ .location(machine)
+ .addInitializer(new EntityInitializer() {
+ @Override
+ public void apply(EntityLocal entity) {
+ CmdFeed.builder()
+ .entity(entity)
+ .onlyIfServiceUp()
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
+ .command("echo hello")
+ .onSuccess(SshValueFunctions.stdout()))
+ .build();
+ }
+ }));
+
+ // TODO would be nice to hook in and assert no errors
+ EntityAsserts.assertAttributeEqualsContinually(entity2, SENSOR_STRING, null);
+
+ entity2.sensors().set(Attributes.SERVICE_UP, true);
+ EntityAsserts.assertAttributeEventually(entity2, SENSOR_STRING, StringPredicates.containsLiteral("hello"));
+ }
+
+
+ @Test(groups="Integration")
+ public void testDynamicEnvAndCommandSupplier() throws Exception {
+ final TestEntity entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class).location(machine));
+
+ final AtomicInteger count = new AtomicInteger();
+ Supplier<Map<String, String>> envSupplier = new Supplier<Map<String,String>>() {
+ @Override
+ public Map<String, String> get() {
+ return MutableMap.of("COUNT", ""+count.incrementAndGet());
+ }
+ };
+ Supplier<String> cmdSupplier = new Supplier<String>() {
+ @Override
+ public String get() {
+ return "echo count-"+count.incrementAndGet()+"-%COUNT%";
+ }
+ };
+
+ feed = CmdFeed.builder()
+ .entity(entity2)
+ .poll(new CommandPollConfig<String>(SENSOR_STRING)
+ .env(envSupplier)
+ .command(cmdSupplier)
+ .onSuccess(SshValueFunctions.stdout()))
+ .build();
+
+ EntityAsserts.assertAttributeEventuallyNonNull(entity2, SENSOR_STRING);
+ final String val1 = assertDifferentOneInOutput(entity2);
+
+ EntityAsserts.assertAttributeEventually(entity2, SENSOR_STRING, Predicates.not(Predicates.equalTo(val1)));
+ final String val2 = assertDifferentOneInOutput(entity2);
+ log.info("vals from dynamic sensors are: "+val1.trim()+" and "+val2.trim());
+ }
+
+ private String assertDifferentOneInOutput(final TestEntity entity2) {
+ String val = entity2.getAttribute(SENSOR_STRING);
+ Assert.assertTrue(val.startsWith("count"), "val="+val);
+ try {
+ String[] fields = val.trim().split("-");
+ int field1 = Integer.parseInt(fields[1]);
+ int field2 = Integer.parseInt(fields[2]);
+ Assert.assertEquals(Math.abs(field2-field1), 1, "expected difference of 1");
+ } catch (Throwable t) {
+ Exceptions.propagateIfFatal(t);
+ Assert.fail("Wrong output from sensor, got '"+val.trim()+"', giving error: "+t);
+ }
+ return val;
+ }
+
+}