You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/24 12:27:32 UTC

[brooklyn-server] 03/05: improve logic for configuring commandUrl to ssh-command-sensor

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 3259cacca18c288141952fb7840cbeeb984f89bc
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Wed Aug 24 13:00:42 2022 +0100

    improve logic for configuring commandUrl to ssh-command-sensor
    
    previous implementation (a) wouldn't install if the target didn't have an install dir, and (b) wouldn't work if install dir wasn't set until after a rebind
    (because subscriptions aren't persisted; the logic for setting up needs to be part of the feed)
---
 .../brooklyn/core/sensor/ssh/SshCommandSensor.java | 55 ++++++++--------------
 .../apache/brooklyn/feed/AbstractCommandFeed.java  | 31 ++++++++++--
 .../java/org/apache/brooklyn/feed/ssh/SshFeed.java | 51 ++++++++++++++++++++
 .../org/apache/brooklyn/feed/windows/CmdFeed.java  |  7 +++
 4 files changed, 106 insertions(+), 38 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
index 3632257361..551a14927a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
@@ -31,6 +31,8 @@ import org.apache.brooklyn.api.entity.EntityInitializer;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.mgmt.TaskFactory;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.MapConfigKey;
@@ -69,6 +71,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 /** 
  * Configurable {@link EntityInitializer} which adds an SSH sensor feed running the <code>command</code> supplied
@@ -86,7 +89,7 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
     public static final ConfigKey<String> SENSOR_COMMAND_URL = ConfigKeys.newStringConfigKey("commandUrl", "Remote SSH command to execute for sensor (takes precedence over command)");
     public static final ConfigKey<String> SENSOR_EXECUTION_DIR = ConfigKeys.newStringConfigKey("executionDir", "Directory where the command should run; "
         + "if not supplied, executes in the entity's run dir (or home dir if no run dir is defined); "
-        + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir");
+        + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir; not compatible with commandUrl");
     public static final ConfigKey<Object> VALUE_ON_ERROR = ConfigKeys.newConfigKey(Object.class, "value.on.error",
             "Value to be used if an error occurs whilst executing the ssh command", null);
     public static final MapConfigKey<Object> SENSOR_SHELL_ENVIRONMENT = BrooklynConfigKeys.SHELL_ENVIRONMENT;
@@ -109,36 +112,9 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
     public void apply(final EntityLocal entity) {
         ConfigBag params = initParams();
 
-        String commandUrl = EntityInitializers.resolve(initParams(), SENSOR_COMMAND_URL);
-        if (Objects.nonNull(commandUrl)) {
-
-            entity.subscriptions().subscribe(entity, BrooklynConfigKeys.INSTALL_DIR, booleanSensorEvent -> {
-                if (Strings.isNonBlank(booleanSensorEvent.getValue()) && !commandUrlInstalled.get()) {
-
-                    // Prepare path for a remote command script.
-                    // Take into account possibility of multiple ssh commands initialized at the same entity.
-                    String commandUrlPath = booleanSensorEvent.getValue() + "/command-url-" + Identifiers.makeRandomId(4)+ ".sh";
-
-                    // Look for SshMachineLocation and install remote command script.
-                    Maybe<SshMachineLocation> locationMaybe = Locations.findUniqueSshMachineLocation(entity.getLocations());
-                    if (locationMaybe.isPresent()) {
-                        TaskFactory<?> install = SshTasks.installFromUrl(locationMaybe.get(), commandUrl, commandUrlPath);
-                        Object ret = DynamicTasks.queueIfPossible(install.newTask()).orSubmitAsync(entity).andWaitForSuccess();
-
-                        // Prevent command duplicates in case if INSTALL_DIR changed from the outside.
-                        commandUrlInstalled.set(true);
-                    } else {
-                        throw new IllegalStateException("Could not find SshMachineLocation to run 'commandUrl'");
-                    }
-
-                    // Run a deferred command.
-                    params.putStringKey(SENSOR_COMMAND.getName(), "bash " + commandUrlPath);
-                    apply(entity, params);
-                }
-            });
-        } else {
-            apply(entity, params);
-        }
+        // previously if a commandUrl was used we would listen for the install dir to be set; but that doesn't survive rebind;
+        // now we install on first run as part of the SshFeed
+        apply(entity, params);
     }
 
     private void apply(final EntityLocal entity, final ConfigBag params) {
@@ -152,7 +128,7 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
         }
 
         Supplier<Map<String,String>> envSupplier = new EnvSupplier(entity, params);
-        Supplier<String> commandSupplier = new CommandSupplier(entity, params);
+        CommandSupplier commandSupplier = new CommandSupplier(entity, params);
 
         CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor)
                 .env(envSupplier)
@@ -163,12 +139,21 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> {
 
         standardPollConfig(entity, initParams(), pollConfig);
 
-        SshFeed feed = SshFeed.builder()
+        SshFeed.Builder feedBuilder = SshFeed.builder()
                 .entity(entity)
                 .onlyIfServiceUp(Maybe.ofDisallowingNull(EntityInitializers.resolve(params, ONLY_IF_SERVICE_UP)).or(true))
-                .poll(pollConfig)
-                .build();
+                .poll(pollConfig);
+
+        String commandUrl = EntityInitializers.resolve(initParams(), SENSOR_COMMAND_URL);
+        if (commandUrl!=null) {
+            feedBuilder.commandUrlToInstallAndRun(commandUrl);
+            // commandSupplier above will be ignored
+            if (commandSupplier.rawSensorCommand!=null || commandSupplier.rawSensorExecDir!=null) {
+                throw new IllegalArgumentException("commandUrl is not compatible with command or executionDir");
+            }
+        }
 
+        SshFeed feed = feedBuilder.build();
         entity.addFeed(feed);
         
         // Deprecated; kept for backwards compatibility with historic persisted state
diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
index 5a9921d166..75d8f71443 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
@@ -30,8 +30,10 @@ import java.util.concurrent.TimeUnit;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.location.MachineLocation;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.core.feed.AttributePollHandler;
 import org.apache.brooklyn.core.feed.DelegatingPollHandler;
@@ -40,6 +42,12 @@ import org.apache.brooklyn.core.location.Locations;
 import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.feed.function.FunctionFeed;
 import org.apache.brooklyn.feed.ssh.SshPollValue;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.text.Identifiers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.util.time.Duration;
@@ -95,7 +103,10 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
             "machine");
 
     public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand");
-    
+    public static final ConfigKey<String> COMMAND_URL = ConfigKeys.newStringConfigKey("commandUrl");
+
+    protected static final ConfigKey<String> COMMAND_URL_COPIED_AS = ConfigKeys.newStringConfigKey("commandUrlCopiedAs");
+
     @SuppressWarnings("serial")
     public static final ConfigKey<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
             new TypeToken<SetMultimap<CommandPollIdentifier, CommandPollConfig<?>>>() {},
@@ -108,6 +119,7 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
         private Duration period = Duration.of(500, TimeUnit.MILLISECONDS);
         private boolean execAsCommand = false;
         private String uniqueTag;
+        private String commandUrlToInstallAndRun;
         private volatile boolean built;
         
         public B entity(Entity val) {
@@ -139,6 +151,11 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
         public abstract B poll(CommandPollConfig<?> config);
         public abstract List<CommandPollConfig<?>> getPolls();
 
+        public B commandUrlToInstallAndRun(String commandUrl) {
+            this.commandUrlToInstallAndRun = commandUrl;
+            return self();
+        }
+
         public B execAsCommand() {
             execAsCommand = true;
             return self();
@@ -202,7 +219,8 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
         config().set(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
         config().set(MACHINE, builder.machine);
         config().set(EXEC_AS_COMMAND, builder.execAsCommand);
-        
+        config().set(COMMAND_URL, builder.commandUrlToInstallAndRun);
+
         SetMultimap<CommandPollIdentifier, CommandPollConfig<?>> polls = HashMultimap.<CommandPollIdentifier,CommandPollConfig<?>>create();
         for (CommandPollConfig<?> config : (List<CommandPollConfig<?>>)builder.getPolls()) {
             @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -225,7 +243,11 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
     
     @Override
     protected void preStart() {
-        getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> exec(pollInfo.command.get(), pollInfo.env.get()));
+        if (config().get(COMMAND_URL)!=null) {
+            getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> installAndExec(config().get(COMMAND_URL), pollInfo.env.get()));
+        } else {
+            getPoller().scheduleFeed(this, getConfig(POLLS), pollInfo -> () -> exec(pollInfo.command.get(), pollInfo.env.get()));
+        }
     }
     
     @Override
@@ -235,4 +257,7 @@ public abstract class AbstractCommandFeed extends AbstractFeed {
     }
     
     protected abstract SshPollValue exec(String command, Map<String,String> env) throws IOException;
+
+    protected abstract SshPollValue installAndExec(String commandUrl, Map<String,String> env) throws IOException;
+
 }
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
index a17e0707f6..0f4642643c 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
@@ -21,15 +21,23 @@ package org.apache.brooklyn.feed.ssh;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.location.Locations;
 import org.apache.brooklyn.feed.CommandPollConfig;
 import org.apache.brooklyn.location.ssh.SshMachineLocation;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.internal.ssh.SshTool;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
 import org.apache.brooklyn.util.core.task.ssh.internal.PlainSshExecTaskFactory;
 import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
 import org.apache.brooklyn.util.core.task.system.ProcessTaskStub.ScriptReturnType;
 import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.text.Identifiers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,4 +147,47 @@ public class SshFeed extends org.apache.brooklyn.feed.AbstractCommandFeed {
         return new SshPollValue(machine, task.getExitCode(), task.getStdout(), task.getStderr());
     }
 
+    protected SshPollValue installAndExec(String commandUrl, Map<String,String> env) throws IOException {
+        String commandUrlCopiedAs = config().get(COMMAND_URL_COPIED_AS);
+        if (commandUrlCopiedAs==null) {
+            synchronized (this) {
+                commandUrlCopiedAs = config().get(COMMAND_URL_COPIED_AS);
+                if (commandUrlCopiedAs==null) {
+                    String installDir = getEntity().sensors().get(BrooklynConfigKeys.INSTALL_DIR);
+                    if (installDir == null) {
+                        commandUrlCopiedAs = "brooklyn-ssh-command-url-" + entity.getApplicationId() + "-" + entity.getId() + "-" + Identifiers.makeRandomId(4) + ".sh";
+                        log.debug("Install dir not available at " + getEntity() + "; will use default/home directory for "+this+", in "+commandUrlCopiedAs);
+                    } else {
+                        commandUrlCopiedAs = Os.mergePathsUnix(installDir, "command-url-" + Identifiers.makeRandomId(4) + ".sh");
+                    }
+
+                    // Look for SshMachineLocation and install remote command script.
+                    Maybe<SshMachineLocation> locationMaybe = Locations.findUniqueSshMachineLocation(entity.getLocations());
+                    if (locationMaybe.isPresent()) {
+                        TaskFactory<?> install = SshTasks.installFromUrl(locationMaybe.get(), commandUrl, commandUrlCopiedAs);
+                        DynamicTasks.queueIfPossible(install.newTask()).orSubmitAsync(entity).andWaitForSuccess();
+                        log.debug("Installed from "+commandUrl+" to "+commandUrlCopiedAs+" at "+getEntity());
+                    } else {
+                        throw new IllegalStateException("Ssh machine location not available at " + getEntity() + "; skipping run of " + this);
+                    }
+
+                    config().set(COMMAND_URL_COPIED_AS, commandUrlCopiedAs);
+                }
+            }
+        }
+        return exec("bash "+commandUrlCopiedAs, env);
+    }
+
+    protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
+        if (key.getName().equals(COMMAND_URL.getName())) {
+            config().set(COMMAND_URL_COPIED_AS, (String)null);
+            return;
+        }
+        if (key.getName().equals(COMMAND_URL_COPIED_AS.getName())) {
+            // allowed
+            return;
+        }
+
+        super.doReconfigureConfig(key, val);
+    }
 }
diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
index 4cf3afbed0..209b8e94dc 100644
--- a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
+++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/CmdFeed.java
@@ -86,4 +86,11 @@ public class CmdFeed extends AbstractCommandFeed {
 
         return new SshPollValue(null, exitStatus, winRmToolResponse.getStdOut(), winRmToolResponse.getStdErr());
     }
+
+    @Override
+    protected SshPollValue installAndExec(String commandUrl, Map<String, String> env) throws IOException {
+        // TODO
+        throw new IllegalStateException("commandUrl not supported for WinRM cmds");
+    }
+
 }