You are viewing a plain-text version of this content. The canonical link for it was provided as a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/12/23 16:13:07 UTC

[1/6] incubator-brooklyn git commit: Adds SshjTool exec script async

Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 2cfe9386d -> 4a5072e52


Adds SshjTool exec script async


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/f10f5713
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/f10f5713
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/f10f5713

Branch: refs/heads/master
Commit: f10f57137e9e5e05c113f243f202e3777bbe533f
Parents: 0faf4d9
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 25 21:42:33 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Dec 19 00:18:46 2014 +0000

----------------------------------------------------------------------
 .../util/internal/ssh/ShellAbstractTool.java    | 115 +++++++++++++++-
 .../brooklyn/util/internal/ssh/ShellTool.java   |   9 +-
 .../util/internal/ssh/sshj/SshjTool.java        | 135 ++++++++++++++++++-
 .../ssh/sshj/SshjToolIntegrationTest.java       |  90 +++++++++++++
 4 files changed, 338 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f10f5713/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
index 56bff17..5b3370d 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
@@ -46,6 +46,7 @@ import brooklyn.util.text.Strings;
 import brooklyn.util.time.Time;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 
 public abstract class ShellAbstractTool implements ShellTool {
 
@@ -220,6 +221,7 @@ public abstract class ShellAbstractTool implements ShellTool {
         protected final Boolean runAsRoot;
         protected final Boolean noExtraOutput;
         protected final Boolean noDeleteAfterExec;
+        protected final String scriptNameWithoutExtension;
         protected final String scriptPath;
 
         public ToolAbstractExecScript(Map<String,?> props) {
@@ -239,10 +241,10 @@ public abstract class ShellAbstractTool implements ShellTool {
                 if (summary.length()>30) 
                     summary = summary.substring(0,30);
             }
-            this.scriptPath = scriptDir+"/brooklyn-"+
-                Time.makeDateStampString()+"-"+Identifiers.makeRandomId(4)+
-                (summary==null ? "" : "-"+summary) +
-                ".sh";
+            this.scriptNameWithoutExtension = "brooklyn-"+
+                    Time.makeDateStampString()+"-"+Identifiers.makeRandomId(4)+
+                    (summary==null ? "" : "-"+summary);
+            this.scriptPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension+".sh");
         }
 
         /** builds the command to run the given script;
@@ -262,7 +264,112 @@ public abstract class ShellAbstractTool implements ShellTool {
             return cmds.build();
         }
 
+        protected String getSummary() {
+            String summary = getOptionalVal(props, PROP_SUMMARY);
+            return (summary != null) ? summary : scriptPath; 
+        }
+
         public abstract int run();
     }
     
+    protected abstract class ToolAbstractAsyncExecScript extends ToolAbstractExecScript {
+        protected final String stdoutPath;
+        protected final String stderrPath;
+        protected final String exitStatusPath;
+        protected final String pidPath;
+
+        public ToolAbstractAsyncExecScript(Map<String,?> props) {
+            super(props);
+
+            stdoutPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".stdout");
+            stderrPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".stderr");
+            exitStatusPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".exitstatus");
+            pidPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".pid");
+        }
+
+        /**
+         * Builds the command to run the given script, asynchronously.
+         * The executed command will return immediately, but the output from the script
+         * will continue to be written 
+         * note that some modes require \$RESULT passed in order to access a variable, whereas most just need $ */
+        protected List<String> buildRunScriptCommand() {
+            // TODO 
+            String touchCmd = String.format("touch %s; touch %s; touch %s; touch %s", stdoutPath, stderrPath, exitStatusPath, pidPath);
+            String cmd = String.format("( %s > %s 2> %s < /dev/null ; echo $? > %s ) & disown", scriptPath, stdoutPath, stderrPath, exitStatusPath);
+            MutableList.Builder<String> cmds = MutableList.<String>builder()
+                    .add((runAsRoot ? BashCommands.sudo(touchCmd) : touchCmd))
+                    .add((runAsRoot ? BashCommands.sudo(cmd) : cmd))
+                    .add("echo $! > "+pidPath)
+                    .add("RESULT=$?");
+            if (noExtraOutput==null || !noExtraOutput) {
+                cmds.add("echo Executing async "+scriptPath);
+            }
+            cmds.add("exit $RESULT");
+            return cmds.build();
+        }
+
+        /**
+         * Builds the command to retrieve the exit status of the command, written to stdout.
+         */
+        protected List<String> buildRetrieveStatusCommand() {
+            String cmd = 
+                    "if test -s "+exitStatusPath+"; then"+"\n"+
+                    "    cat "+exitStatusPath+"\n"+
+                    "elif test -s "+pidPath+"; then"+"\n"+
+                    "    pid=`cat "+pidPath+"`"+"\n"+
+                    "    if ! ps -p $pid > /dev/null < /dev/null; then"+"\n"+
+                    "        # no exit status, and not executing; give a few seconds grace in case just about to write exit status"+"\n"+
+                    "        sleep 3"+"\n"+
+                    "        if test -s "+exitStatusPath+"; then"+"\n"+
+                    "            cat "+exitStatusPath+""+"\n"+
+                    "        else"+"\n"+
+                    "            echo \"No exit status in a.exitstatus, and pid in a.pid ($pid) not executing\""+"\n"+
+                    "            exit 1"+"\n"+
+                    "        fi"+"\n"+
+                    "    fi"+"\n"+
+                    "else"+"\n"+
+                    "    echo \"No exit status in "+exitStatusPath+", and "+pidPath+" is empty\""+"\n"+
+                    "    exit 1"+"\n"+
+                    "fi"+"\n";
+
+            MutableList.Builder<String> cmds = MutableList.<String>builder()
+                    .add((runAsRoot ? BashCommands.sudo(cmd) : cmd))
+                    .add("RESULT=$?");
+            cmds.add("exit $RESULT");
+            return cmds.build();
+        }
+
+        /**
+         * Builds the command to retrieve the stdout and stderr of the async command.
+         * An offset can be given, to only retrieve data starting at a particular character (indexed from 0).
+         */
+        protected List<String> buildRetrieveStdoutAndStderrCommand(int stdoutPosition, int stderrPosition) {
+            // Note that `tail -c +1` means start at the *first* character (i.e. start counting from 1, not 0)
+            String catStdoutCmd = "tail -c +"+(stdoutPosition+1)+" "+stdoutPath;
+            String catStderrCmd = "tail -c +"+(stderrPosition+1)+" "+stderrPath+" 1>&2";
+            MutableList.Builder<String> cmds = MutableList.<String>builder()
+                    .add((runAsRoot ? BashCommands.sudo(catStdoutCmd) : catStdoutCmd))
+                    .add((runAsRoot ? BashCommands.sudo(catStderrCmd) : catStderrCmd))
+                    .add("RESULT=$?");
+            cmds.add("exit $RESULT");
+            return cmds.build();
+        }
+
+        protected List<String> deleteTemporaryFilesCommand() {
+            if (!Boolean.TRUE.equals(noDeleteAfterExec)) {
+                // use "-f" because some systems have "rm" aliased to "rm -i"
+                // use "< /dev/null" to guarantee doesn't hang
+                return ImmutableList.of(
+                        "rm -f "+scriptPath+" < /dev/null",
+                        "rm -f "+stdoutPath+" < /dev/null",
+                        "rm -f "+stderrPath+" < /dev/null",
+                        "rm -f "+exitStatusPath+" < /dev/null",
+                        "rm -f "+pidPath+" < /dev/null");
+            } else {
+                return ImmutableList.<String>of();
+            }
+        }
+
+        public abstract int run();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f10f5713/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
index 677d5c9..5cd392b 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
@@ -20,7 +20,7 @@ package brooklyn.util.internal.ssh;
 
 import static brooklyn.entity.basic.ConfigKeys.newConfigKey;
 import static brooklyn.entity.basic.ConfigKeys.newStringConfigKey;
-import java.io.File;
+
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +28,7 @@ import java.util.Map;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.util.os.Os;
+import brooklyn.util.time.Duration;
 
 /** Methods for executing things in an environment (localhost process, or ssh) */
 public interface ShellTool {
@@ -55,9 +56,13 @@ public interface ShellTool {
     public static final ConfigKey<String> PROP_DIRECT_HEADER = newConfigKey("directHeader", "commands to run at the target before any caller-supplied commands for direct execution", "exec bash -e");
 
     ConfigKey<Boolean> PROP_NO_DELETE_SCRIPT = newConfigKey("noDeleteAfterExec", "Retains the generated script file after executing the commands instead of deleting it", false);
-    
+
     ConfigKey<String> PROP_SUMMARY = ConfigKeys.newStringConfigKey("summary", "Provides a human-readable summary, used in file generation etc");
     
+    ConfigKey<Boolean> PROP_EXEC_ASYNC = newConfigKey("execAsync", "Executes the script asynchronously, and then polls for the result (and for stdout/stderr)", false);
+
+    ConfigKey<Duration> PROP_EXEC_ASYNC_TIMEOUT = newConfigKey("execAsyncTimeout", "Timeout when executing a script asynchronously", Duration.PRACTICALLY_FOREVER);
+
     /**
      * Executes the set of commands in a shell script. Blocks until completion.
      * <p>

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f10f5713/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
index 2ad6a0a..99debff 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Throwables.getCausalChain;
 import static com.google.common.collect.Iterables.any;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -33,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import net.schmizz.sshj.connection.ConnectionException;
@@ -47,7 +49,6 @@ import net.schmizz.sshj.transport.TransportException;
 import net.schmizz.sshj.xfer.InMemorySourceFile;
 
 import org.apache.commons.io.input.ProxyInputStream;
-import org.bouncycastle.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +60,8 @@ import brooklyn.util.io.FileUtil;
 import brooklyn.util.stream.KnownSizeInputStream;
 import brooklyn.util.stream.StreamGobbler;
 import brooklyn.util.stream.Streams;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -299,16 +302,134 @@ public class SshjTool extends SshAbstractTool implements SshTool {
      * 
      * So on balance, the script-based approach seems most reliable, even if there is an overhead
      * of separate message(s) for copying the file!
+     * 
+     * Another consideration is long-running scripts. On some clouds when executing a script that takes 
+     * several minutes, we have seen it fail with -1 (e.g. 1 in 20 times). This suggests the ssh connection
+     * is being dropped. To avoid this problem, we can execute the script asynchronously, writing to files
+     * the stdout/stderr/pid/exitStatus. We then periodically poll to retrieve the contents of these files.
+     * Use {@link #PROP_EXEC_ASYNC} to force this mode of execution.
      */
     @Override
     public int execScript(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
-        return new ToolAbstractExecScript(props) {
+        Boolean execAsync = getOptionalVal(props, PROP_EXEC_ASYNC);
+        if (Boolean.TRUE.equals(execAsync)) {
+            return execScriptAsyncAndPoll(props, commands, env);
+        } else {
+            return new ToolAbstractExecScript(props) {
+                public int run() {
+                    String scriptContents = toScript(props, commands, env);
+                    if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as script: {}", host, scriptContents);
+                    copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
+                    
+                    return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);                
+                }
+            }.run();
+        }
+    }
+
+    protected int execScriptAsyncAndPoll(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
+        return new ToolAbstractAsyncExecScript(props) {
             public int run() {
                 String scriptContents = toScript(props, commands, env);
-                if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as script: {}", host, scriptContents);
+                if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as async script: {}", host, scriptContents);
                 copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
                 
-                return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);                
+                // Execute script asynchronously
+                int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);
+                if (execResult != 0) return execResult;
+
+                // Poll repeatedly to get the status
+                Duration initialDelayBetweenPolls = Duration.ONE_SECOND;
+                Duration maxDelayBetweenPolls = Duration.seconds(20);
+                double multiplierBetweenPolls = 1.2;
+                int maxConsecutiveSshFailures = 3;
+                Duration delayBetweenPolls = initialDelayBetweenPolls;
+                Stopwatch executionStartTime = Stopwatch.createStarted();
+                Duration timeout = getOptionalVal(props, PROP_EXEC_ASYNC_TIMEOUT);
+                if (timeout == null) timeout = Duration.PRACTICALLY_FOREVER;
+                int stdoutCount = 0;
+                int stderrCount = 0;
+                int iteration = 0;
+                int consecutiveSshFailures = 0;
+                try {
+                    do {
+                        // Retrieve the exit status of the async script
+                        ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
+                        ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
+                        int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr)), -1);
+    
+                        // Retrieve the stdout + stderr of the async script.
+                        // Always do this *after* retrieving exit status, so don't miss anything.
+                        if (out != null || err != null) {
+                            ByteArrayOutputStream streamsOut = new ByteArrayOutputStream();
+                            ByteArrayOutputStream streamsErr = new ByteArrayOutputStream();
+                            int streamsResult = asInt(acquire(new ShellAction(buildRetrieveStdoutAndStderrCommand(stdoutCount, stderrCount), streamsOut, streamsErr)), -1);
+                            
+                            if (streamsResult == 0) {
+                                stdoutCount += streamsOut.size();
+                                stderrCount += streamsErr.size();
+                            } else {
+                                if (out != null) out.write(toUTF8ByteArray("retrieving stdout/stderr failed with exit code "+streamsResult+" (stdout follow)"));
+                                if (err != null) err.write(toUTF8ByteArray("retrieving stdout/stderr failed with exit code "+streamsResult+" (stderr follow)"));
+                            }
+                            if (out != null) out.write(streamsOut.toByteArray());
+                            if (err != null) err.write(streamsErr.toByteArray());
+                        }
+
+                        if (statusResult != 0) {
+                            // Failed to get the exit status; if it was -1 (i.e. connection error) then consider retrying; otherwise abort.
+                            if (out != null) {
+                                out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)"));
+                                out.write(statusOut.toByteArray());
+                            }
+                            if (err != null) {
+                                err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)"));
+                                err.write(statusErr.toByteArray());
+                            }
+                            if (statusResult == -1) {
+                                // Looks like an ssh timeout/connection error; we are willing to retry
+                                consecutiveSshFailures++;
+                                if (consecutiveSshFailures > maxConsecutiveSshFailures) {
+                                    LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")"+"\n"
+                                            +"\t"+"EXIT STATUS: -1"+"\n"
+                                            +"\t"+"STDERR: "+new String(statusErr.toByteArray())+"\n"
+                                            +"\t"+"STDOUT: "+new String(statusOut.toByteArray())+"\n");
+                                    return -1;
+                                } else {
+                                    LOG.info("ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")"+"\n"
+                                            +"\t"+"EXIT STATUS: -1"+"\n"
+                                            +"\t"+"STDERR: "+new String(statusErr.toByteArray())+"\n"
+                                            +"\t"+"STDOUT: "+new String(statusOut.toByteArray())+"\n");
+                                }
+                            } else {
+                                return statusResult;
+                            }
+                        } else {
+                            consecutiveSshFailures = 0;
+                            String statusOutStr = new String(statusOut.toByteArray());
+                            if (Strings.isNonBlank(statusOutStr)) {
+                                return Integer.parseInt(statusOutStr.trim());
+                            }
+                        }
+                        
+                        if (iteration == 0) {
+                            if (LOG.isDebugEnabled()) LOG.debug("Waiting for async script to complete on "+SshjTool.this.toString()+" (polling periodically for "+getSummary()+")");
+                        } else {
+                            if (LOG.isTraceEnabled()) LOG.trace("Still waiting for async script to complete on "+SshjTool.this.toString()+" (polling periodically for "+getSummary()+"; iteration="+iteration+")");
+                        }
+                        Time.sleep(delayBetweenPolls);
+                        delayBetweenPolls = Duration.min(maxDelayBetweenPolls, delayBetweenPolls.multiply(multiplierBetweenPolls));
+                        iteration++;
+                        
+                    } while (timeout.isLongerThan(Duration.millis(executionStartTime.elapsed(TimeUnit.MILLISECONDS))));
+                    
+                    // Timed out
+                    String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
+                    LOG.warn(msg+"; rethrowing");
+                    throw new TimeoutException(msg);
+                } catch (Exception e) {
+                    throw Exceptions.propagate(e);
+                }
             }
         }.run();
     }
