You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/24 12:27:32 UTC
[brooklyn-server] 03/05: improve logic for configuring commandUrl to ssh-command-sensor
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 3259cacca18c288141952fb7840cbeeb984f89bc
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Wed Aug 24 13:00:42 2022 +0100
improve logic for configuring commandUrl to ssh-command-sensor
The previous implementation (a) wouldn't install the command script if the target didn't have an install dir, and (b) wouldn't work if the install dir wasn't set until after a rebind
(because subscriptions aren't persisted; the logic for setting up needs to be part of the feed).
---
.../brooklyn/core/sensor/ssh/SshCommandSensor.java | 55 ++++++++--------------
.../apache/brooklyn/feed/AbstractCommandFeed.java | 31 ++++++++++--
.../java/org/apache/brooklyn/feed/ssh/SshFeed.java | 51 ++++++++++++++++++++
.../org/apache/brooklyn/feed/windows/CmdFeed.java | 7 +++
4 files changed, 106 insertions(+), 38 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
index 3632257361..551a14927a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
@@ -31,6 +31,8 @@ import org.apache.brooklyn.api.entity.EntityInitializer;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.MapConfigKey;
@@ -69,6 +71,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
/**
* Configurable {@link EntityInitializer} which adds an SSH sensor feed running the <code>command</code> supplied
@@ -86,7 +89,7 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
public static final ConfigKey<String> SENSOR_COMMAND_URL = ConfigKeys.newStringConfigKey("commandUrl", "Remote SSH command to execute for sensor (takes precedence over command)");
public static final ConfigKey<String> SENSOR_EXECUTION_DIR = ConfigKeys.newStringConfigKey("executionDir", "Directory where the command should run; "
+ "if not supplied, executes in the entity's run dir (or home dir if no run dir is defined); "
- + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir");
+ + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir; not compatible with commandUrl");
public static final ConfigKey<Object> VALUE_ON_ERROR = ConfigKeys.newConfigKey(Object.class, "value.on.error",
"Value to be used if an error occurs whilst executing the ssh command", null);
public static final MapConfigKey<Object> SENSOR_SHELL_ENVIRONMENT = BrooklynConfigKeys.SHELL_ENVIRONMENT;
@@ -109,36 +112,9 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
public void apply(final EntityLocal entity) {
ConfigBag params = initParams();
- String commandUrl = EntityInitializers.resolve(initParams(), SENSOR_COMMAND_URL);
- if (Objects.nonNull(commandUrl)) {
-
- entity.subscriptions().subscribe(entity, BrooklynConfigKeys.INSTALL_DIR, booleanSensorEvent -> {
- if (Strings.isNonBlank(booleanSensorEvent.getValue()) && !commandUrlInstalled.get()) {
-
- // Prepare path for a remote command script.
- // Take into account possibility of multiple ssh commands initialized at the same entity.
- String commandUrlPath = booleanSensorEvent.getValue() + "/command-url-" + Identifiers.makeRandomId(4)+ ".sh";
-
- // Look for SshMachineLocation and install remote command script.
- Maybe<SshMachineLocation> locationMaybe = Locations.findUniqueSshMachineLocation(entity.getLocations());
- if (locationMaybe.isPresent()) {
- TaskFactory<?> install = SshTasks.installFromUrl(locationMaybe.get(), commandUrl, commandUrlPath);
- Object ret = DynamicTasks.queueIfPossible(install.newTask()).orSubmitAsync(entity).andWaitForSuccess();
-
- // Prevent command duplicates in case if INSTALL_DIR changed from the outside.
- commandUrlInstalled.set(true);
- } else {
- throw new IllegalStateException("Could not find SshMachineLocation to run 'commandUrl'");
- }
-
- // Run a deferred command.
- params.putStringKey(SENSOR_COMMAND.getName(), "bash " + commandUrlPath);
- apply(entity, params);
- }
- });
- } else {
- apply(entity, params);
- }
+ // previously if a commandUrl was used we would listen for the install dir to be set; but that doesn't survive rebind;
+ // now we install on first run as part of the SshFeed
+ apply(entity, params);
}
private void apply(final EntityLocal entity, final ConfigBag params) {
@@ -152,7 +128,7 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
}
Supplier<Map<String,String>> envSupplier = new EnvSupplier(entity, params);
- Supplier<String> commandSupplier = new CommandSupplier(entity, params);
+ CommandSupplier commandSupplier = new CommandSupplier(entity, params);
CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor)
.env(envSupplier)
@@ -163,12 +139,21 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
standardPollConfig(entity, initParams(), pollConfig);
- SshFeed feed = SshFeed.builder()
+ SshFeed.Builder feedBuilder = SshFeed.builder()
.entity(entity)
.onlyIfServiceUp(Maybe.ofDisallowingNull(EntityInitializers.resolve(params, ONLY_IF_SERVICE_UP)).or(true))
- .poll(pollConfig)
- .build();
+ .poll(pollConfig);
+
+ String commandUrl = EntityInitializers.resolve(initParams(), SENSOR_COMMAND_URL);
+ if (commandUrl!=null) {
+ feedBuilder.commandUrlToInstallAndRun(commandUrl);
+ // commandSupplier above will be ignored
+ if (commandSupplier.rawSensorCommand!=null || commandSupplier.rawSensorExecDir!=null) {
+ throw new IllegalArgumentException("commandUrl is not compatible with command or executionDir");
+ }
+ }
+ SshFeed feed = feedBuilder.build();
entity.addFeed(feed);
// Deprecated; kept for backwards compatibility with historic persisted state
diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
index 5a9921d166..75d8f71443 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
@@ -30,8 +30,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.location.MachineLocation;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
import org.apache.brooklyn.core.feed.AbstractFeed;
import org.apache.brooklyn.core.feed.AttributePollHandler;
import org.apache.brooklyn.core.feed.DelegatingPollHandler;
@@ -40,6 +42,12 @@ import org.apache.brooklyn.core.location.Locations;
import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
import org.apache.brooklyn.feed.function.FunctionFeed;
import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.text.Identifiers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.brooklyn.util.time.Duration;
@@ -95,7 +103,10 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
"machine");
public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand");
-
+ public static final ConfigKey<String> COMMAND_URL = ConfigKeys.newStringConfigKey("commandUrl");
+
+ protected static final ConfigKey<String> COMMAND_URL_COPIED_AS = ConfigKeys.newStringConfigKey("commandUrlCopiedAs");
+
@SuppressWarnings("serial")
public static final ConfigKey<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
new TypeToken<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>>() {},
@@ -108,6 +119,7 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
private Duration period = Duration.of(500, TimeUnit.MILLISECONDS);
private boolean execAsCommand = false;
private String uniqueTag;
+ private String commandUrlToInstallAndRun;
private volatile boolean built;
public B entity(Entity val) {
@@ -139,6 +151,11 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
public abstract B poll(CommandPollConfig<?> config);
public abstract List<CommandPollConfig<?>> getPolls();
+ public B commandUrlToInstallAndRun(String commandUrl) {
+ this.commandUrlToInstallAndRun = commandUrl;
+ return self();
+ }
+
public B execAsCommand() {
execAsCommand = true;
return self();
@@ -202,7 +219,8 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
config().set(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
config().set(MACHINE, builder.machine);
config().set(EXEC_AS_COMMAND, builder.execAsCommand);
-
+ config().set(COMMAND_URL, builder.commandUrlToInstallAndRun);
+
SetMultimap<CommandPollIdentifier, CommandPollConfig<?>> polls = HashMultimap.<CommandPollIdentifier,CommandPollConfig<?>>create();
for (CommandPollConfig<?> config : (List<CommandPollConfig<?>>)builder.getPolls()) {
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -225,7 +243,11 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
@Override
protected void preStart() {
- getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> exec(pollInfo.command.get(), pollInfo.env.get()));
+ if (config().get(COMMAND_URL)!=null) {
+ getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> installAndExec(config().get(COMMAND_URL), pollInfo.env.get()));
+ } else {
+ getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> exec(pollInfo.command.get(), pollInfo.env.get()));
+ }
}
@Override
@@ -235,4 +257,7 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
}
protected abstract SshPollValue exec(String command, Map<String,String> env) throws IOException;
+
+ protected abstract SshPollValue installAndExec(String commandUrl, Map<String,String> env) throws IOException;
+
}
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
index a17e0707f6..0f4642643c 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
@@ -21,15 +21,23 @@ package org.apache.brooklyn.feed.ssh;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.location.Locations;
import org.apache.brooklyn.feed.CommandPollConfig;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.internal.ssh.SshTool;
import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
import org.apache.brooklyn.util.core.task.ssh.internal.PlainSshExecTaskFactory;
import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
import org.apache.brooklyn.util.core.task.system.ProcessTaskStub.ScriptReturnType;
import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.text.Identifiers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,4 +147,47 @@ public class SshFeed extends org.apache.brooklyn.feed.AbstractCommandFeed {
return new SshPollValue(machine, task.getExitCode(), task.getStdout(), task.getStderr());
}
+ protected SshPollValue installAndExec(String commandUrl, Map<String,String> env) throws IOException {
+ String commandUrlCopiedAs = config().get(COMMAND_URL_COPIED_AS);
+ if (commandUrlCopiedAs==null) {
+ synchronized (this) {
+ commandUrlCopiedAs = config().get(COMMAND_URL_COPIED_AS);
+ if (commandUrlCopiedAs==null) {
+ String installDir = getEntity().sensors().get(BrooklynConfigKeys.INSTALL_DIR);
+ if (installDir == null) {
+ commandUrlCopiedAs = "brooklyn-ssh-command-url-" + entity.getApplicationId() + "-" + entity.getId() + "-" + Identifiers.makeRandomId(4) + ".sh";
+ log.debug("Install dir not available at " + getEntity() + "; will use default/home directory for "+this+", in "+commandUrlCopiedAs);
+ } else {
+ commandUrlCopiedAs = Os.mergePathsUnix(installDir, "command-url-" + Identifiers.makeRandomId(4) + ".sh");
+ }
+
+ // Look for SshMachineLocation and install remote command script.
+ Maybe<SshMachineLocation> locationMaybe = Locations.findUniqueSshMachineLocation(entity.getLocations());
+ if (locationMaybe.isPresent()) {
+ TaskFactory<?> install = SshTasks.installFromUrl(locationMaybe.get(), commandUrl, commandUrlCopiedAs);
+ DynamicTasks.queueIfPossible(install.newTask()).orSubmitAsync(entity).andWaitForSuccess();
+ log.debug("Installed from "+commandUrl+" to "+commandUrlCopiedAs+" at "+getEntity());
+ } else {
+ throw new IllegalStateException("Ssh machine location not available at " + getEntity() + "; skipping run of " + this);
+ }
+
+ config().set(COMMAND_URL_COPIED_AS, commandUrlCopiedAs);
+ }
+ }
+ }
+ return exec("bash "+commandUrlCopiedAs, env);
+ }
+
+ protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
+ if (key.getName().equals(COMMAND_URL.getName())) {
+ config().set(COMMAND_URL_COPIED_AS, (String)null);
+ return;
+ }
+ if (key.getName().equals(COMMAND_URL_COPIED_AS.getName())) {
+ // allowed
+ return;
+ }
+
+ super.doReconfigureConfig(key, val);
+ }
}
diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
index 4cf3afbed0..209b8e94dc 100644
--- a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
+++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
@@ -86,4 +86,11 @@ public class CmdFeed extends AbstractCommandFeed {
return new SshPollValue(null, exitStatus, winRmToolResponse.getStdOut(), winRmToolResponse.getStdErr());
}
+
+ @Override
+ protected SshPollValue installAndExec(String commandUrl, Map<String, String> env) throws IOException {
+ // TODO
+ throw new IllegalStateException("commandUrl not supported for WinRM cmds");
+ }
+
}