You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/12/23 16:13:07 UTC
[1/6] incubator-brooklyn git commit: Adds SshjTool exec script async
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master 2cfe9386d -> 4a5072e52
Adds SshjTool exec script async
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/f10f5713
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/f10f5713
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/f10f5713
Branch: refs/heads/master
Commit: f10f57137e9e5e05c113f243f202e3777bbe533f
Parents: 0faf4d9
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 25 21:42:33 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Dec 19 00:18:46 2014 +0000
----------------------------------------------------------------------
.../util/internal/ssh/ShellAbstractTool.java | 115 +++++++++++++++-
.../brooklyn/util/internal/ssh/ShellTool.java | 9 +-
.../util/internal/ssh/sshj/SshjTool.java | 135 ++++++++++++++++++-
.../ssh/sshj/SshjToolIntegrationTest.java | 90 +++++++++++++
4 files changed, 338 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f10f5713/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
index 56bff17..5b3370d 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
@@ -46,6 +46,7 @@ import brooklyn.util.text.Strings;
import brooklyn.util.time.Time;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
public abstract class ShellAbstractTool implements ShellTool {
@@ -220,6 +221,7 @@ public abstract class ShellAbstractTool implements ShellTool {
protected final Boolean runAsRoot;
protected final Boolean noExtraOutput;
protected final Boolean noDeleteAfterExec;
+ protected final String scriptNameWithoutExtension;
protected final String scriptPath;
public ToolAbstractExecScript(Map<String,?> props) {
@@ -239,10 +241,10 @@ public abstract class ShellAbstractTool implements ShellTool {
if (summary.length()>30)
summary = summary.substring(0,30);
}
- this.scriptPath = scriptDir+"/brooklyn-"+
- Time.makeDateStampString()+"-"+Identifiers.makeRandomId(4)+
- (summary==null ? "" : "-"+summary) +
- ".sh";
+ this.scriptNameWithoutExtension = "brooklyn-"+
+ Time.makeDateStampString()+"-"+Identifiers.makeRandomId(4)+
+ (summary==null ? "" : "-"+summary);
+ this.scriptPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension+".sh");
}
/** builds the command to run the given script;
@@ -262,7 +264,112 @@ public abstract class ShellAbstractTool implements ShellTool {
return cmds.build();
}
+ protected String getSummary() {
+ String summary = getOptionalVal(props, PROP_SUMMARY);
+ return (summary != null) ? summary : scriptPath;
+ }
+
public abstract int run();
}
+ protected abstract class ToolAbstractAsyncExecScript extends ToolAbstractExecScript {
+ protected final String stdoutPath;
+ protected final String stderrPath;
+ protected final String exitStatusPath;
+ protected final String pidPath;
+
+ public ToolAbstractAsyncExecScript(Map<String,?> props) {
+ super(props);
+
+ stdoutPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".stdout");
+ stderrPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".stderr");
+ exitStatusPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".exitstatus");
+ pidPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".pid");
+ }
+
+ /**
+ * Builds the command to run the given script, asynchronously.
+ * The executed command will return immediately, but the output from the script
+ * will continue to be written
+ * note that some modes require \$RESULT passed in order to access a variable, whereas most just need $ */
+ protected List<String> buildRunScriptCommand() {
+ // TODO
+ String touchCmd = String.format("touch %s; touch %s; touch %s; touch %s", stdoutPath, stderrPath, exitStatusPath, pidPath);
+ String cmd = String.format("( %s > %s 2> %s < /dev/null ; echo $? > %s ) & disown", scriptPath, stdoutPath, stderrPath, exitStatusPath);
+ MutableList.Builder<String> cmds = MutableList.<String>builder()
+ .add((runAsRoot ? BashCommands.sudo(touchCmd) : touchCmd))
+ .add((runAsRoot ? BashCommands.sudo(cmd) : cmd))
+ .add("echo $! > "+pidPath)
+ .add("RESULT=$?");
+ if (noExtraOutput==null || !noExtraOutput) {
+ cmds.add("echo Executing async "+scriptPath);
+ }
+ cmds.add("exit $RESULT");
+ return cmds.build();
+ }
+
+ /**
+ * Builds the command to retrieve the exit status of the command, written to stdout.
+ */
+ protected List<String> buildRetrieveStatusCommand() {
+ String cmd =
+ "if test -s "+exitStatusPath+"; then"+"\n"+
+ " cat "+exitStatusPath+"\n"+
+ "elif test -s "+pidPath+"; then"+"\n"+
+ " pid=`cat "+pidPath+"`"+"\n"+
+ " if ! ps -p $pid > /dev/null < /dev/null; then"+"\n"+
+ " # no exit status, and not executing; give a few seconds grace in case just about to write exit status"+"\n"+
+ " sleep 3"+"\n"+
+ " if test -s "+exitStatusPath+"; then"+"\n"+
+ " cat "+exitStatusPath+""+"\n"+
+ " else"+"\n"+
+ " echo \"No exit status in a.exitstatus, and pid in a.pid ($pid) not executing\""+"\n"+
+ " exit 1"+"\n"+
+ " fi"+"\n"+
+ " fi"+"\n"+
+ "else"+"\n"+
+ " echo \"No exit status in "+exitStatusPath+", and "+pidPath+" is empty\""+"\n"+
+ " exit 1"+"\n"+
+ "fi"+"\n";
+
+ MutableList.Builder<String> cmds = MutableList.<String>builder()
+ .add((runAsRoot ? BashCommands.sudo(cmd) : cmd))
+ .add("RESULT=$?");
+ cmds.add("exit $RESULT");
+ return cmds.build();
+ }
+
+ /**
+ * Builds the command to retrieve the stdout and stderr of the async command.
+ * An offset can be given, to only retrieve data starting at a particular character (indexed from 0).
+ */
+ protected List<String> buildRetrieveStdoutAndStderrCommand(int stdoutPosition, int stderrPosition) {
+ // Note that `tail -c +1` means start at the *first* character (i.e. start counting from 1, not 0)
+ String catStdoutCmd = "tail -c +"+(stdoutPosition+1)+" "+stdoutPath;
+ String catStderrCmd = "tail -c +"+(stderrPosition+1)+" "+stderrPath+" 1>&2";
+ MutableList.Builder<String> cmds = MutableList.<String>builder()
+ .add((runAsRoot ? BashCommands.sudo(catStdoutCmd) : catStdoutCmd))
+ .add((runAsRoot ? BashCommands.sudo(catStderrCmd) : catStderrCmd))
+ .add("RESULT=$?");
+ cmds.add("exit $RESULT");
+ return cmds.build();
+ }
+
+ protected List<String> deleteTemporaryFilesCommand() {
+ if (!Boolean.TRUE.equals(noDeleteAfterExec)) {
+ // use "-f" because some systems have "rm" aliased to "rm -i"
+ // use "< /dev/null" to guarantee doesn't hang
+ return ImmutableList.of(
+ "rm -f "+scriptPath+" < /dev/null",
+ "rm -f "+stdoutPath+" < /dev/null",
+ "rm -f "+stderrPath+" < /dev/null",
+ "rm -f "+exitStatusPath+" < /dev/null",
+ "rm -f "+pidPath+" < /dev/null");
+ } else {
+ return ImmutableList.<String>of();
+ }
+ }
+
+ public abstract int run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f10f5713/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
index 677d5c9..5cd392b 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
@@ -20,7 +20,7 @@ package brooklyn.util.internal.ssh;
import static brooklyn.entity.basic.ConfigKeys.newConfigKey;
import static brooklyn.entity.basic.ConfigKeys.newStringConfigKey;
-import java.io.File;
+
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
@@ -28,6 +28,7 @@ import java.util.Map;
import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.ConfigKeys;
import brooklyn.util.os.Os;
+import brooklyn.util.time.Duration;
/** Methods for executing things in an environment (localhost process, or ssh) */
public interface ShellTool {
@@ -55,9 +56,13 @@ public interface ShellTool {
public static final ConfigKey<String> PROP_DIRECT_HEADER = newConfigKey("directHeader", "commands to run at the target before any caller-supplied commands for direct execution", "exec bash -e");
ConfigKey<Boolean> PROP_NO_DELETE_SCRIPT = newConfigKey("noDeleteAfterExec", "Retains the generated script file after executing the commands instead of deleting it", false);
-
+
ConfigKey<String> PROP_SUMMARY = ConfigKeys.newStringConfigKey("summary", "Provides a human-readable summary, used in file generation etc");
+ ConfigKey<Boolean> PROP_EXEC_ASYNC = newConfigKey("execAsync", "Executes the script asynchronously, and then polls for the result (and for stdout/stderr)", false);
+
+ ConfigKey<Duration> PROP_EXEC_ASYNC_TIMEOUT = newConfigKey("execAsyncTimeout", "Timeout when executing a script asynchronously", Duration.PRACTICALLY_FOREVER);
+
/**
* Executes the set of commands in a shell script. Blocks until completion.
* <p>
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f10f5713/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
index 2ad6a0a..99debff 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Throwables.getCausalChain;
import static com.google.common.collect.Iterables.any;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -33,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import net.schmizz.sshj.connection.ConnectionException;
@@ -47,7 +49,6 @@ import net.schmizz.sshj.transport.TransportException;
import net.schmizz.sshj.xfer.InMemorySourceFile;
import org.apache.commons.io.input.ProxyInputStream;
-import org.bouncycastle.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +60,8 @@ import brooklyn.util.io.FileUtil;
import brooklyn.util.stream.KnownSizeInputStream;
import brooklyn.util.stream.StreamGobbler;
import brooklyn.util.stream.Streams;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -299,16 +302,134 @@ public class SshjTool extends SshAbstractTool implements SshTool {
*
* So on balance, the script-based approach seems most reliable, even if there is an overhead
* of separate message(s) for copying the file!
+ *
+ * Another consideration is long-running scripts. On some clouds when executing a script that takes
+ * several minutes, we have seen it fail with -1 (e.g. 1 in 20 times). This suggests the ssh connection
+ * is being dropped. To avoid this problem, we can execute the script asynchronously, writing to files
+ * the stdout/stderr/pid/exitStatus. We then periodically poll to retrieve the contents of these files.
+ * Use {@link #PROP_EXEC_ASYNC} to force this mode of execution.
*/
@Override
public int execScript(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
- return new ToolAbstractExecScript(props) {
+ Boolean execAsync = getOptionalVal(props, PROP_EXEC_ASYNC);
+ if (Boolean.TRUE.equals(execAsync)) {
+ return execScriptAsyncAndPoll(props, commands, env);
+ } else {
+ return new ToolAbstractExecScript(props) {
+ public int run() {
+ String scriptContents = toScript(props, commands, env);
+ if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as script: {}", host, scriptContents);
+ copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
+
+ return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);
+ }
+ }.run();
+ }
+ }
+
+ protected int execScriptAsyncAndPoll(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
+ return new ToolAbstractAsyncExecScript(props) {
public int run() {
String scriptContents = toScript(props, commands, env);
- if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as script: {}", host, scriptContents);
+ if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as async script: {}", host, scriptContents);
copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
- return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);
+ // Execute script asynchronously
+ int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);
+ if (execResult != 0) return execResult;
+
+ // Poll repeatedly to get the status
+ Duration initialDelayBetweenPolls = Duration.ONE_SECOND;
+ Duration maxDelayBetweenPolls = Duration.seconds(20);
+ double multiplierBetweenPolls = 1.2;
+ int maxConsecutiveSshFailures = 3;
+ Duration delayBetweenPolls = initialDelayBetweenPolls;
+ Stopwatch executionStartTime = Stopwatch.createStarted();
+ Duration timeout = getOptionalVal(props, PROP_EXEC_ASYNC_TIMEOUT);
+ if (timeout == null) timeout = Duration.PRACTICALLY_FOREVER;
+ int stdoutCount = 0;
+ int stderrCount = 0;
+ int iteration = 0;
+ int consecutiveSshFailures = 0;
+ try {
+ do {
+ // Retrieve the exit status of the async script
+ ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
+ ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
+ int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr)), -1);
+
+ // Retrieve the stdout + stderr of the async script.
+ // Always do this *after* retrieving exit status, so don't miss anything.
+ if (out != null || err != null) {
+ ByteArrayOutputStream streamsOut = new ByteArrayOutputStream();
+ ByteArrayOutputStream streamsErr = new ByteArrayOutputStream();
+ int streamsResult = asInt(acquire(new ShellAction(buildRetrieveStdoutAndStderrCommand(stdoutCount, stderrCount), streamsOut, streamsErr)), -1);
+
+ if (streamsResult == 0) {
+ stdoutCount += streamsOut.size();
+ stderrCount += streamsErr.size();
+ } else {
+ if (out != null) out.write(toUTF8ByteArray("retrieving stdout/stderr failed with exit code "+streamsResult+" (stdout follow)"));
+ if (err != null) err.write(toUTF8ByteArray("retrieving stdout/stderr failed with exit code "+streamsResult+" (stderr follow)"));
+ }
+ if (out != null) out.write(streamsOut.toByteArray());
+ if (err != null) err.write(streamsErr.toByteArray());
+ }
+
+ if (statusResult != 0) {
+ // Failed to get the exit status; if it was -1 (i.e. connection error) then consider retrying; otherwise abort.
+ if (out != null) {
+ out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)"));
+ out.write(statusOut.toByteArray());
+ }
+ if (err != null) {
+ err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)"));
+ err.write(statusErr.toByteArray());
+ }
+ if (statusResult == -1) {
+ // Looks like an ssh timeout/connection error; we are willing to retry
+ consecutiveSshFailures++;
+ if (consecutiveSshFailures > maxConsecutiveSshFailures) {
+ LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")"+"\n"
+ +"\t"+"EXIT STATUS: -1"+"\n"
+ +"\t"+"STDERR: "+new String(statusErr.toByteArray())+"\n"
+ +"\t"+"STDOUT: "+new String(statusOut.toByteArray())+"\n");
+ return -1;
+ } else {
+ LOG.info("ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")"+"\n"
+ +"\t"+"EXIT STATUS: -1"+"\n"
+ +"\t"+"STDERR: "+new String(statusErr.toByteArray())+"\n"
+ +"\t"+"STDOUT: "+new String(statusOut.toByteArray())+"\n");
+ }
+ } else {
+ return statusResult;
+ }
+ } else {
+ consecutiveSshFailures = 0;
+ String statusOutStr = new String(statusOut.toByteArray());
+ if (Strings.isNonBlank(statusOutStr)) {
+ return Integer.parseInt(statusOutStr.trim());
+ }
+ }
+
+ if (iteration == 0) {
+ if (LOG.isDebugEnabled()) LOG.debug("Waiting for async script to complete on "+SshjTool.this.toString()+" (polling periodically for "+getSummary()+")");
+ } else {
+ if (LOG.isTraceEnabled()) LOG.trace("Still waiting for async script to complete on "+SshjTool.this.toString()+" (polling periodically for "+getSummary()+"; iteration="+iteration+")");
+ }
+ Time.sleep(delayBetweenPolls);
+ delayBetweenPolls = Duration.min(maxDelayBetweenPolls, delayBetweenPolls.multiply(multiplierBetweenPolls));
+ iteration++;
+
+ } while (timeout.isLongerThan(Duration.millis(executionStartTime.elapsed(TimeUnit.MILLISECONDS))));
+
+ // Timed out
+ String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
+ LOG.warn(msg+"; rethrowing");
+ throw new TimeoutException(msg);
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
}
}.run();
}
@@ -714,7 +835,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
for (CharSequence cmd : commands) {
try {
- output.write(Strings.toUTF8ByteArray(cmd+"\n"));
+ output.write(toUTF8ByteArray(cmd+"\n"));
output.flush();
} catch (ConnectionException e) {
if (!shell.isOpen()) {
@@ -784,6 +905,10 @@ public class SshjTool extends SshAbstractTool implements SshTool {
}
}
+ private byte[] toUTF8ByteArray(String string) {
+ return org.bouncycastle.util.Strings.toUTF8ByteArray(string);
+ }
+
private Supplier<InputStream> newInputStreamSupplier(final byte[] contents) {
return new Supplier<InputStream>() {
@Override public InputStream get() {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f10f5713/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
index 69b1351..f5b8700 100644
--- a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
+++ b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
@@ -19,6 +19,7 @@
package brooklyn.util.internal.ssh.sshj;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -28,18 +29,23 @@ import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import net.schmizz.sshj.connection.channel.direct.Session;
import org.testng.annotations.Test;
+import brooklyn.test.Asserts;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.internal.ssh.SshException;
import brooklyn.util.internal.ssh.SshTool;
import brooklyn.util.internal.ssh.SshToolAbstractIntegrationTest;
import brooklyn.util.os.Os;
+import brooklyn.util.time.Duration;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -157,7 +163,91 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
assertEquals(localtool3.getLocalTempDir(), new File(Os.tidyPath(customRelativeTempDir)));
}
+ @Test(groups = {"Integration"})
+ public void testAsyncExecStdoutAndStderr() throws Exception {
+ // Include a sleep, to ensure that the contents retrieved in first poll and subsequent polls are appended
+ List<String> cmds = ImmutableList.of(
+ "echo mystringToStdout",
+ "echo mystringToStderr 1>&2",
+ "sleep 3",
+ "echo mystringPostSleepToStdout",
+ "echo mystringPostSleepToStderr 1>&2");
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ int exitCode = tool.execScript(
+ ImmutableMap.of("out", out, "err", err, SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true),
+ cmds,
+ ImmutableMap.<String,String>of());
+ String outStr = new String(out.toByteArray());
+ String errStr = new String(err.toByteArray());
+ assertEquals(exitCode, 0);
+ assertEquals("mystringToStdout\nmystringPostSleepToStdout", outStr.trim());
+ assertEquals("mystringToStderr\nmystringPostSleepToStderr", errStr.trim());
+ }
+
+ @Test(groups = {"Integration"})
+ public void testAsyncExecReturnsExitCode() throws Exception {
+ int exitCode = tool.execScript(
+ ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true),
+ ImmutableList.of("exit 123"),
+ ImmutableMap.<String,String>of());
+ assertEquals(exitCode, 123);
+ }
+
+ @Test(groups = {"Integration"})
+ public void testAsyncExecTimesOut() throws Exception {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ tool.execScript(
+ ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_ASYNC_TIMEOUT.getName(), Duration.millis(1)),
+ ImmutableList.of("sleep 60"),
+ ImmutableMap.<String,String>of());
+ fail();
+ } catch (Exception e) {
+ TimeoutException te = Exceptions.getFirstThrowableOfType(e, TimeoutException.class);
+ if (te == null) throw e;
+ }
+
+ long seconds = stopwatch.elapsed(TimeUnit.SECONDS);
+ assertTrue(seconds < 30, "exec took "+seconds+" seconds");
+ }
+
+ @Test(groups = {"Integration"})
+ public void testAsyncExecAbortsIfProcessFails() throws Exception {
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ int exitStatus = tool.execScript(
+ ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_ASYNC_TIMEOUT.getName(), Duration.millis(1)),
+ ImmutableList.of("sleep 63"),
+ ImmutableMap.<String,String>of());
+
+ assertEquals(exitStatus, 1);
+
+ long seconds = stopwatch.elapsed(TimeUnit.SECONDS);
+ assertTrue(seconds < 30, "exec took "+seconds+" seconds");
+ }});
+ try {
+ thread.start();
+
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ int exitStatus = tool.execCommands(ImmutableMap.<String,Object>of(), ImmutableList.of("ps aux| grep \"sleep 63\" | grep -v grep"));
+ assertEquals(exitStatus, 0);
+ }});
+
+ tool.execCommands(ImmutableMap.<String,Object>of(), ImmutableList.of("ps aux| grep \"sleep 63\" | grep -v grep | awk '{print($2)}' | xargs kill"));
+
+ thread.join(30*1000);
+ assertFalse(thread.isAlive());
+ } finally {
+ thread.interrupt();
+ }
+ }
+
+
protected String execShellDirect(List<String> cmds) {
return execShellDirect(cmds, ImmutableMap.<String,Object>of());
}
[2/6] incubator-brooklyn git commit: install java: use async-ssh-exec
Posted by al...@apache.org.
install java: use async-ssh-exec
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/ee8d521c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/ee8d521c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/ee8d521c
Branch: refs/heads/master
Commit: ee8d521c17aae93c5865752ed16664b67e6da14f
Parents: 5c0cc78
Author: Aled Sage <al...@gmail.com>
Authored: Thu Dec 18 22:35:30 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Dec 19 00:18:47 2014 +0000
----------------------------------------------------------------------
.../brooklyn/entity/java/JavaSoftwareProcessSshDriver.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ee8d521c/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java b/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java
index 88dd631..3847261 100644
--- a/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java
+++ b/software/base/src/main/java/brooklyn/entity/java/JavaSoftwareProcessSshDriver.java
@@ -42,10 +42,12 @@ import brooklyn.util.collections.MutableMap;
import brooklyn.util.collections.MutableSet;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.flags.TypeCoercions;
+import brooklyn.util.internal.ssh.ShellTool;
import brooklyn.util.ssh.BashCommands;
import brooklyn.util.task.DynamicTasks;
import brooklyn.util.task.Tasks;
import brooklyn.util.task.ssh.SshTasks;
+import brooklyn.util.task.system.ProcessTaskFactory;
import brooklyn.util.task.system.ProcessTaskWrapper;
import brooklyn.util.text.StringEscapes.BashStringEscapes;
import brooklyn.util.text.Strings;
@@ -339,8 +341,9 @@ public abstract class JavaSoftwareProcessSshDriver extends AbstractSoftwareProce
getLocation().acquireMutex("installing", "installing Java at " + getLocation());
try {
log.debug("Installing Java {} at {}@{}", new Object[]{version, getEntity(), getLocation()});
- ProcessTaskWrapper<Integer> installCommand = Entities.submit(getEntity(),
- SshTasks.newSshExecTaskFactory(getLocation(), command));
+ ProcessTaskFactory<Integer> taskFactory = SshTasks.newSshExecTaskFactory(getLocation(), command)
+ .configure(ShellTool.PROP_EXEC_ASYNC, true);
+ ProcessTaskWrapper<Integer> installCommand = Entities.submit(getEntity(), taskFactory);
int result = installCommand.get();
if (result != 0) {
log.warn("Installation of Java {} failed at {}@{}: {}",
[6/6] incubator-brooklyn git commit: This closes #358
Posted by al...@apache.org.
This closes #358
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4a5072e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4a5072e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4a5072e5
Branch: refs/heads/master
Commit: 4a5072e52a790eeb73a4f8e789c2cb6e54c6c682
Parents: 2cfe938 63483f8
Author: Aled Sage <al...@gmail.com>
Authored: Tue Dec 23 15:12:46 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Dec 23 15:12:46 2014 +0000
----------------------------------------------------------------------
.../internal/BrooklynFeatureEnablement.java | 10 +
.../util/internal/ssh/ShellAbstractTool.java | 167 +++++++++++++-
.../brooklyn/util/internal/ssh/ShellTool.java | 11 +-
.../util/internal/ssh/sshj/SshjTool.java | 216 +++++++++++++++++--
.../internal/ssh/ShellToolAbstractTest.java | 5 +-
.../sshj/SshjToolAsyncStubIntegrationTest.java | 169 +++++++++++++++
.../ssh/sshj/SshjToolIntegrationTest.java | 95 ++++++++
.../java/JavaSoftwareProcessSshDriver.java | 7 +-
8 files changed, 648 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
[5/6] incubator-brooklyn git commit: ssh async-exec: incorporate
comments + logging
Posted by al...@apache.org.
ssh async-exec: incorporate comments + logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/63483f8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/63483f8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/63483f8e
Branch: refs/heads/master
Commit: 63483f8ea3611d6534ee27bea4debe085f488e4d
Parents: ee8d521
Author: Aled Sage <al...@gmail.com>
Authored: Tue Dec 23 14:10:02 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Dec 23 14:10:02 2014 +0000
----------------------------------------------------------------------
.../java/brooklyn/internal/BrooklynFeatureEnablement.java | 9 ++++++++-
.../brooklyn/util/internal/ssh/ShellAbstractTool.java | 1 -
.../java/brooklyn/util/internal/ssh/sshj/SshjTool.java | 10 ++++++++++
3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/63483f8e/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java b/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
index d8aa66e..9946ac7 100644
--- a/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
+++ b/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import brooklyn.internal.storage.BrooklynStorage;
import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.util.internal.ssh.ShellTool;
import com.google.common.annotations.Beta;
import com.google.common.collect.Maps;
@@ -84,6 +85,12 @@ public class BrooklynFeatureEnablement {
*/
public static final String FEATURE_INFER_CATALOG_ITEM_ON_REBIND = "brooklyn.backwardCompatibility.feature.inferCatalogItemOnRebind";
+ /**
+ * When executing over ssh, whether to support the "async exec" approach, or only the classic approach.
+ *
+ * If this feature is disabled, then even if the {@link ShellTool#PROP_EXEC_ASYNC} is configured it
+ * will still use the classic ssh approach.
+ */
public static final String FEATURE_SSH_ASYNC_EXEC = "brooklyn.experimental.feature.ssh.asyncExec";
private static final Map<String, Boolean> FEATURE_ENABLEMENTS = Maps.newLinkedHashMap();
@@ -104,7 +111,7 @@ public class BrooklynFeatureEnablement {
setDefault(FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, false);
setDefault(FEATURE_RENAME_THREADS, false);
setDefault(FEATURE_INFER_CATALOG_ITEM_ON_REBIND, true);
- setDefault(FEATURE_SSH_ASYNC_EXEC, true);
+ setDefault(FEATURE_SSH_ASYNC_EXEC, false);
}
static {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/63483f8e/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
index 41dc886..82624d7 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
@@ -297,7 +297,6 @@ public abstract class ShellAbstractTool implements ShellTool {
* will continue to be written
* note that some modes require \$RESULT passed in order to access a variable, whereas most just need $ */
protected List<String> buildRunScriptCommand() {
- // TODO
String touchCmd = String.format("touch %s %s %s %s", stdoutPath, stderrPath, exitStatusPath, pidPath);
String cmd = String.format("( %s > %s 2> %s < /dev/null ; echo $? > %s ) & disown", scriptPath, stdoutPath, stderrPath, exitStatusPath);
MutableList.Builder<String> cmds = MutableList.<String>builder()
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/63483f8e/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
index 154c433..e7ecd0c 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
@@ -319,6 +319,9 @@ public class SshjTool extends SshAbstractTool implements SshTool {
if (Boolean.TRUE.equals(execAsync) && BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC)) {
return execScriptAsyncAndPoll(props, commands, env);
} else {
+ if (Boolean.TRUE.equals(execAsync)) {
+ if (LOG.isDebugEnabled()) LOG.debug("Ignoring ssh exec-async configuration, because feature is disabled");
+ }
return new ToolAbstractExecScript(props) {
public int run() {
String scriptContents = toScript(props, commands, env);
@@ -397,15 +400,18 @@ public class SshjTool extends SshAbstractTool implements SshTool {
stderrCount += (countingErr == null) ? 0 : countingErr.getCount();
if (longPollResult == 0) {
+ if (LOG.isDebugEnabled()) LOG.debug("Long-poll succeeded (exit status 0) on "+SshjTool.this.toString()+" (for "+getSummary()+")");
return longPollResult; // success
} else if (longPollResult == -1) {
// probably a connection failure; try again
+ if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
return null;
} else {
// want to double-check whether this is the exit-code from the async process, or
// some unexpected failure in our long-poll command.
+ if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")");
Integer result = retrieveStatusCommand();
if (result != null) {
return result;
@@ -434,14 +440,17 @@ public class SshjTool extends SshAbstractTool implements SshTool {
String statusOutStr = new String(statusOut.toByteArray()).trim();
if (Strings.isEmpty(statusOutStr)) {
// suggests not yet completed; will retry with long-poll
+ if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; command successful but no result available on "+SshjTool.this.toString()+" (for "+getSummary()+")");
return null;
} else {
+ if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; returning '"+statusOutStr+"' on "+SshjTool.this.toString()+" (for "+getSummary()+")");
int result = Integer.parseInt(statusOutStr);
return result;
}
} else if (statusResult == -1) {
// probably a connection failure; try again with long-poll
+ if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status directly received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
return null;
} else {
@@ -454,6 +463,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
err.write(statusErr.toByteArray());
}
+ if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status failed; returning "+statusResult+" on "+SshjTool.this.toString()+" (for "+getSummary()+")");
return statusResult;
}
}
[4/6] incubator-brooklyn git commit:
ShellToolAbstractTset.execCommands: remove duplication
Posted by al...@apache.org.
ShellToolAbstractTset.execCommands: remove duplication
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/32f92a4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/32f92a4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/32f92a4a
Branch: refs/heads/master
Commit: 32f92a4a657b677d2b5d6d951b1eef85425c9ccc
Parents: f10f571
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 25 21:44:06 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Dec 19 00:18:47 2014 +0000
----------------------------------------------------------------------
.../java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/32f92a4a/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java b/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java
index ec84e28..b8b394b 100644
--- a/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java
+++ b/core/src/test/java/brooklyn/util/internal/ssh/ShellToolAbstractTest.java
@@ -399,10 +399,7 @@ public abstract class ShellToolAbstractTest {
}
protected String execCommands(List<String> cmds, Map<String,?> env) {
- execCommands(null, cmds, env);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- tool.execCommands(ImmutableMap.of("out", out), cmds, env);
- return new String(out.toByteArray());
+ return execCommands(null, cmds, env);
}
protected String execCommands(ConfigBag config, List<String> cmds, Map<String,?> env) {
[3/6] incubator-brooklyn git commit: ssh-async: use long-polling
Posted by al...@apache.org.
ssh-async: use long-polling
- Polling ssh command executes until either command completes,
process no longer running, or timeout.
- Adds PROP_EXEC_TIMEOUT for control when ssh commands should timeout.
- Adds PROP_EXEC_ASYNC_POLLING_TIMEOUT for the long-polling of
async exec, to timeout each poll.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/5c0cc786
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/5c0cc786
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/5c0cc786
Branch: refs/heads/master
Commit: 5c0cc786a098abf9311336ea2ba4fa311413e3c7
Parents: 32f92a4
Author: Aled Sage <al...@gmail.com>
Authored: Mon Dec 1 17:09:38 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Dec 19 00:18:47 2014 +0000
----------------------------------------------------------------------
.../internal/BrooklynFeatureEnablement.java | 3 +
.../util/internal/ssh/ShellAbstractTool.java | 107 ++++++--
.../brooklyn/util/internal/ssh/ShellTool.java | 4 +-
.../util/internal/ssh/sshj/SshjTool.java | 251 +++++++++++--------
.../sshj/SshjToolAsyncStubIntegrationTest.java | 169 +++++++++++++
.../ssh/sshj/SshjToolIntegrationTest.java | 15 +-
6 files changed, 411 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java b/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
index 0ce1b54..d8aa66e 100644
--- a/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
+++ b/core/src/main/java/brooklyn/internal/BrooklynFeatureEnablement.java
@@ -84,6 +84,8 @@ public class BrooklynFeatureEnablement {
*/
public static final String FEATURE_INFER_CATALOG_ITEM_ON_REBIND = "brooklyn.backwardCompatibility.feature.inferCatalogItemOnRebind";
+ public static final String FEATURE_SSH_ASYNC_EXEC = "brooklyn.experimental.feature.ssh.asyncExec";
+
private static final Map<String, Boolean> FEATURE_ENABLEMENTS = Maps.newLinkedHashMap();
private static final Object MUTEX = new Object();
@@ -102,6 +104,7 @@ public class BrooklynFeatureEnablement {
setDefault(FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, false);
setDefault(FEATURE_RENAME_THREADS, false);
setDefault(FEATURE_INFER_CATALOG_ITEM_ON_REBIND, true);
+ setDefault(FEATURE_SSH_ASYNC_EXEC, true);
}
static {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
index 5b3370d..41dc886 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
@@ -43,8 +43,10 @@ import brooklyn.util.ssh.BashCommands;
import brooklyn.util.text.Identifiers;
import brooklyn.util.text.StringEscapes.BashStringEscapes;
import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;
+import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
@@ -223,6 +225,7 @@ public abstract class ShellAbstractTool implements ShellTool {
protected final Boolean noDeleteAfterExec;
protected final String scriptNameWithoutExtension;
protected final String scriptPath;
+ protected final Duration execTimeout;
public ToolAbstractExecScript(Map<String,?> props) {
this.props = props;
@@ -234,6 +237,7 @@ public abstract class ShellAbstractTool implements ShellTool {
this.runAsRoot = getOptionalVal(props, PROP_RUN_AS_ROOT);
this.noExtraOutput = getOptionalVal(props, PROP_NO_EXTRA_OUTPUT);
this.noDeleteAfterExec = getOptionalVal(props, PROP_NO_DELETE_SCRIPT);
+ this.execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT);
String summary = getOptionalVal(props, PROP_SUMMARY);
if (summary!=null) {
@@ -294,11 +298,11 @@ public abstract class ShellAbstractTool implements ShellTool {
* note that some modes require \$RESULT passed in order to access a variable, whereas most just need $ */
protected List<String> buildRunScriptCommand() {
// TODO
- String touchCmd = String.format("touch %s; touch %s; touch %s; touch %s", stdoutPath, stderrPath, exitStatusPath, pidPath);
+ String touchCmd = String.format("touch %s %s %s %s", stdoutPath, stderrPath, exitStatusPath, pidPath);
String cmd = String.format("( %s > %s 2> %s < /dev/null ; echo $? > %s ) & disown", scriptPath, stdoutPath, stderrPath, exitStatusPath);
MutableList.Builder<String> cmds = MutableList.<String>builder()
- .add((runAsRoot ? BashCommands.sudo(touchCmd) : touchCmd))
- .add((runAsRoot ? BashCommands.sudo(cmd) : cmd))
+ .add(runAsRoot ? BashCommands.sudo(touchCmd) : touchCmd)
+ .add(runAsRoot ? BashCommands.sudo(cmd) : cmd)
.add("echo $! > "+pidPath)
.add("RESULT=$?");
if (noExtraOutput==null || !noExtraOutput) {
@@ -312,25 +316,29 @@ public abstract class ShellAbstractTool implements ShellTool {
* Builds the command to retrieve the exit status of the command, written to stdout.
*/
protected List<String> buildRetrieveStatusCommand() {
- String cmd =
- "if test -s "+exitStatusPath+"; then"+"\n"+
- " cat "+exitStatusPath+"\n"+
- "elif test -s "+pidPath+"; then"+"\n"+
- " pid=`cat "+pidPath+"`"+"\n"+
- " if ! ps -p $pid > /dev/null < /dev/null; then"+"\n"+
- " # no exit status, and not executing; give a few seconds grace in case just about to write exit status"+"\n"+
- " sleep 3"+"\n"+
- " if test -s "+exitStatusPath+"; then"+"\n"+
- " cat "+exitStatusPath+""+"\n"+
- " else"+"\n"+
- " echo \"No exit status in a.exitstatus, and pid in a.pid ($pid) not executing\""+"\n"+
- " exit 1"+"\n"+
- " fi"+"\n"+
- " fi"+"\n"+
- "else"+"\n"+
- " echo \"No exit status in "+exitStatusPath+", and "+pidPath+" is empty\""+"\n"+
- " exit 1"+"\n"+
- "fi"+"\n";
+ // Retrieve exit status from file (writtent to stdout), if populated;
+ // if not found and pid still running, then return empty string; else exit code 1.
+ List<String> cmdParts = ImmutableList.of(
+ "# Retrieve status", // comment is to aid testing - see SshjToolAsyncStubIntegrationTest
+ "if test -s "+exitStatusPath+"; then",
+ " cat "+exitStatusPath,
+ "elif test -s "+pidPath+"; then",
+ " pid=`cat "+pidPath+"`",
+ " if ! ps -p $pid > /dev/null < /dev/null; then",
+ " # no exit status, and not executing; give a few seconds grace in case just about to write exit status",
+ " sleep 3",
+ " if test -s "+exitStatusPath+"; then",
+ " cat "+exitStatusPath+"",
+ " else",
+ " echo \"No exit status in "+exitStatusPath+", and pid in "+pidPath+" ($pid) not executing\"",
+ " exit 1",
+ " fi",
+ " fi",
+ "else",
+ " echo \"No exit status in "+exitStatusPath+", and "+pidPath+" is empty\"",
+ " exit 1",
+ "fi"+"\n");
+ String cmd = Joiner.on("\n").join(cmdParts);
MutableList.Builder<String> cmds = MutableList.<String>builder()
.add((runAsRoot ? BashCommands.sudo(cmd) : cmd))
@@ -355,16 +363,61 @@ public abstract class ShellAbstractTool implements ShellTool {
return cmds.build();
}
+ /**
+ * Builds the command to retrieve the stdout and stderr of the async command.
+ * An offset can be given, to only retrieve data starting at a particular character (indexed from 0).
+ */
+ protected List<String> buildLongPollCommand(int stdoutPosition, int stderrPosition, Duration timeout) {
+ // Note that `tail -c +1` means start at the *first* character (i.e. start counting from 1, not 0)
+ // TODO Relies on commands terminating when ssh connection dropped (because not run with `nohup`)
+ String catStdoutCmd = "tail -c +"+(stdoutPosition+1)+" -f "+stdoutPath+" &";
+ String catStderrCmd = "tail -c +"+(stderrPosition+1)+" -f "+stderrPath+" 1>&2 &";
+ long maxTime = Math.max(1, timeout.toSeconds());
+
+ List<String> waitForExitStatusParts = ImmutableList.of(
+ "# Long poll", // comment is to aid testing - see SshjToolAsyncStubIntegrationTest
+ "EXIT_STATUS_PATH="+exitStatusPath,
+ "PID_PATH="+pidPath,
+ "MAX_TIME="+maxTime,
+ "COUNTER=0",
+ "while [ \"$COUNTER\" -lt $MAX_TIME ]; do",
+ " if test -s $EXIT_STATUS_PATH; then",
+ " EXIT_STATUS=`cat $EXIT_STATUS_PATH`",
+ " exit $EXIT_STATUS",
+ " elif test -s $PID_PATH; then",
+ " PID=`cat $PID_PATH`",
+ " if ! ps -p $PID > /dev/null < /dev/null; then",
+ " # no exit status, and not executing; give a few seconds grace in case just about to write exit status",
+ " sleep 3",
+ " if test -s $EXIT_STATUS_PATH; then",
+ " EXIT_STATUS=`cat $EXIT_STATUS_PATH`",
+ " exit $EXIT_STATUS",
+ " else",
+ " echo \"No exit status in $EXIT_STATUS_PATH, and pid in $PID_PATH ($PID) not executing\"",
+ " exit 126",
+ " fi",
+ " fi",
+ " fi",
+ " # No exit status in $EXIT_STATUS_PATH; keep waiting",
+ " sleep 1",
+ " COUNTER+=1",
+ "done",
+ "exit 125"+"\n");
+ String waitForExitStatus = Joiner.on("\n").join(waitForExitStatusParts);
+
+ MutableList.Builder<String> cmds = MutableList.<String>builder()
+ .add(runAsRoot ? BashCommands.sudo(catStdoutCmd) : catStdoutCmd)
+ .add(runAsRoot ? BashCommands.sudo(catStderrCmd) : catStderrCmd)
+ .add(runAsRoot ? BashCommands.sudo(waitForExitStatus) : waitForExitStatus);
+ return cmds.build();
+ }
+
protected List<String> deleteTemporaryFilesCommand() {
if (!Boolean.TRUE.equals(noDeleteAfterExec)) {
// use "-f" because some systems have "rm" aliased to "rm -i"
// use "< /dev/null" to guarantee doesn't hang
return ImmutableList.of(
- "rm -f "+scriptPath+" < /dev/null",
- "rm -f "+stdoutPath+" < /dev/null",
- "rm -f "+stderrPath+" < /dev/null",
- "rm -f "+exitStatusPath+" < /dev/null",
- "rm -f "+pidPath+" < /dev/null");
+ "rm -f "+scriptPath+" "+stdoutPath+" "+stderrPath+" "+exitStatusPath+" "+pidPath+" < /dev/null");
} else {
return ImmutableList.<String>of();
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
index 5cd392b..0555039 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellTool.java
@@ -59,9 +59,11 @@ public interface ShellTool {
ConfigKey<String> PROP_SUMMARY = ConfigKeys.newStringConfigKey("summary", "Provides a human-readable summary, used in file generation etc");
+ ConfigKey<Duration> PROP_EXEC_TIMEOUT = newConfigKey("execTimeout", "Timeout when executing a script", Duration.PRACTICALLY_FOREVER);
+
ConfigKey<Boolean> PROP_EXEC_ASYNC = newConfigKey("execAsync", "Executes the script asynchronously, and then polls for the result (and for stdout/stderr)", false);
- ConfigKey<Duration> PROP_EXEC_ASYNC_TIMEOUT = newConfigKey("execAsyncTimeout", "Timeout when executing a script asynchronously", Duration.PRACTICALLY_FOREVER);
+ ConfigKey<Duration> PROP_EXEC_ASYNC_POLLING_TIMEOUT = newConfigKey("execAsyncPollTimeout", "Timeout per poll when executing a script asynchronously", Duration.ONE_MINUTE);
/**
* Executes the set of commands in a shell script. Blocks until completion.
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
index 99debff..154c433 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
@@ -33,6 +33,7 @@ import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -52,11 +53,13 @@ import org.apache.commons.io.input.ProxyInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import brooklyn.internal.BrooklynFeatureEnablement;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.internal.ssh.BackoffLimitedRetryHandler;
import brooklyn.util.internal.ssh.SshAbstractTool;
import brooklyn.util.internal.ssh.SshTool;
import brooklyn.util.io.FileUtil;
+import brooklyn.util.repeat.Repeater;
import brooklyn.util.stream.KnownSizeInputStream;
import brooklyn.util.stream.StreamGobbler;
import brooklyn.util.stream.Streams;
@@ -72,6 +75,7 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.CountingOutputStream;
import com.google.common.net.HostAndPort;
import com.google.common.primitives.Ints;
@@ -312,7 +316,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
@Override
public int execScript(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
Boolean execAsync = getOptionalVal(props, PROP_EXEC_ASYNC);
- if (Boolean.TRUE.equals(execAsync)) {
+ if (Boolean.TRUE.equals(execAsync) && BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC)) {
return execScriptAsyncAndPoll(props, commands, env);
} else {
return new ToolAbstractExecScript(props) {
@@ -320,8 +324,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
String scriptContents = toScript(props, commands, env);
if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as script: {}", host, scriptContents);
copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
-
- return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);
+ return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1);
}
}.run();
}
@@ -329,114 +332,137 @@ public class SshjTool extends SshAbstractTool implements SshTool {
protected int execScriptAsyncAndPoll(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
return new ToolAbstractAsyncExecScript(props) {
+ private int maxConsecutiveSshFailures = 3;
+ private Duration maxDelayBetweenPolls = Duration.seconds(20);
+ private Duration pollTimeout = getOptionalVal(props, PROP_EXEC_ASYNC_POLLING_TIMEOUT, Duration.FIVE_MINUTES);
+ private int iteration = 0;
+ private int consecutiveSshFailures = 0;
+ private int stdoutCount = 0;
+ private int stderrCount = 0;
+ private Stopwatch timer;
+
public int run() {
+ timer = Stopwatch.createStarted();
String scriptContents = toScript(props, commands, env);
if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as async script: {}", host, scriptContents);
copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
// Execute script asynchronously
- int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err)), -1);
+ int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1);
if (execResult != 0) return execResult;
- // Poll repeatedly to get the status
- Duration initialDelayBetweenPolls = Duration.ONE_SECOND;
- Duration maxDelayBetweenPolls = Duration.seconds(20);
- double multiplierBetweenPolls = 1.2;
- int maxConsecutiveSshFailures = 3;
- Duration delayBetweenPolls = initialDelayBetweenPolls;
- Stopwatch executionStartTime = Stopwatch.createStarted();
- Duration timeout = getOptionalVal(props, PROP_EXEC_ASYNC_TIMEOUT);
- if (timeout == null) timeout = Duration.PRACTICALLY_FOREVER;
- int stdoutCount = 0;
- int stderrCount = 0;
- int iteration = 0;
- int consecutiveSshFailures = 0;
+ // Long polling to get the status
try {
- do {
- // Retrieve the exit status of the async script
- ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
- ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
- int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr)), -1);
-
- // Retrieve the stdout + stderr of the async script.
- // Always do this *after* retrieving exit status, so don't miss anything.
- if (out != null || err != null) {
- ByteArrayOutputStream streamsOut = new ByteArrayOutputStream();
- ByteArrayOutputStream streamsErr = new ByteArrayOutputStream();
- int streamsResult = asInt(acquire(new ShellAction(buildRetrieveStdoutAndStderrCommand(stdoutCount, stderrCount), streamsOut, streamsErr)), -1);
-
- if (streamsResult == 0) {
- stdoutCount += streamsOut.size();
- stderrCount += streamsErr.size();
- } else {
- if (out != null) out.write(toUTF8ByteArray("retrieving stdout/stderr failed with exit code "+streamsResult+" (stdout follow)"));
- if (err != null) err.write(toUTF8ByteArray("retrieving stdout/stderr failed with exit code "+streamsResult+" (stderr follow)"));
- }
- if (out != null) out.write(streamsOut.toByteArray());
- if (err != null) err.write(streamsErr.toByteArray());
- }
+ final AtomicReference<Integer> result = new AtomicReference<Integer>();
+ boolean success = Repeater.create("async script on "+SshjTool.this.toString()+" (for "+getSummary()+")")
+ .backoffTo(maxDelayBetweenPolls)
+ .limitTimeTo(execTimeout)
+ .until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ iteration++;
+ if (LOG.isDebugEnabled()) LOG.debug("Doing long-poll (iteration="+iteration+") for async script to complete on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+ Integer exitstatus = longPoll();
+ result.set(exitstatus);
+ return exitstatus != null;
+ }})
+ .run();
+
+ if (!success) {
+ // Timed out
+ String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
+ LOG.warn(msg+"; rethrowing");
+ throw new TimeoutException(msg);
+ }
- if (statusResult != 0) {
- // Failed to get the exit status; if it was -1 (i.e. connection error) then consider retrying; otherwise abort.
- if (out != null) {
- out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)"));
- out.write(statusOut.toByteArray());
- }
- if (err != null) {
- err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)"));
- err.write(statusErr.toByteArray());
- }
- if (statusResult == -1) {
- // Looks like an ssh timeout/connection error; we are willing to retry
- consecutiveSshFailures++;
- if (consecutiveSshFailures > maxConsecutiveSshFailures) {
- LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")"+"\n"
- +"\t"+"EXIT STATUS: -1"+"\n"
- +"\t"+"STDERR: "+new String(statusErr.toByteArray())+"\n"
- +"\t"+"STDOUT: "+new String(statusOut.toByteArray())+"\n");
- return -1;
- } else {
- LOG.info("ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")"+"\n"
- +"\t"+"EXIT STATUS: -1"+"\n"
- +"\t"+"STDERR: "+new String(statusErr.toByteArray())+"\n"
- +"\t"+"STDOUT: "+new String(statusOut.toByteArray())+"\n");
- }
- } else {
- return statusResult;
- }
- } else {
- consecutiveSshFailures = 0;
- String statusOutStr = new String(statusOut.toByteArray());
- if (Strings.isNonBlank(statusOutStr)) {
- return Integer.parseInt(statusOutStr.trim());
- }
- }
-
- if (iteration == 0) {
- if (LOG.isDebugEnabled()) LOG.debug("Waiting for async script to complete on "+SshjTool.this.toString()+" (polling periodically for "+getSummary()+")");
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("Still waiting for async script to complete on "+SshjTool.this.toString()+" (polling periodically for "+getSummary()+"; iteration="+iteration+")");
- }
- Time.sleep(delayBetweenPolls);
- delayBetweenPolls = Duration.min(maxDelayBetweenPolls, delayBetweenPolls.multiply(multiplierBetweenPolls));
- iteration++;
-
- } while (timeout.isLongerThan(Duration.millis(executionStartTime.elapsed(TimeUnit.MILLISECONDS))));
+ return result.get();
- // Timed out
- String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
- LOG.warn(msg+"; rethrowing");
- throw new TimeoutException(msg);
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}
+ Integer longPoll() throws IOException {
+ // Long-polling to get stdout, stderr + exit status of async task.
+ // If our long-poll disconnects, we will just re-execute.
+ // We wrap the stdout/stderr so that we can get the size count.
+ // If we disconnect, we will pick up from that char of the stream.
+ // TODO Additional stdout/stderr written by buildLongPollCommand() could interfere,
+ // causing us to miss some characters.
+ // TODO Want to include timeout for long-polling
+ Duration nextPollTimeout = Duration.min(pollTimeout, Duration.millis(execTimeout.toMilliseconds()-timer.elapsed(TimeUnit.MILLISECONDS)));
+ CountingOutputStream countingOut = (out == null) ? null : new CountingOutputStream(out);
+ CountingOutputStream countingErr = (err == null) ? null : new CountingOutputStream(err);
+ int longPollResult = asInt(acquire(new ShellAction(buildLongPollCommand(stdoutCount, stderrCount, nextPollTimeout), countingOut, countingErr, nextPollTimeout)), -1);
+ stdoutCount += (countingOut == null) ? 0 : countingOut.getCount();
+ stderrCount += (countingErr == null) ? 0 : countingErr.getCount();
+
+ if (longPollResult == 0) {
+ return longPollResult; // success
+
+ } else if (longPollResult == -1) {
+ // probably a connection failure; try again
+ return null;
+
+ } else {
+ // want to double-check whether this is the exit-code from the async process, or
+ // some unexpected failure in our long-poll command.
+ Integer result = retrieveStatusCommand();
+ if (result != null) {
+ return result;
+ }
+ }
+
+ consecutiveSshFailures++;
+ if (consecutiveSshFailures > maxConsecutiveSshFailures) {
+ LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors (return -1) when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
+ return -1;
+ } else {
+ LOG.info("Retrying after ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
+ return null;
+ }
+ }
+ Integer retrieveStatusCommand() throws IOException {
+ // want to double-check whether this is the exit-code from the async process, or
+ // some unexpected failure in our long-poll command.
+ ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
+ ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
+ int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr, execTimeout)), -1);
+
+ if (statusResult == 0) {
+ // The status we retrieved really is valid; return it.
+ // TODO How to ensure no additional output in stdout/stderr when parsing below?
+ String statusOutStr = new String(statusOut.toByteArray()).trim();
+ if (Strings.isEmpty(statusOutStr)) {
+ // suggests not yet completed; will retry with long-poll
+ return null;
+ } else {
+ int result = Integer.parseInt(statusOutStr);
+ return result;
+ }
+
+ } else if (statusResult == -1) {
+ // probably a connection failure; try again with long-poll
+ return null;
+
+ } else {
+ if (out != null) {
+ out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)"));
+ out.write(statusOut.toByteArray());
+ }
+ if (err != null) {
+ err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)"));
+ err.write(statusErr.toByteArray());
+ }
+
+ return statusResult;
+ }
+ }
}.run();
}
-
public int execShellDirect(Map<String,?> props, List<String> commands, Map<String,?> env) {
OutputStream out = getOptionalVal(props, PROP_OUT_STREAM);
OutputStream err = getOptionalVal(props, PROP_ERR_STREAM);
+ Duration execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT);
List<String> cmdSequence = toCommandSequence(commands, env);
List<String> allcmds = ImmutableList.<String>builder()
@@ -447,7 +473,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {}: {}", host, allcmds);
- Integer result = acquire(new ShellAction(allcmds, out, err));
+ Integer result = acquire(new ShellAction(allcmds, out, err, execTimeout));
if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} completed: return status {}", host, result);
return asInt(result, -1);
}
@@ -460,6 +486,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
OutputStream out = getOptionalVal(props, PROP_OUT_STREAM);
OutputStream err = getOptionalVal(props, PROP_ERR_STREAM);
String separator = getOptionalVal(props, PROP_SEPARATOR);
+ Duration execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT);
List<String> allcmds = toCommandSequence(commands, env);
String singlecmd = Joiner.on(separator).join(allcmds);
@@ -470,7 +497,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
if (LOG.isTraceEnabled()) LOG.trace("Running command at {}: {}", host, singlecmd);
- Command result = acquire(new ExecAction(singlecmd, out, err));
+ Command result = acquire(new ExecAction(singlecmd, out, err, execTimeout));
if (LOG.isTraceEnabled()) LOG.trace("Running command at {} completed: exit code {}", host, result.getExitStatus());
// can be null if no exit status is received (observed on kill `ps aux | grep thing-to-grep-for | awk {print $2}`
if (result.getExitStatus()==null) LOG.warn("Null exit status running at {}: {}", host, singlecmd);
@@ -722,18 +749,23 @@ public class SshjTool extends SshAbstractTool implements SshTool {
class ExecAction implements SshAction<Command> {
private final String command;
+ private final OutputStream out;
+ private final OutputStream err;
+ private final Duration timeout;
private Session session;
private Shell shell;
private StreamGobbler outgobbler;
private StreamGobbler errgobbler;
- private OutputStream out;
- private OutputStream err;
-
- ExecAction(String command, OutputStream out, OutputStream err) {
+
+ ExecAction(String command, OutputStream out, OutputStream err, Duration timeout) {
this.command = checkNotNull(command, "command");
this.out = out;
this.err = err;
+ Duration sessionTimeout = (sshClientConnection.getSessionTimeout() == 0)
+ ? Duration.PRACTICALLY_FOREVER
+ : Duration.millis(sshClientConnection.getSessionTimeout());
+ this.timeout = (timeout == null) ? sessionTimeout : Duration.min(timeout, sessionTimeout);
}
@Override
@@ -762,7 +794,8 @@ public class SshjTool extends SshAbstractTool implements SshTool {
errgobbler.start();
}
try {
- output.join(sshClientConnection.getSessionTimeout(), TimeUnit.MILLISECONDS);
+
+ output.join((int)Math.min(timeout.toMilliseconds(), Integer.MAX_VALUE), TimeUnit.MILLISECONDS);
return output;
} finally {
@@ -788,19 +821,27 @@ public class SshjTool extends SshAbstractTool implements SshTool {
}
class ShellAction implements SshAction<Integer> {
- private final List<String> commands;
+ @VisibleForTesting
+ final List<String> commands;
+ @VisibleForTesting
+ final OutputStream out;
+ @VisibleForTesting
+ final OutputStream err;
private Session session;
private Shell shell;
private StreamGobbler outgobbler;
private StreamGobbler errgobbler;
- private OutputStream out;
- private OutputStream err;
+ private Duration timeout;
- ShellAction(List<String> commands, OutputStream out, OutputStream err) {
+ ShellAction(List<String> commands, OutputStream out, OutputStream err, Duration timeout) {
this.commands = checkNotNull(commands, "commands");
this.out = out;
this.err = err;
+ Duration sessionTimeout = (sshClientConnection.getSessionTimeout() == 0)
+ ? Duration.PRACTICALLY_FOREVER
+ : Duration.millis(sshClientConnection.getSessionTimeout());
+ this.timeout = (timeout == null) ? sessionTimeout : Duration.min(timeout, sessionTimeout);
}
@Override
@@ -854,8 +895,8 @@ public class SshjTool extends SshAbstractTool implements SshTool {
closeWhispering(output, this);
try {
- long timeout = sshClientConnection.getSessionTimeout();
- long timeoutEnd = System.currentTimeMillis() + timeout;
+ long timeoutMillis = Math.min(timeout.toMilliseconds(), Integer.MAX_VALUE);
+ long timeoutEnd = System.currentTimeMillis() + timeoutMillis;
Exception last = null;
do {
if (!shell.isOpen() && ((SessionChannel)session).getExitStatus()!=null)
@@ -874,7 +915,7 @@ public class SshjTool extends SshAbstractTool implements SshTool {
// which is nohupped to really be in the background (#162)
// now let's bail out
break;
- } while (timeout<=0 || System.currentTimeMillis() < timeoutEnd);
+ } while (System.currentTimeMillis() < timeoutEnd);
if (shell.isOpen() && ((SessionChannel)session).getExitStatus()==null) {
LOG.debug("Timeout ({}) in SSH shell to {}", sshClientConnection.getSessionTimeout(), this);
// we timed out, or other problem -- reproduce the error
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
new file mode 100644
index 0000000..2acb200
--- /dev/null
+++ b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.util.internal.ssh.sshj;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.internal.ssh.sshj.SshjTool.ShellAction;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for async-exec with {@link SshjTool}, where it stubs out the actual ssh commands
+ * to return a controlled sequence of responses.
+ */
+public class SshjToolAsyncStubIntegrationTest {
+
+ static class InjectedResult {
+ Predicate<SshjTool.ShellAction> expected;
+ Function<SshjTool.ShellAction, Integer> result;
+
+ InjectedResult(Predicate<SshjTool.ShellAction> expected, Function<SshjTool.ShellAction, Integer> result) {
+ this.expected = expected;
+ this.result = result;
+ }
+ }
+
+ private SshjTool tool;
+ private List<InjectedResult> sequence;
+ int counter = 0;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ sequence = Lists.newArrayList();
+ counter = 0;
+
+ tool = new SshjTool(ImmutableMap.<String,Object>of("host", "localhost")) {
+ @SuppressWarnings("unchecked")
+ protected <T, C extends SshAction<T>> T acquire(C action) {
+ if (action instanceof SshjTool.ShellAction) {
+ SshjTool.ShellAction shellAction = (SshjTool.ShellAction) action;
+ InjectedResult injectedResult = sequence.get(counter);
+ assertTrue(injectedResult.expected.apply(shellAction), "counter="+counter+"; cmds="+shellAction.commands);
+ counter++;
+ return (T) injectedResult.result.apply(shellAction);
+ }
+ return super.acquire(action);
+ }
+ };
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (tool != null) tool.disconnect();
+ }
+
+ private Predicate<SshjTool.ShellAction> containsCmd(final String cmd) {
+ return new Predicate<SshjTool.ShellAction>() {
+ @Override public boolean apply(ShellAction input) {
+ return input != null && input.commands.toString().contains(cmd);
+ }
+ };
+ }
+
+ private Function<SshjTool.ShellAction, Integer> returning(final int result, final String stdout, final String stderr) {
+ return new Function<SshjTool.ShellAction, Integer>() {
+ @Override public Integer apply(ShellAction input) {
+ try {
+ if (stdout != null && input.out != null) input.out.write(stdout.getBytes());
+ if (stderr != null && input.err != null) input.err.write(stderr.getBytes());
+ } catch (IOException e) {
+ throw Exceptions.propagate(e);
+ }
+ return result;
+ }
+ };
+ }
+
+ @Test(groups="Integration")
+ public void testPolls() throws Exception {
+ sequence = ImmutableList.of(
+ new InjectedResult(containsCmd("& disown"), returning(0, "", "")),
+ new InjectedResult(containsCmd("# Long poll"), returning(0, "mystringToStdout", "mystringToStderr")));
+
+ runTest(0, "mystringToStdout", "mystringToStderr");
+ assertEquals(counter, sequence.size());
+ }
+
+ @Test(groups="Integration")
+ public void testPollsAndReturnsNonZeroExitCode() throws Exception {
+ sequence = ImmutableList.of(
+ new InjectedResult(containsCmd("& disown"), returning(0, "", "")),
+ new InjectedResult(containsCmd("# Long poll"), returning(123, "mystringToStdout", "mystringToStderr")),
+ new InjectedResult(containsCmd("# Retrieve status"), returning(0, "123", "")));
+
+ runTest(123, "mystringToStdout", "mystringToStderr");
+ assertEquals(counter, sequence.size());
+ }
+
+ @Test(groups="Integration")
+ public void testPollsRepeatedly() throws Exception {
+ sequence = ImmutableList.of(
+ new InjectedResult(containsCmd("& disown"), returning(0, "", "")),
+ new InjectedResult(containsCmd("# Long poll"), returning(125, "mystringToStdout", "mystringToStderr")),
+ new InjectedResult(containsCmd("# Retrieve status"), returning(0, "", "")),
+ new InjectedResult(containsCmd("# Long poll"), returning(125, "mystringToStdout2", "mystringToStderr2")),
+ new InjectedResult(containsCmd("# Retrieve status"), returning(0, "", "")),
+ new InjectedResult(containsCmd("# Long poll"), returning(-1, "mystringToStdout3", "mystringToStderr3")),
+ new InjectedResult(containsCmd("# Long poll"), returning(125, "mystringToStdout4", "mystringToStderr4")),
+ new InjectedResult(containsCmd("# Retrieve status"), returning(0, "", "")),
+ new InjectedResult(containsCmd("# Long poll"), returning(0, "mystringToStdout5", "mystringToStderr5")));
+
+ runTest(0,
+ "mystringToStdout"+"mystringToStdout2"+"mystringToStdout3"+"mystringToStdout4"+"mystringToStdout5",
+ "mystringToStderr"+"mystringToStderr2"+"mystringToStderr3"+"mystringToStderr4"+"mystringToStderr5");
+ assertEquals(counter, sequence.size());
+ }
+
+ protected void runTest(int expectedExit, String expectedStdout, String expectedStderr) throws Exception {
+ List<String> cmds = ImmutableList.of("abc");
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ int exitCode = tool.execScript(
+ ImmutableMap.of(
+ "out", out,
+ "err", err,
+ SshjTool.PROP_EXEC_ASYNC.getName(), true,
+ SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true,
+ SshjTool.PROP_EXEC_ASYNC_POLLING_TIMEOUT.getName(), Duration.ONE_MILLISECOND),
+ cmds,
+ ImmutableMap.<String,String>of());
+ String outStr = new String(out.toByteArray());
+ String errStr = new String(err.toByteArray());
+
+ assertEquals(exitCode, expectedExit);
+ assertEquals(outStr.trim(), expectedStdout);
+ assertEquals(errStr.trim(), expectedStderr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5c0cc786/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
index f5b8700..b81712d 100644
--- a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
+++ b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
@@ -176,15 +176,20 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream err = new ByteArrayOutputStream();
int exitCode = tool.execScript(
- ImmutableMap.of("out", out, "err", err, SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true),
+ ImmutableMap.of(
+ "out", out,
+ "err", err,
+ SshjTool.PROP_EXEC_ASYNC.getName(), true,
+ SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true,
+ SshjTool.PROP_EXEC_ASYNC_POLLING_TIMEOUT.getName(), Duration.ONE_SECOND),
cmds,
ImmutableMap.<String,String>of());
String outStr = new String(out.toByteArray());
String errStr = new String(err.toByteArray());
assertEquals(exitCode, 0);
- assertEquals("mystringToStdout\nmystringPostSleepToStdout", outStr.trim());
- assertEquals("mystringToStderr\nmystringPostSleepToStderr", errStr.trim());
+ assertEquals(outStr.trim(), "mystringToStdout\nmystringPostSleepToStdout");
+ assertEquals(errStr.trim(), "mystringToStderr\nmystringPostSleepToStderr");
}
@Test(groups = {"Integration"})
@@ -201,7 +206,7 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
tool.execScript(
- ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_ASYNC_TIMEOUT.getName(), Duration.millis(1)),
+ ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_TIMEOUT.getName(), Duration.millis(1)),
ImmutableList.of("sleep 60"),
ImmutableMap.<String,String>of());
fail();
@@ -220,7 +225,7 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
int exitStatus = tool.execScript(
- ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_ASYNC_TIMEOUT.getName(), Duration.millis(1)),
+ ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_TIMEOUT.getName(), Duration.millis(1)),
ImmutableList.of("sleep 63"),
ImmutableMap.<String,String>of());