@@ -714,7 +835,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
 
                 for (CharSequence cmd : commands) {
                     try {
-                        output.write(Strings.toUTF8ByteArray(cmd+"\n"));
+                        output.write(toUTF8ByteArray(cmd+"\n"));
                         output.flush();
                     } catch (ConnectionException e) {
                         if (!shell.isOpen()) {
@@ -784,6 +905,10 @@ public class SshjTool extends SshAbstractTool implements SshTool {
         }
     }
 
+    private byte[] toUTF8ByteArray(String string) {
+        return org.bouncycastle.util.Strings.toUTF8ByteArray(string);
+    }
+    
     private Supplier<InputStream> newInputStreamSupplier(final byte[] contents) {
         return new Supplier<InputStream>() {
             @Override public InputStream get() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f10f5713/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
index 69b1351..f5b8700 100644
--- a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
+++ b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
@@ -19,6 +19,7 @@
 package brooklyn.util.internal.ssh.sshj;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -28,18 +29,23 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import net.schmizz.sshj.connection.channel.direct.Session;
 
 import org.testng.annotations.Test;
 
+import brooklyn.test.Asserts;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.internal.ssh.SshException;
 import brooklyn.util.internal.ssh.SshTool;
 import brooklyn.util.internal.ssh.SshToolAbstractIntegrationTest;
 import brooklyn.util.os.Os;
+import brooklyn.util.time.Duration;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
@@ -157,7 +163,91 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
         assertEquals(localtool3.getLocalTempDir(), new File(Os.tidyPath(customRelativeTempDir)));
     }
 
+    @Test(groups = {"Integration"})
+    public void testAsyncExecStdoutAndStderr() throws Exception {
+        // Include a sleep, to ensure that the contents retrieved in first poll and subsequent polls are appended
+        List<String> cmds = ImmutableList.of(
+                "echo mystringToStdout",
+                "echo mystringToStderr 1>&2",
+                "sleep 3",
+                "echo mystringPostSleepToStdout",
+                "echo mystringPostSleepToStderr 1>&2");
+        
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayOutputStream err = new ByteArrayOutputStream();
+        int exitCode = tool.execScript(
+                ImmutableMap.of("out", out, "err", err, SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true), 
+                cmds, 
+                ImmutableMap.<String,String>of());
+        String outStr = new String(out.toByteArray());
+        String errStr = new String(err.toByteArray());
 
+        assertEquals(exitCode, 0);
+        assertEquals("mystringToStdout\nmystringPostSleepToStdout", outStr.trim());
+        assertEquals("mystringToStderr\nmystringPostSleepToStderr", errStr.trim());
+    }
+
+    @Test(groups = {"Integration"})
+    public void testAsyncExecReturnsExitCode() throws Exception {
+        int exitCode = tool.execScript(
+                ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true), 
+                ImmutableList.of("exit 123"), 
+                ImmutableMap.<String,String>of());
+        assertEquals(exitCode, 123);
+    }
+
+    @Test(groups = {"Integration"})
+    public void testAsyncExecTimesOut() throws Exception {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            tool.execScript(
+                ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_ASYNC_TIMEOUT.getName(), Duration.millis(1)), 
+                ImmutableList.of("sleep 60"), 
+                ImmutableMap.<String,String>of());
+            fail();
+        } catch (Exception e) {
+            TimeoutException te = Exceptions.getFirstThrowableOfType(e, TimeoutException.class);
+            if (te == null) throw e;
+        }
+        
+        long seconds = stopwatch.elapsed(TimeUnit.SECONDS);
+        assertTrue(seconds < 30, "exec took "+seconds+" seconds");
+    }
+
+    @Test(groups = {"Integration"})
+    public void testAsyncExecAbortsIfProcessFails() throws Exception {
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                Stopwatch stopwatch = Stopwatch.createStarted();
+                int exitStatus = tool.execScript(
+                    ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_ASYNC_TIMEOUT.getName(), Duration.millis(1)), 
+                    ImmutableList.of("sleep 63"), 
+                    ImmutableMap.<String,String>of());
+                
+                assertEquals(exitStatus, 1);
+                
+                long seconds = stopwatch.elapsed(TimeUnit.SECONDS);
+                assertTrue(seconds < 30, "exec took "+seconds+" seconds");
+            }});
+        try {
+            thread.start();
+            
+            Asserts.succeedsEventually(new Runnable() {
+                public void run() {
+                    int exitStatus = tool.execCommands(ImmutableMap.<String,Object>of(), ImmutableList.of("ps aux| grep \"sleep 63\" | grep -v grep"));
+                    assertEquals(exitStatus, 0);
+                }});
+            
+            tool.execCommands(ImmutableMap.<String,Object>of(), ImmutableList.of("ps aux| grep \"sleep 63\" | grep -v grep | awk '{print($2)}' | xargs kill"));
+            
+            thread.join(30*1000);
+            assertFalse(thread.isAlive());
+        } finally {
+            thread.interrupt();
+        }
+    }
+
+    
     protected String execShellDirect(List<String> cmds) {
         return execShellDirect(cmds, ImmutableMap.<String,Object>of());
     }


[2/6] incubator-brooklyn git commit: install java: use async-ssh-exec

Posted by al...@apache.org.
install java: use async-ssh-exec


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/ee8d521c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/ee8d521c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/ee8d521c

Branch: refs/heads/master
Commit: ee8d521c17aae93c5865752ed16664b67e6da14f
Parents: 5c0cc78
Author: Aled Sage <al...@gmail.com>
Authored: Thu Dec 18 22:35:30 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Dec 19 00:18:47 2014 +0000

----------------------------------------------------------------------
 .../brooklyn/entity/java/JavaSoftwareProcessSshDriver.java    | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ee8d521c/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java b/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java
index 88dd631..3847261 100644
--- a/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java
+++ b/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java
@@ -42,10 +42,12 @@ import brooklyn.util.collections.MutableMap;
 import brooklyn.util.collections.MutableSet;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.flags.TypeCoercions;
+import brooklyn.util.internal.ssh.ShellTool;
 import brooklyn.util.ssh.BashCommands;
 import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.task.ssh.SshTasks;
+import brooklyn.util.task.system.ProcessTaskFactory;
 import brooklyn.util.task.system.ProcessTaskWrapper;
 import brooklyn.util.text.StringEscapes.BashStringEscapes;
 import brooklyn.util.text.Strings;
@@ -339,8 +341,9 @@ public abstract class JavaSoftwareProcessSshDriver extends AbstractSoftwareProce
         getLocation().acquireMutex("installing", "installing Java at " + getLocation());
         try {
             log.debug("Installing Java {} at {}@{}", new Object[]{version, getEntity(), getLocation()});
-            ProcessTaskWrapper<Integer> installCommand = Entities.submit(getEntity(),
-                    SshTasks.newSshExecTaskFactory(getLocation(), command));
+            ProcessTaskFactory<Integer> taskFactory = SshTasks.newSshExecTaskFactory(getLocation(), command)
+                    .configure(ShellTool.PROP_EXEC_ASYNC, true);
+            ProcessTaskWrapper<Integer> installCommand = Entities.submit(getEntity(), taskFactory);
             int result = installCommand.get();
             if (result != 0) {
                 log.warn("Installation of Java {} failed at {}@{}: {}",


[6/6] incubator-brooklyn git commit: This closes #358

Posted by al...@apache.org.
This closes #358


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4a5072e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4a5072e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4a5072e5

Branch: refs/heads/master
Commit: 4a5072e52a790eeb73a4f8e789c2cb6e54c6c682
Parents: 2cfe938 63483f8
Author: Aled Sage <al...@gmail.com>
Authored: Tue Dec 23 15:12:46 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Dec 23 15:12:46 2014 +0000

----------------------------------------------------------------------
 .../internal/BrooklynFeatureEnablement.java     |  10 +
 .../util/internal/ssh/ShellAbstractTool.java    | 167 +++++++++++++-
 .../brooklyn/util/internal/ssh/ShellTool.java   |  11 +-
 .../util/internal/ssh/sshj/SshjTool.java        | 216 +++++++++++++++++--
 .../internal/ssh/ShellToolAbstractTest.java     |   5 +-
 .../sshj/SshjToolAsyncStubIntegrationTest.java  | 169 +++++++++++++++
 .../ssh/sshj/SshjToolIntegrationTest.java       |  95 ++++++++
 .../java/JavaSoftwareProcessSshDriver.java      |   7 +-
 8 files changed, 648 insertions(+), 32 deletions(-)
----------------------------------------------------------------------



[5/6] incubator-brooklyn git commit: ssh async-exec: incorporate comments + logging

Posted by al...@apache.org.
ssh async-exec: incorporate comments + logging

Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/63483f8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/63483f8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/63483f8e

Branch: refs/heads/master
Commit: 63483f8ea3611d6534ee27bea4debe085f488e4d
Parents: ee8d521
Author: Aled Sage <al...@gmail.com>
Authored: Tue Dec 23 14:10:02 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Dec 23 14:10:02 2014 +0000

----------------------------------------------------------------------
 .../java/brooklyn/internal/BrooklynFeatureEnablement.java |  9 ++++++++-
 .../brooklyn/util/internal/ssh/ShellAbstractTool.java     |  1 -
 .../java/brooklyn/util/internal/ssh/sshj/SshjTool.java    | 10 ++++++++++
 3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/63483f8e/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java b/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
index d8aa66e..9946ac7 100644
--- a/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
+++ b/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import brooklyn.internal.storage.BrooklynStorage;
 import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.util.internal.ssh.ShellTool;
 
 import com.google.common.annotations.Beta;
 import com.google.common.collect.Maps;
@@ -84,6 +85,12 @@ public class BrooklynFeatureEnablement {
      */
     public static final String FEATURE_INFER_CATALOG_ITEM_ON_REBIND = "brooklyn.backwardCompatibility.feature.inferCatalogItemOnRebind";
     
+    /**
+     * When executing over ssh, whether to support the "async exec" approach, or only the classic approach.
+     * 
+     * If this feature is disabled, then even if the {@link ShellTool#PROP_EXEC_ASYNC} is configured it
+     * will still use the classic ssh approach.
+     */
     public static final String FEATURE_SSH_ASYNC_EXEC = "brooklyn.experimental.feature.ssh.asyncExec";
 
     private static final Map<String, Boolean> FEATURE_ENABLEMENTS = Maps.newLinkedHashMap();
@@ -104,7 +111,7 @@ public class BrooklynFeatureEnablement {
         setDefault(FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, false);
         setDefault(FEATURE_RENAME_THREADS, false);
         setDefault(FEATURE_INFER_CATALOG_ITEM_ON_REBIND, true);
-        setDefault(FEATURE_SSH_ASYNC_EXEC, true);
+        setDefault(FEATURE_SSH_ASYNC_EXEC, false);
     }
     
     static {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/63483f8e/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
index 41dc886..82624d7 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
@@ -297,7 +297,6 @@ public abstract class ShellAbstractTool implements ShellTool {
          * will continue to be written 
          * note that some modes require \$RESULT passed in order to access a variable, whereas most just need $ */
         protected List<String> buildRunScriptCommand() {
-            // TODO 
             String touchCmd = String.format("touch %s %s %s %s", stdoutPath, stderrPath, exitStatusPath, pidPath);
             String cmd = String.format("( %s > %s 2> %s < /dev/null ; echo $? > %s ) & disown", scriptPath, stdoutPath, stderrPath, exitStatusPath);
             MutableList.Builder<String> cmds = MutableList.<String>builder()

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/63483f8e/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
index 154c433..e7ecd0c 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
@@ -319,6 +319,9 @@ public class SshjTool extends SshAbstractTool implements SshTool {
         if (Boolean.TRUE.equals(execAsync) && BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC)) {
             return execScriptAsyncAndPoll(props, commands, env);
         } else {
+            if (Boolean.TRUE.equals(execAsync)) {
+                if (LOG.isDebugEnabled()) LOG.debug("Ignoring ssh exec-async configuration, because feature is disabled");
+            }
             return new ToolAbstractExecScript(props) {
                 public int run() {
                     String scriptContents = toScript(props, commands, env);
@@ -397,15 +400,18 @@ public class SshjTool extends SshAbstractTool implements SshTool {
                 stderrCount += (countingErr == null) ? 0 : countingErr.getCount();
                 
                 if (longPollResult == 0) {
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll succeeded (exit status 0) on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                     return longPollResult; // success
                     
                 } else if (longPollResult == -1) {
                     // probably a connection failure; try again
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                     return null;
                     
                 } else {
                     // want to double-check whether this is the exit-code from the async process, or
                     // some unexpected failure in our long-poll command.
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                     Integer result = retrieveStatusCommand();
                     if (result != null) {
                         return result;
@@ -434,14 +440,17 @@ public class SshjTool extends SshAbstractTool implements SshTool {
                     String statusOutStr = new String(statusOut.toByteArray()).trim();
                     if (Strings.isEmpty(statusOutStr)) {
                         // suggests not yet completed; will retry with long-poll
+                        if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; command successful but no result available on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                         return null;
                     } else {
+                        if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; returning '"+statusOutStr+"' on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                         int result = Integer.parseInt(statusOutStr);
                         return result;
                     }
 
                 } else if (statusResult == -1) {
                     // probably a connection failure; try again with long-poll
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status directly received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                     return null;
                     
                 } else {
@@ -454,6 +463,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
                         err.write(statusErr.toByteArray());
                     }
                     
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status failed; returning "+statusResult+" on "+SshjTool.this.toString()+" (for "+getSummary()+")");
                     return statusResult;
                 }
             }


[4/6] incubator-brooklyn git commit: ShellToolAbstractTest.execCommands: remove duplication

Posted by al...@apache.org.
ShellToolAbstractTest.execCommands: remove duplication


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/32f92a4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/32f92a4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/32f92a4a

Branch: refs/heads/master
Commit: 32f92a4a657b677d2b5d6d951b1eef85425c9ccc
Parents: f10f571
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 25 21:44:06 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Dec 19 00:18:47 2014 +0000

----------------------------------------------------------------------
 .../java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java  | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/32f92a4a/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java b/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java
index ec84e28..b8b394b 100644
--- a/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java
+++ b/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java
@@ -399,10 +399,7 @@ public abstract class ShellToolAbstractTest {
     }
 
     protected String execCommands(List<String> cmds, Map<String,?> env) {
-        execCommands(null, cmds, env);
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        tool.execCommands(ImmutableMap.of("out", out), cmds, env);
-        return new String(out.toByteArray());
+        return execCommands(null, cmds, env);
     }
 
     protected String execCommands(ConfigBag config, List<String> cmds, Map<String,?> env) {


[3/6] incubator-brooklyn git commit: ssh-async: use long-polling

Posted by al...@apache.org.
ssh-async: use long-polling

- Polling ssh command executes until either command completes,
  process no longer running, or timeout.
- Adds PROP_EXEC_TIMEOUT for control when ssh commands should timeout.
- Adds PROP_EXEC_ASYNC_POLLING_TIMEOUT for the long-polling of
  async exec, to timeout each poll.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/5c0cc786
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/5c0cc786
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/5c0cc786

Branch: refs/heads/master
Commit: 5c0cc786a098abf9311336ea2ba4fa311413e3c7
Parents: 32f92a4
Author: Aled Sage <al...@gmail.com>
Authored: Mon Dec 1 17:09:38 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Dec 19 00:18:47 2014 +0000

----------------------------------------------------------------------
 .../internal/BrooklynFeatureEnablement.java     |   3 +
 .../util/internal/ssh/ShellAbstractTool.java    | 107 ++++++--
 .../brooklyn/util/internal/ssh/ShellTool.java   |   4 +-
 .../util/internal/ssh/sshj/SshjTool.java        | 251 +++++++++++--------
 .../sshj/SshjToolAsyncStubIntegrationTest.java  | 169 +++++++++++++
 .../ssh/sshj/SshjToolIntegrationTest.java       |  15 +-
 6 files changed, 411 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java b/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
index 0ce1b54..d8aa66e 100644
--- a/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
+++ b/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
@@ -84,6 +84,8 @@ public class BrooklynFeatureEnablement {
      */
     public static final String FEATURE_INFER_CATALOG_ITEM_ON_REBIND = "brooklyn.backwardCompatibility.feature.inferCatalogItemOnRebind";
     
+    public static final String FEATURE_SSH_ASYNC_EXEC = "brooklyn.experimental.feature.ssh.asyncExec";
+
     private static final Map<String, Boolean> FEATURE_ENABLEMENTS = Maps.newLinkedHashMap();
 
     private static final Object MUTEX = new Object();
@@ -102,6 +104,7 @@ public class BrooklynFeatureEnablement {
         setDefault(FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, false);
         setDefault(FEATURE_RENAME_THREADS, false);
         setDefault(FEATURE_INFER_CATALOG_ITEM_ON_REBIND, true);
+        setDefault(FEATURE_SSH_ASYNC_EXEC, true);
     }
     
     static {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
index 5b3370d..41dc886 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
@@ -43,8 +43,10 @@ import brooklyn.util.ssh.BashCommands;
 import brooklyn.util.text.Identifiers;
 import brooklyn.util.text.StringEscapes.BashStringEscapes;
 import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 
@@ -223,6 +225,7 @@ public abstract class ShellAbstractTool implements ShellTool {
         protected final Boolean noDeleteAfterExec;
         protected final String scriptNameWithoutExtension;
         protected final String scriptPath;
+        protected final Duration execTimeout;
 
         public ToolAbstractExecScript(Map<String,?> props) {
             this.props = props;
@@ -234,6 +237,7 @@ public abstract class ShellAbstractTool implements ShellTool {
             this.runAsRoot = getOptionalVal(props, PROP_RUN_AS_ROOT);
             this.noExtraOutput = getOptionalVal(props, PROP_NO_EXTRA_OUTPUT);
             this.noDeleteAfterExec = getOptionalVal(props, PROP_NO_DELETE_SCRIPT);
+            this.execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT);
             
             String summary = getOptionalVal(props, PROP_SUMMARY);
             if (summary!=null) {
@@ -294,11 +298,11 @@ public abstract class ShellAbstractTool implements ShellTool {
          * note that some modes require \$RESULT passed in order to access a variable, whereas most just need $ */
         protected List<String> buildRunScriptCommand() {
             // TODO 
-            String touchCmd = String.format("touch %s; touch %s; touch %s; touch %s", stdoutPath, stderrPath, exitStatusPath, pidPath);
+            String touchCmd = String.format("touch %s %s %s %s", stdoutPath, stderrPath, exitStatusPath, pidPath);
             String cmd = String.format("( %s > %s 2> %s < /dev/null ; echo $? > %s ) & disown", scriptPath, stdoutPath, stderrPath, exitStatusPath);
             MutableList.Builder<String> cmds = MutableList.<String>builder()
-                    .add((runAsRoot ? BashCommands.sudo(touchCmd) : touchCmd))
-                    .add((runAsRoot ? BashCommands.sudo(cmd) : cmd))
+                    .add(runAsRoot ? BashCommands.sudo(touchCmd) : touchCmd)
+                    .add(runAsRoot ? BashCommands.sudo(cmd) : cmd)
                     .add("echo $! > "+pidPath)
                     .add("RESULT=$?");
             if (noExtraOutput==null || !noExtraOutput) {
@@ -312,25 +316,29 @@ public abstract class ShellAbstractTool implements ShellTool {
          * Builds the command to retrieve the exit status of the command, written to stdout.
          */
         protected List<String> buildRetrieveStatusCommand() {
-            String cmd = 
-                    "if test -s "+exitStatusPath+"; then"+"\n"+
-                    "    cat "+exitStatusPath+"\n"+
-                    "elif test -s "+pidPath+"; then"+"\n"+
-                    "    pid=`cat "+pidPath+"`"+"\n"+
-                    "    if ! ps -p $pid > /dev/null < /dev/null; then"+"\n"+
-                    "        # no exit status, and not executing; give a few seconds grace in case just about to write exit status"+"\n"+
-                    "        sleep 3"+"\n"+
-                    "        if test -s "+exitStatusPath+"; then"+"\n"+
-                    "            cat "+exitStatusPath+""+"\n"+
-                    "        else"+"\n"+
-                    "            echo \"No exit status in a.exitstatus, and pid in a.pid ($pid) not executing\""+"\n"+
-                    "            exit 1"+"\n"+
-                    "        fi"+"\n"+
-                    "    fi"+"\n"+
-                    "else"+"\n"+
-                    "    echo \"No exit status in "+exitStatusPath+", and "+pidPath+" is empty\""+"\n"+
-                    "    exit 1"+"\n"+
-                    "fi"+"\n";
+            // Retrieve exit status from file (writtent to stdout), if populated;
+            // if not found and pid still running, then return empty string; else exit code 1.
+            List<String> cmdParts = ImmutableList.of(
+                    "# Retrieve status", // comment is to aid testing - see SshjToolAsyncStubIntegrationTest
+                    "if test -s "+exitStatusPath+"; then",
+                    "    cat "+exitStatusPath,
+                    "elif test -s "+pidPath+"; then",
+                    "    pid=`cat "+pidPath+"`",
+                    "    if ! ps -p $pid > /dev/null < /dev/null; then",
+                    "        # no exit status, and not executing; give a few seconds grace in case just about to write exit status",
+                    "        sleep 3",
+                    "        if test -s "+exitStatusPath+"; then",
+                    "            cat "+exitStatusPath+"",
+                    "        else",
+                    "            echo \"No exit status in "+exitStatusPath+", and pid in "+pidPath+" ($pid) not executing\"",
+                    "            exit 1",
+                    "        fi",
+                    "    fi",
+                    "else",
+                    "    echo \"No exit status in "+exitStatusPath+", and "+pidPath+" is empty\"",
+                    "    exit 1",
+                    "fi"+"\n");
+            String cmd = Joiner.on("\n").join(cmdParts);
 
             MutableList.Builder<String> cmds = MutableList.<String>builder()
                     .add((runAsRoot ? BashCommands.sudo(cmd) : cmd))
@@ -355,16 +363,61 @@ public abstract class ShellAbstractTool implements ShellTool {
             return cmds.build();
         }
 
+        /**
+         * Builds the command to retrieve the stdout and stderr of the async command.
+         * An offset can be given, to only retrieve data starting at a particular character (indexed from 0).
+         */
+        protected List<String> buildLongPollCommand(int stdoutPosition, int stderrPosition, Duration timeout) {
+            // Note that `tail -c +1` means start at the *first* character (i.e. start counting from 1, not 0)
+            // TODO Relies on commands terminating when ssh connection dropped (because not run with `nohup`)
+            String catStdoutCmd = "tail -c +"+(stdoutPosition+1)+" -f "+stdoutPath+" &";
+            String catStderrCmd = "tail -c +"+(stderrPosition+1)+" -f "+stderrPath+" 1>&2 &";
+            long maxTime = Math.max(1, timeout.toSeconds());
+            
+            List<String> waitForExitStatusParts = ImmutableList.of(
+                    "# Long poll", // comment is to aid testing - see SshjToolAsyncStubIntegrationTest
+                    "EXIT_STATUS_PATH="+exitStatusPath,
+                    "PID_PATH="+pidPath,
+                    "MAX_TIME="+maxTime,
+                    "COUNTER=0",
+                    "while [ \"$COUNTER\" -lt $MAX_TIME ]; do",
+                    "    if test -s $EXIT_STATUS_PATH; then",
+                    "        EXIT_STATUS=`cat $EXIT_STATUS_PATH`",
+                    "        exit $EXIT_STATUS",
+                    "    elif test -s $PID_PATH; then",
+                    "        PID=`cat $PID_PATH`",
+                    "        if ! ps -p $PID > /dev/null < /dev/null; then",
+                    "            # no exit status, and not executing; give a few seconds grace in case just about to write exit status",
+                    "            sleep 3",
+                    "            if test -s $EXIT_STATUS_PATH; then",
+                    "                EXIT_STATUS=`cat $EXIT_STATUS_PATH`",
+                    "                exit $EXIT_STATUS",
+                    "            else",
+                    "                echo \"No exit status in $EXIT_STATUS_PATH, and pid in $PID_PATH ($PID) not executing\"",
+                    "                exit 126",
+                    "            fi",
+                    "        fi",
+                    "    fi",
+                    "    # No exit status in $EXIT_STATUS_PATH; keep waiting",
+                    "    sleep 1",
+                    "    COUNTER+=1",
+                    "done",
+                    "exit 125"+"\n");
+            String waitForExitStatus = Joiner.on("\n").join(waitForExitStatusParts);
+
+            MutableList.Builder<String> cmds = MutableList.<String>builder()
+                    .add(runAsRoot ? BashCommands.sudo(catStdoutCmd) : catStdoutCmd)
+                    .add(runAsRoot ? BashCommands.sudo(catStderrCmd) : catStderrCmd)
+                    .add(runAsRoot ? BashCommands.sudo(waitForExitStatus) : waitForExitStatus);
+            return cmds.build();
+        }
+
         protected List<String> deleteTemporaryFilesCommand() {
             if (!Boolean.TRUE.equals(noDeleteAfterExec)) {
                 // use "-f" because some systems have "rm" aliased to "rm -i"
                 // use "< /dev/null" to guarantee doesn't hang
                 return ImmutableList.of(
-                        "rm -f "+scriptPath+" < /dev/null",
-                        "rm -f "+stdoutPath+" < /dev/null",
-                        "rm -f "+stderrPath+" < /dev/null",
-                        "rm -f "+exitStatusPath+" < /dev/null",
-                        "rm -f "+pidPath+" < /dev/null");
+                        "rm -f "+scriptPath+" "+stdoutPath+" "+stderrPath+" "+exitStatusPath+" "+pidPath+" < /dev/null");
             } else {
                 return ImmutableList.<String>of();
             }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
index 5cd392b..0555039 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
@@ -59,9 +59,11 @@ public interface ShellTool {
 
     ConfigKey<String> PROP_SUMMARY = ConfigKeys.newStringConfigKey("summary", "Provides a human-readable summary, used in file generation etc");
     
+    ConfigKey<Duration> PROP_EXEC_TIMEOUT = newConfigKey("execTimeout", "Timeout when executing a script", Duration.PRACTICALLY_FOREVER);
+
     ConfigKey<Boolean> PROP_EXEC_ASYNC = newConfigKey("execAsync", "Executes the script asynchronously, and then polls for the result (and for stdout/stderr)", false);
 
-    ConfigKey<Duration> PROP_EXEC_ASYNC_TIMEOUT = newConfigKey("execAsyncTimeout", "Timeout when executing a script asynchronously", Duration.PRACTICALLY_FOREVER);
+    ConfigKey<Duration> PROP_EXEC_ASYNC_POLLING_TIMEOUT = newConfigKey("execAsyncPollTimeout", "Timeout per poll when executing a script asynchronously", Duration.ONE_MINUTE);
 
     /**
      * Executes the set of commands in a shell script. Blocks until completion.

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
index 99debff..154c433 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
@@ -33,6 +33,7 @@ import java.io.OutputStream;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -52,11 +53,13 @@ import org.apache.commons.io.input.ProxyInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import brooklyn.internal.BrooklynFeatureEnablement;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.internal.ssh.BackoffLimitedRetryHandler;
 import brooklyn.util.internal.ssh.SshAbstractTool;
 import brooklyn.util.internal.ssh.SshTool;
 import brooklyn.util.io.FileUtil;
+import brooklyn.util.repeat.Repeater;
 import brooklyn.util.stream.KnownSizeInputStream;
 import brooklyn.util.stream.StreamGobbler;
 import brooklyn.util.stream.Streams;
@@ -72,6 +75,7 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.CountingOutputStream;
 import com.google.common.net.HostAndPort;
 import com.google.common.primitives.Ints;
 
@@ -312,7 +316,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
     @Override
     public int execScript(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
         Boolean execAsync = getOptionalVal(props, PROP_EXEC_ASYNC);
-        if (Boolean.TRUE.equals(execAsync)) {
+        if (Boolean.TRUE.equals(execAsync) && BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC)) {
             return execScriptAsyncAndPoll(props, commands, env);
         } else {
             return new ToolAbstractExecScript(props) {
@@ -320,8 +324,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
                     String scriptContents = toScript(props, commands, env);
                     if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as script: {}", host, scriptContents);
                     copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
-                    
-                    return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);                
+                    return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1);
                 }
             }.run();
         }
@@ -329,114 +332,137 @@ public class SshjTool extends SshAbstractTool implements SshTool {
 
     protected int execScriptAsyncAndPoll(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
         return new ToolAbstractAsyncExecScript(props) {
+            private int maxConsecutiveSshFailures = 3;
+            private Duration maxDelayBetweenPolls = Duration.seconds(20);
+            private Duration pollTimeout = getOptionalVal(props, PROP_EXEC_ASYNC_POLLING_TIMEOUT, Duration.FIVE_MINUTES);
+            private int iteration = 0;
+            private int consecutiveSshFailures = 0;
+            private int stdoutCount = 0;
+            private int stderrCount = 0;
+            private Stopwatch timer;
+            
             public int run() {
+                timer = Stopwatch.createStarted();
                 String scriptContents = toScript(props, commands, env);
                 if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as async script: {}", host, scriptContents);
                 copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
                 
                 // Execute script asynchronously
-                int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);
+                int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1);
                 if (execResult != 0) return execResult;
 
-                // Poll repeatedly to get the status
-                Duration initialDelayBetweenPolls = Duration.ONE_SECOND;
-                Duration maxDelayBetweenPolls = Duration.seconds(20);
-                double multiplierBetweenPolls = 1.2;
-                int maxConsecutiveSshFailures = 3;
-                Duration delayBetweenPolls = initialDelayBetweenPolls;
-                Stopwatch executionStartTime = Stopwatch.createStarted();
-                Duration timeout = getOptionalVal(props, PROP_EXEC_ASYNC_TIMEOUT);
-                if (timeout == null) timeout = Duration.PRACTICALLY_FOREVER;
-                int stdoutCount = 0;
-                int stderrCount = 0;
-                int iteration = 0;
-                int consecutiveSshFailures = 0;
+                // Long polling to get the status
                 try {
-                    do {
-                        // Retrieve the exit status of the async script
-                        ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
-                        ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
-                        int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr)), -1);
-    
-                        // Retrieve the stdout + stderr of the async script.
-                        // Always do this *after* retrieving exit status, so don't miss anything.
-                        if (out != null || err != null) {
-                            ByteArrayOutputStream streamsOut = new ByteArrayOutputStream();
-                            ByteArrayOutputStream streamsErr = new ByteArrayOutputStream();
-                            int streamsResult = asInt(acquire(new ShellAction(buildRetrieveStdoutAndStderrCommand(stdoutCount, stderrCount), streamsOut, streamsErr)), -1);
-                            
-                            if (streamsResult == 0) {
-                                stdoutCount += streamsOut.size();
-                                stderrCount += streamsErr.size();
-                            } else {
-                                if (out != null) out.write(toUTF8ByteArray("retrieving stdout/stderr failed with exit code "+streamsResult+" (stdout follow)"));
-                                if (err != null) err.write(toUTF8ByteArray("retrieving stdout/stderr failed with exit code "+streamsResult+" (stderr follow)"));
-                            }
-                            if (out != null) out.write(streamsOut.toByteArray());
-                            if (err != null) err.write(streamsErr.toByteArray());
-                        }
+                    final AtomicReference<Integer> result = new AtomicReference<Integer>();
+                    boolean success = Repeater.create("async script on "+SshjTool.this.toString()+" (for "+getSummary()+")")
+                            .backoffTo(maxDelayBetweenPolls)
+                            .limitTimeTo(execTimeout)
+                            .until(new Callable<Boolean>() {
+                                @Override
+                                public Boolean call() throws Exception {
+                                    iteration++;
+                                    if (LOG.isDebugEnabled()) LOG.debug("Doing long-poll (iteration="+iteration+") for async script to complete on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                                    Integer exitstatus = longPoll();
+                                    result.set(exitstatus);
+                                    return exitstatus != null;
+                                }})
+                            .run();
+                    
+                    if (!success) {
+                        // Timed out
+                        String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
+                        LOG.warn(msg+"; rethrowing");
+                        throw new TimeoutException(msg);
+                    }
 
-                        if (statusResult != 0) {
-                            // Failed to get the exit status; if it was -1 (i.e. connection error) then consider retrying; otherwise abort.
-                            if (out != null) {
-                                out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)"));
-                                out.write(statusOut.toByteArray());
-                            }
-                            if (err != null) {
-                                err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)"));
-                                err.write(statusErr.toByteArray());
-                            }
-                            if (statusResult == -1) {
-                                // Looks like an ssh timeout/connection error; we are willing to retry
-                                consecutiveSshFailures++;
-                                if (consecutiveSshFailures > maxConsecutiveSshFailures) {
-                                    LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")"+"\n"
-                                            +"\t"+"EXIT STATUS: -1"+"\n"
-                                            +"\t"+"STDERR: "+new String(statusErr.toByteArray())+"\n"
-                                            +"\t"+"STDOUT: "+new String(statusOut.toByteArray())+"\n");
-                                    return -1;
-                                } else {
-                                    LOG.info("ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")"+"\n"
-                                            +"\t"+"EXIT STATUS: -1"+"\n"
-                                            +"\t"+"STDERR: "+new String(statusErr.toByteArray())+"\n"
-                                            +"\t"+"STDOUT: "+new String(statusOut.toByteArray())+"\n");
-                                }
-                            } else {
-                                return statusResult;
-                            }
-                        } else {
-                            consecutiveSshFailures = 0;
-                            String statusOutStr = new String(statusOut.toByteArray());
-                            if (Strings.isNonBlank(statusOutStr)) {
-                                return Integer.parseInt(statusOutStr.trim());
-                            }
-                        }
-                        
-                        if (iteration == 0) {
-                            if (LOG.isDebugEnabled()) LOG.debug("Waiting for async script to complete on "+SshjTool.this.toString()+" (polling periodically for "+getSummary()+")");
-                        } else {
-                            if (LOG.isTraceEnabled()) LOG.trace("Still waiting for async script to complete on "+SshjTool.this.toString()+" (polling periodically for "+getSummary()+"; iteration="+iteration+")");
-                        }
-                        Time.sleep(delayBetweenPolls);
-                        delayBetweenPolls = Duration.min(maxDelayBetweenPolls, delayBetweenPolls.multiply(multiplierBetweenPolls));
-                        iteration++;
-                        
-                    } while (timeout.isLongerThan(Duration.millis(executionStartTime.elapsed(TimeUnit.MILLISECONDS))));
+                    return result.get();
                     
-                    // Timed out
-                    String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
-                    LOG.warn(msg+"; rethrowing");
-                    throw new TimeoutException(msg);
                 } catch (Exception e) {
                     throw Exceptions.propagate(e);
                 }
             }
+            /**
+             * Runs one long-poll of the async script, passing any new stdout/stderr on to {@code out}/{@code err}.
+             *
+             * @return the script's exit status once known; {@code null} if the Repeater in {@code run()} should
+             *         poll again; or {@code -1} once more than {@code maxConsecutiveSshFailures} polls have
+             *         ended in a failure that could not be explained
+             */
+            Integer longPoll() throws IOException {
+                // Long-polling to get stdout, stderr + exit status of async task.
+                // If our long-poll disconnects, we will just re-execute.
+                // We wrap the stdout/stderr so that we can get the size count. 
+                // If we disconnect, we will pick up from that char of the stream.
+                // TODO Additional stdout/stderr written by buildLongPollCommand() could interfere, 
+                //      causing us to miss some characters.
+                // TODO Want to include timeout for long-polling
+                
+                // NOTE(review): 125 is assumed to be the exit code buildLongPollCommand() uses when its wait
+                // times out with the script still running (the stub tests feed it in with that meaning);
+                // confirm against ShellAbstractTool.
+                final int longPollTimeoutExitCode = 125;
+                // Extra time given to the local ssh wait beyond the remote poll duration.
+                final long sshGraceMillis = 10*1000L;
+                
+                // Clamp at zero: by the last iteration the elapsed time may already exceed execTimeout,
+                // which would otherwise yield a negative poll (and ssh) timeout.
+                long remainingMillis = Math.max(0L, execTimeout.toMilliseconds() - timer.elapsed(TimeUnit.MILLISECONDS));
+                Duration nextPollTimeout = Duration.min(pollTimeout, Duration.millis(remainingMillis));
+                
+                // The remote long-poll may legitimately run for all of nextPollTimeout, so waiting locally for
+                // exactly that long races with it. Add a grace period (skipped for huge timeouts so the sum
+                // cannot overflow; ShellAction caps its wait at Integer.MAX_VALUE millis anyway).
+                long pollMillis = nextPollTimeout.toMilliseconds();
+                Duration sshWaitTimeout = (pollMillis < Integer.MAX_VALUE)
+                        ? Duration.millis(pollMillis + sshGraceMillis)
+                        : nextPollTimeout;
+                
+                CountingOutputStream countingOut = (out == null) ? null : new CountingOutputStream(out);
+                CountingOutputStream countingErr = (err == null) ? null : new CountingOutputStream(err);
+                int longPollResult = asInt(acquire(new ShellAction(buildLongPollCommand(stdoutCount, stderrCount, nextPollTimeout), countingOut, countingErr, sshWaitTimeout)), -1);
+                stdoutCount += (countingOut == null) ? 0 : countingOut.getCount();
+                stderrCount += (countingErr == null) ? 0 : countingErr.getCount();
+                
+                if (longPollResult == 0) {
+                    return longPollResult; // success
+                    
+                } else if (longPollResult == -1) {
+                    // probably a connection failure; try again
+                    return null;
+                    
+                } else {
+                    // want to double-check whether this is the exit-code from the async process, or
+                    // some unexpected failure in our long-poll command.
+                    Integer result = retrieveStatusCommand();
+                    if (result != null) {
+                        return result;
+                    }
+                    if (longPollResult == longPollTimeoutExitCode) {
+                        // The poll just ran out of time while the script is still running. The ssh connection
+                        // worked, so this must not count towards maxConsecutiveSshFailures; otherwise any script
+                        // that outlives a few poll timeouts would be aborted with -1.
+                        consecutiveSshFailures = 0;
+                        return null;
+                    }
+                }
+                    
+                // Unexplained failure: retry, but give up after too many in a row.
+                consecutiveSshFailures++;
+                if (consecutiveSshFailures > maxConsecutiveSshFailures) {
+                    LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors (return -1) when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
+                    return -1;
+                } else {
+                    LOG.info("Retrying after ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
+                    return null;
+                }
+            }
+            /**
+             * Runs the retrieve-status command once, to learn whether the async script has finished.
+             *
+             * @return the parsed exit status if the status command succeeded with non-blank output;
+             *         {@code null} if its output was blank (script not yet completed) or it returned -1
+             *         (probably a connection failure); otherwise the status command's own non-zero exit code,
+             *         after copying its output to {@code out}/{@code err} with an explanatory prefix.
+             *         NOTE(review): in that last case the caller cannot tell the status command's exit code
+             *         apart from the script's.
+             * @throws NumberFormatException if the status output is non-blank but not an integer (see TODO)
+             */
+            Integer retrieveStatusCommand() throws IOException {
+                // want to double-check whether this is the exit-code from the async process, or
+                // some unexpected failure in our long-poll command.
+                ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
+                ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
+                // NOTE(review): bounded only by the overall execTimeout, not by a short per-call timeout.
+                int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr, execTimeout)), -1);
+                
+                if (statusResult == 0) {
+                    // The status we retrieved really is valid; return it.
+                    // TODO How to ensure no additional output in stdout/stderr when parsing below?
+                    // NOTE(review): new String(byte[]) decodes with the platform default charset.
+                    String statusOutStr = new String(statusOut.toByteArray()).trim();
+                    if (Strings.isEmpty(statusOutStr)) {
+                        // suggests not yet completed; will retry with long-poll
+                        return null;
+                    } else {
+                        int result = Integer.parseInt(statusOutStr);
+                        return result;
+                    }
+
+                } else if (statusResult == -1) {
+                    // probably a connection failure; try again with long-poll
+                    return null;
+                    
+                } else {
+                    // Unexpected failure of the status command itself: surface its output to the caller's streams.
+                    if (out != null) {
+                        out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)"));
+                        out.write(statusOut.toByteArray());
+                    }
+                    if (err != null) {
+                        err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)"));
+                        err.write(statusErr.toByteArray());
+                    }
+                    
+                    return statusResult;
+                }
+            }
         }.run();
     }
-
     public int execShellDirect(Map<String,?> props, List<String> commands, Map<String,?> env) {
         OutputStream out = getOptionalVal(props, PROP_OUT_STREAM);
         OutputStream err = getOptionalVal(props, PROP_ERR_STREAM);
+        Duration execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT);
         
         List<String> cmdSequence = toCommandSequence(commands, env);
         List<String> allcmds = ImmutableList.<String>builder()
@@ -447,7 +473,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
         
         if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {}: {}", host, allcmds);
         
-        Integer result = acquire(new ShellAction(allcmds, out, err));
+        Integer result = acquire(new ShellAction(allcmds, out, err, execTimeout));
         if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} completed: return status {}", host, result);
         return asInt(result, -1);
     }
@@ -460,6 +486,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
         OutputStream out = getOptionalVal(props, PROP_OUT_STREAM);
         OutputStream err = getOptionalVal(props, PROP_ERR_STREAM);
         String separator = getOptionalVal(props, PROP_SEPARATOR);
+        Duration execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT);
 
         List<String> allcmds = toCommandSequence(commands, env);
         String singlecmd = Joiner.on(separator).join(allcmds);
@@ -470,7 +497,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
         
         if (LOG.isTraceEnabled()) LOG.trace("Running command at {}: {}", host, singlecmd);
         
-        Command result = acquire(new ExecAction(singlecmd, out, err));
+        Command result = acquire(new ExecAction(singlecmd, out, err, execTimeout));
         if (LOG.isTraceEnabled()) LOG.trace("Running command at {} completed: exit code {}", host, result.getExitStatus());
         // can be null if no exit status is received (observed on kill `ps aux | grep thing-to-grep-for | awk {print $2}`
         if (result.getExitStatus()==null) LOG.warn("Null exit status running at {}: {}", host, singlecmd);
@@ -722,18 +749,23 @@ public class SshjTool extends SshAbstractTool implements SshTool {
 
     class ExecAction implements SshAction<Command> {
         private final String command;
+        private final OutputStream out;
+        private final OutputStream err;
+        private final Duration timeout;
         
         private Session session;
         private Shell shell;
         private StreamGobbler outgobbler;
         private StreamGobbler errgobbler;
-        private OutputStream out;
-        private OutputStream err;
-
-        ExecAction(String command, OutputStream out, OutputStream err) {
+        
+        /**
+         * @param command the command to exec; must not be null
+         * @param out     receives the command's stdout, may be null
+         * @param err     receives the command's stderr, may be null
+         * @param timeout how long to wait for the command's output; null means "use the connection's
+         *                session timeout". A session timeout of 0 is treated as unlimited
+         *                (PRACTICALLY_FOREVER), and the smaller of the two is used.
+         */
+        ExecAction(String command, OutputStream out, OutputStream err, Duration timeout) {
             this.command = checkNotNull(command, "command");
             this.out = out;
             this.err = err;
+            // 0 from getSessionTimeout() means "no session timeout"
+            Duration sessionTimeout = (sshClientConnection.getSessionTimeout() == 0) 
+                    ? Duration.PRACTICALLY_FOREVER 
+                    : Duration.millis(sshClientConnection.getSessionTimeout());
+            this.timeout = (timeout == null) ? sessionTimeout : Duration.min(timeout, sessionTimeout);
         }
 
         @Override
@@ -762,7 +794,8 @@ public class SshjTool extends SshAbstractTool implements SshTool {
                     errgobbler.start();
                 }
                 try {
-                    output.join(sshClientConnection.getSessionTimeout(), TimeUnit.MILLISECONDS);
+                    
+                    output.join((int)Math.min(timeout.toMilliseconds(), Integer.MAX_VALUE), TimeUnit.MILLISECONDS);
                     return output;
                     
                 } finally {
@@ -788,19 +821,27 @@ public class SshjTool extends SshAbstractTool implements SshTool {
     }
 
     class ShellAction implements SshAction<Integer> {
-        private final List<String> commands;
+        @VisibleForTesting
+        final List<String> commands;
+        @VisibleForTesting
+        final OutputStream out;
+        @VisibleForTesting
+        final OutputStream err;
         
         private Session session;
         private Shell shell;
         private StreamGobbler outgobbler;
         private StreamGobbler errgobbler;
-        private OutputStream out;
-        private OutputStream err;
+        private Duration timeout;
 
-        ShellAction(List<String> commands, OutputStream out, OutputStream err) {
+        /**
+         * @param commands the commands to send to the shell; must not be null
+         * @param out      receives the shell's stdout, may be null
+         * @param err      receives the shell's stderr, may be null
+         * @param timeout  how long to wait for the shell to finish; null means "use the connection's
+         *                 session timeout". A session timeout of 0 is treated as unlimited
+         *                 (PRACTICALLY_FOREVER), and the smaller of the two is used.
+         */
+        ShellAction(List<String> commands, OutputStream out, OutputStream err, Duration timeout) {
             this.commands = checkNotNull(commands, "commands");
             this.out = out;
             this.err = err;
+            // 0 from getSessionTimeout() means "no session timeout"
+            Duration sessionTimeout = (sshClientConnection.getSessionTimeout() == 0) 
+                    ? Duration.PRACTICALLY_FOREVER 
+                    : Duration.millis(sshClientConnection.getSessionTimeout());
+            this.timeout = (timeout == null) ? sessionTimeout : Duration.min(timeout, sessionTimeout);
         }
 
         @Override
@@ -854,8 +895,8 @@ public class SshjTool extends SshAbstractTool implements SshTool {
                 closeWhispering(output, this);
                 
                 try {
-                    long timeout = sshClientConnection.getSessionTimeout();
-                    long timeoutEnd = System.currentTimeMillis() + timeout;
+                    long timeoutMillis = Math.min(timeout.toMilliseconds(), Integer.MAX_VALUE);
+                    long timeoutEnd = System.currentTimeMillis() + timeoutMillis;
                     Exception last = null;
                     do {
                         if (!shell.isOpen() && ((SessionChannel)session).getExitStatus()!=null)
@@ -874,7 +915,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
                             // which is nohupped to really be in the background (#162)
                             // now let's bail out
                             break;
-                    } while (timeout<=0 || System.currentTimeMillis() < timeoutEnd);
+                    } while (System.currentTimeMillis() < timeoutEnd);
                     if (shell.isOpen() && ((SessionChannel)session).getExitStatus()==null) {
                         LOG.debug("Timeout ({}) in SSH shell to {}", sshClientConnection.getSessionTimeout(), this);
                         // we timed out, or other problem -- reproduce the error

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
new file mode 100644
index 0000000..2acb200
--- /dev/null
+++ b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.util.internal.ssh.sshj;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.internal.ssh.sshj.SshjTool.ShellAction;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for async-exec with {@link SshjTool}, where it stubs out the actual ssh commands
+ * to return a controlled sequence of responses.
+ * <p>
+ * {@code acquire} is overridden so that each {@link SshjTool.ShellAction} issued by the tool is
+ * checked against the next {@link InjectedResult} in {@code sequence} (in order), and answered
+ * with that entry's canned exit code and stdout/stderr. Other action types are delegated to the
+ * real {@code acquire} (presumably including the script upload, hence the Integration group).
+ */
+public class SshjToolAsyncStubIntegrationTest {
+
+    /** One expected shell invocation: a matcher for the action and the canned response to give it. */
+    static class InjectedResult {
+        // must accept the ShellAction, else the stubbed acquire fails an assertion
+        Predicate<SshjTool.ShellAction> expected;
+        // produces the exit code returned from acquire (and may write to the action's streams)
+        Function<SshjTool.ShellAction, Integer> result;
+        
+        InjectedResult(Predicate<SshjTool.ShellAction> expected, Function<SshjTool.ShellAction, Integer> result) {
+            this.expected = expected;
+            this.result = result;
+        }
+    }
+    
+    private SshjTool tool;
+    // expected ShellActions, consumed in order by the stubbed acquire
+    private List<InjectedResult> sequence;
+    // index of the next entry in sequence; tests assert it reaches sequence.size()
+    int counter = 0;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        sequence = Lists.newArrayList();
+        counter = 0;
+        
+        // Tool for "localhost" whose ShellActions are matched against, and answered from, the
+        // injected sequence instead of being run over ssh.
+        tool = new SshjTool(ImmutableMap.<String,Object>of("host", "localhost")) {
+            @SuppressWarnings("unchecked")
+            protected <T, C extends SshAction<T>> T acquire(C action) {
+                if (action instanceof SshjTool.ShellAction) {
+                    SshjTool.ShellAction shellAction = (SshjTool.ShellAction) action;
+                    // NOTE(review): if the tool issues more ShellActions than were injected, this throws
+                    // IndexOutOfBoundsException rather than a descriptive assertion failure.
+                    InjectedResult injectedResult = sequence.get(counter);
+                    assertTrue(injectedResult.expected.apply(shellAction), "counter="+counter+"; cmds="+shellAction.commands);
+                    counter++;
+                    // unchecked: ShellAction is an SshAction<Integer>, so T is Integer here
+                    return (T) injectedResult.result.apply(shellAction);
+                }
+                return super.acquire(action);
+            }
+        };
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (tool != null) tool.disconnect();
+    }
+    
+    /** Matches a ShellAction whose command list, as rendered by {@code toString()}, contains {@code cmd}. */
+    private Predicate<SshjTool.ShellAction> containsCmd(final String cmd) {
+        return new Predicate<SshjTool.ShellAction>() {
+            @Override public boolean apply(ShellAction input) {
+                return input != null && input.commands.toString().contains(cmd);
+            }
+        };
+    }
+    
+    /**
+     * Answers with exit code {@code result}, first writing {@code stdout}/{@code stderr} (when non-null)
+     * to the action's own out/err streams, as a real command's output would be.
+     */
+    private Function<SshjTool.ShellAction, Integer> returning(final int result, final String stdout, final String stderr) {
+        return new Function<SshjTool.ShellAction, Integer>() {
+            @Override public Integer apply(ShellAction input) {
+                try {
+                    if (stdout != null && input.out != null) input.out.write(stdout.getBytes());
+                    if (stderr != null && input.err != null) input.err.write(stderr.getBytes());
+                } catch (IOException e) {
+                    throw Exceptions.propagate(e);
+                }
+                return result;
+            }
+        };
+    }
+    
+    /** Script launch ("& disown") succeeds; the first long-poll reports exit 0 together with output. */
+    @Test(groups="Integration")
+    public void testPolls() throws Exception {
+        sequence = ImmutableList.of(
+                new InjectedResult(containsCmd("& disown"), returning(0, "", "")),
+                new InjectedResult(containsCmd("# Long poll"), returning(0, "mystringToStdout", "mystringToStderr")));
+
+        runTest(0, "mystringToStdout", "mystringToStderr");
+        assertEquals(counter, sequence.size());
+    }
+    
+    /** A non-zero long-poll exit triggers a status retrieval, whose value ("123") becomes the result. */
+    @Test(groups="Integration")
+    public void testPollsAndReturnsNonZeroExitCode() throws Exception {
+        sequence = ImmutableList.of(
+                new InjectedResult(containsCmd("& disown"), returning(0, "", "")),
+                new InjectedResult(containsCmd("# Long poll"), returning(123, "mystringToStdout", "mystringToStderr")),
+                new InjectedResult(containsCmd("# Retrieve status"), returning(0, "123", "")));
+
+        runTest(123, "mystringToStdout", "mystringToStderr");
+        assertEquals(counter, sequence.size());
+    }
+    
+    /**
+     * Long-polls exiting 125 (followed by an empty retrieved status, i.e. not yet complete) or -1 are
+     * retried; output from every poll, including the failed ones, is accumulated in order.
+     */
+    @Test(groups="Integration")
+    public void testPollsRepeatedly() throws Exception {
+        sequence = ImmutableList.of(
+                new InjectedResult(containsCmd("& disown"), returning(0, "", "")),
+                new InjectedResult(containsCmd("# Long poll"), returning(125, "mystringToStdout", "mystringToStderr")),
+                new InjectedResult(containsCmd("# Retrieve status"), returning(0, "", "")),
+                new InjectedResult(containsCmd("# Long poll"), returning(125, "mystringToStdout2", "mystringToStderr2")),
+                new InjectedResult(containsCmd("# Retrieve status"), returning(0, "", "")),
+                new InjectedResult(containsCmd("# Long poll"), returning(-1, "mystringToStdout3", "mystringToStderr3")),
+                new InjectedResult(containsCmd("# Long poll"), returning(125, "mystringToStdout4", "mystringToStderr4")),
+                new InjectedResult(containsCmd("# Retrieve status"), returning(0, "", "")),
+                new InjectedResult(containsCmd("# Long poll"), returning(0, "mystringToStdout5", "mystringToStderr5")));
+
+        runTest(0,
+                "mystringToStdout"+"mystringToStdout2"+"mystringToStdout3"+"mystringToStdout4"+"mystringToStdout5",
+                "mystringToStderr"+"mystringToStderr2"+"mystringToStderr3"+"mystringToStderr4"+"mystringToStderr5");
+        assertEquals(counter, sequence.size());
+    }
+    
+    /**
+     * Runs one command via execScript with async exec enabled, extra output suppressed and a 1ms
+     * long-poll timeout, then checks the exit code and the trimmed stdout/stderr.
+     */
+    protected void runTest(int expectedExit, String expectedStdout, String expectedStderr) throws Exception {
+        List<String> cmds = ImmutableList.of("abc");
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayOutputStream err = new ByteArrayOutputStream();
+        int exitCode = tool.execScript(
+                ImmutableMap.of(
+                        "out", out, 
+                        "err", err, 
+                        SshjTool.PROP_EXEC_ASYNC.getName(), true, 
+                        SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true,
+                        SshjTool.PROP_EXEC_ASYNC_POLLING_TIMEOUT.getName(), Duration.ONE_MILLISECOND), 
+                cmds, 
+                ImmutableMap.<String,String>of());
+        String outStr = new String(out.toByteArray());
+        String errStr = new String(err.toByteArray());
+
+        assertEquals(exitCode, expectedExit);
+        assertEquals(outStr.trim(), expectedStdout);
+        assertEquals(errStr.trim(), expectedStderr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
index f5b8700..b81712d 100644
--- a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
+++ b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
@@ -176,15 +176,20 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         ByteArrayOutputStream err = new ByteArrayOutputStream();
         int exitCode = tool.execScript(
-                ImmutableMap.of("out", out, "err", err, SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true), 
+                ImmutableMap.of(
+                        "out", out, 
+                        "err", err, 
+                        SshjTool.PROP_EXEC_ASYNC.getName(), true, 
+                        SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true,
+                        SshjTool.PROP_EXEC_ASYNC_POLLING_TIMEOUT.getName(), Duration.ONE_SECOND), 
                 cmds, 
                 ImmutableMap.<String,String>of());
         String outStr = new String(out.toByteArray());
         String errStr = new String(err.toByteArray());
 
         assertEquals(exitCode, 0);
-        assertEquals("mystringToStdout\nmystringPostSleepToStdout", outStr.trim());
-        assertEquals("mystringToStderr\nmystringPostSleepToStderr", errStr.trim());
+        assertEquals(outStr.trim(), "mystringToStdout\nmystringPostSleepToStdout");
+        assertEquals(errStr.trim(), "mystringToStderr\nmystringPostSleepToStderr");
     }
 
     @Test(groups = {"Integration"})
@@ -201,7 +206,7 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
         Stopwatch stopwatch = Stopwatch.createStarted();
         try {
             tool.execScript(
-                ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_ASYNC_TIMEOUT.getName(), Duration.millis(1)), 
+                ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_TIMEOUT.getName(), Duration.millis(1)), 
                 ImmutableList.of("sleep 60"), 
                 ImmutableMap.<String,String>of());
             fail();
@@ -220,7 +225,7 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
             public void run() {
                 Stopwatch stopwatch = Stopwatch.createStarted();
                 int exitStatus = tool.execScript(
-                    ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_ASYNC_TIMEOUT.getName(), Duration.millis(1)), 
+                    ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_TIMEOUT.getName(), Duration.millis(1)), 
                     ImmutableList.of("sleep 63"), 
                     ImmutableMap.<String,String>of());