Posted to commits@mina.apache.org by lg...@apache.org on 2022/07/09 16:24:13 UTC

[mina-sshd] branch master updated: [SSHD-1276] Added capability to redirect command/shell STDERR stream to STDOUT one

This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git


The following commit(s) were added to refs/heads/master by this push:
     new af4e310bb [SSHD-1276] Added capability to redirect command/shell STDERR stream to STDOUT one
af4e310bb is described below

commit af4e310bbc9e6ca6a328b255d2b48cbe04a89c9d
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Thu Jul 7 08:58:59 2022 +0300

    [SSHD-1276] Added capability to redirect command/shell STDERR stream to STDOUT one
---
 CHANGES.md                                         |   3 +-
 docs/client-setup.md                               | 109 +++++++++++++++++++--
 .../sshd/client/channel/AbstractClientChannel.java |  10 ++
 .../apache/sshd/client/channel/ChannelSession.java |  24 +++--
 .../apache/sshd/client/channel/ClientChannel.java  |   8 ++
 .../sshd/client/session/ClientSessionTest.java     |  71 ++++++++++++++
 6 files changed, 209 insertions(+), 16 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 725bbbda7..241256ed9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -109,4 +109,5 @@ Was originally in *HostConfigEntry*.
 * [SSHD-1266](https://issues.apache.org/jira/browse/SSHD-1266) Fix encoding/decoding critical options in OpenSSH certificates
 * [SSHD-1269](https://issues.apache.org/jira/browse/SSHD-1269) Fix TCP/IP remote port forwarding with wildcard addresses
 * [SSHD-1272](https://issues.apache.org/jira/browse/SSHD-1272) Use correct signature for RSA keys from SSH agent, and by default ignore keys for which there is no signature algorithm
-* [SSHD-1273](https://issues.apache.org/jira/browse/SSHD-1273) Add support to use env vars together with subsystem channels
\ No newline at end of file
+* [SSHD-1273](https://issues.apache.org/jira/browse/SSHD-1273) Add support to use env vars together with subsystem channels
+* [SSHD-1276](https://issues.apache.org/jira/browse/SSHD-1276) Added capability to redirect command/shell STDERR stream to STDOUT one
\ No newline at end of file
diff --git a/docs/client-setup.md b/docs/client-setup.md
index bd6df7ebb..0b994c6b9 100644
--- a/docs/client-setup.md
+++ b/docs/client-setup.md
@@ -334,37 +334,130 @@ the `ClientSession` (for specific session configuration).
 
 ## Running a command or opening a shell
 
-When running a command or opening a shell, there is an extra concern regarding the PTY configuration and/or the
-reported environment variables. By default, unless specific instructions are provided, the code uses some internal
-defaults - which however, might not be adequate for the specific client/server.
+### Running a single non-interactive command
+
+```java
+try (OutputStream stdout = ...create/obtain output stream...;
+     OutputStream stderr = ...create/obtain output stream...;
+     ClientChannel channel = session.createExecChannel(command)) {
+    channel.setOut(stdout);
+    channel.setErr(stderr);
+    channel.open().verify(...some timeout...);
+    // Wait (forever) for the channel to close - signalling command finished
+    channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0L);
+}
+
+// Parse/handle the command's output/error streams
+```
+
+If all one needs is to run a non-interactive command and then look at its string output, one can use several of the
+available *ClientSession#executeRemoteCommand* overloaded methods.
+
+### Running an interactive command/shell
+
+If one needs to parse the command/shell output and then respond by sending the correct input, the code must use **separate** thread(s)
+to read the STDOUT/STDERR and provide STDIN input. These threads must be up and running *before* opening the channel since data may start
+to pour in even before the *await/verify* call returns. If this data is not consumed at a reasonable pace, then the channel may block and eventually
+even disconnect. Thus the thread(s) using the streams must be ready beforehand.
+
 
 ```java
-// Assuming one has obtained a ClientSession as already shown
+// The same code can be used when opening a ChannelExec in order to run a single interactive command
 try (ClientChannel channel = session.createShellChannel(/* use internal defaults */)) {
     channel.setIn(...stdin...);
     channel.setOut(...stdout...);
     channel.setErr(...stderr...);
-    // ... spawn the thread(s) that will pump the STDIN/OUT/ERR
+    // ... spawn the servicing thread(s) ...
+    try {
+        channel.open().verify(...some timeout...);
+        // Wait (forever) for the channel to close - signalling shell exited
+        channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0L);
+    } finally {
+        // ... stop the pumping threads ...
+    }
+}
+
+```
+
+In such cases it is recommended to use the inverted streams in the relevant threads:
+
+
+```java
+// The same code can be used when opening a ChannelExec in order to run a single interactive command
+try (ClientChannel channel = session.createShellChannel(/* use internal defaults */)) {
+    try {
+        channel.open().verify(...some timeout...);
+        
+        spawnStdinThread(channel.getInvertedIn());
+        spawnStdoutThread(channel.getInvertedOut());
+        spawnStderrThread(channel.getInvertedErr());
+
+        // Wait (forever) for the channel to close - signalling shell exited
+        channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0L);
+    } finally {
+        // ... stop the pumping threads ...
+    }
+}
+
+```
+
+### Redirecting STDERR stream to STDOUT
+
+One can use a combined STDOUT/STDERR stream instead of separate ones:
+
+```java
+
+///////////////////////// Non-interactive ///////////////////////////////
+
+try (OutputStream mergedOutput = ...create/obtain output stream...;
+     ClientChannel channel = session.createExecChannel(command)) {
+    channel.setOut(mergedOutput);
+    channel.setRedirectErrorStream(true);
+    channel.open().verify(...some timeout...);
+    // Wait (forever) for the channel to close - signalling command finished
+    channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0L);
+}
+
+// Parse/handle the combined output/error streams
+
+////////////////////////// Interactive ////////////////////////////////////
+
+try (ClientChannel channel = session.createShellChannel(/* use internal defaults */)) {
     try {
+        channel.setRedirectErrorStream(true);
+        
         channel.open().verify(...some timeout...);
+        
+        spawnStdinThread(channel.getInvertedIn());
+        spawnCombinedOutputThread(channel.getInvertedOut());
+
         // Wait (forever) for the channel to close - signalling shell exited
         channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0L);
     } finally {
         // ... stop the pumping threads ...
     }
 }
+```
+
+**Note:** the call to *setRedirectErrorStream* must occur **before** the channel is opened. Calling it afterwards has no effect - i.e.,
+the last state before opening the stream determines the channel's behavior.
+
+### PTY configuration
 
+When running a command or opening a shell, there is an extra concern regarding the PTY configuration and/or the
+reported environment variables. By default, unless specific instructions are provided, the code uses some internal
+defaults - which however, might not be adequate for the specific client/server.
+
+```java
 // In order to override the PTY and/or environment
 Map<String, ?> env = ...some environment...
 PtyChannelConfiguration ptyConfig = ...some configuration...
 try (ClientChannel channel = session.createShellChannel(ptyConfig, env)) {
     ... same code as before ...
 }
-
-// the same code can be used when opening a ChannelExec in order to run a single command
-
 ```
 
+
 One possible source of PTY configuration is code that provides some default initializations based on the detected O/S
 type - `PtyChannelConfigurationMutator#setupSensitiveDefaultPtyConfiguration`. Of course, the user may use whatever other
 considerations when opening such a channel.
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 1f6f56934..042bcdf85 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -75,6 +75,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
     protected InputStream invertedOut;
     protected OutputStream err;
     protected InputStream invertedErr;
+    protected boolean redirectErrorStream;
     protected final AtomicReference<Integer> exitStatusHolder = new AtomicReference<>(null);
     protected final AtomicReference<String> exitSignalHolder = new AtomicReference<>(null);
     protected int openFailureReason;
@@ -178,6 +179,15 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
         this.err = err;
     }
 
+    public boolean isRedirectErrorStream() {
+        return redirectErrorStream;
+    }
+
+    @Override
+    public void setRedirectErrorStream(boolean redirectErrorStream) {
+        this.redirectErrorStream = redirectErrorStream;
+    }
+
     @Override
     protected Closeable getInnerCloseable() {
         return builder()
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index 12223c214..de753cd79 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -51,8 +51,8 @@ import org.apache.sshd.core.CoreModuleProperties;
  */
 public class ChannelSession extends AbstractClientChannel {
 
-    private CloseableExecutorService pumperService;
-    private Future<?> pumper;
+    protected CloseableExecutorService pumperService;
+    protected Future<?> pumper;
     private final Map<String, Object> env = new LinkedHashMap<>();
 
     public ChannelSession() {
@@ -76,7 +76,11 @@ public class ChannelSession extends AbstractClientChannel {
                 }
             };
             asyncOut = new ChannelAsyncInputStream(this);
-            asyncErr = new ChannelAsyncInputStream(this);
+            if (redirectErrorStream) {
+                asyncErr = asyncOut;
+            } else {
+                asyncErr = new ChannelAsyncInputStream(this);
+            }
         } else {
             invertedIn = new ChannelOutputStream(
                     this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true);
@@ -88,11 +92,17 @@ public class ChannelSession extends AbstractClientChannel {
                 out = pos;
                 invertedOut = pis;
             }
+
             if (err == null) {
-                ChannelPipedInputStream pis = new ChannelPipedInputStream(this, wLocal);
-                ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
-                err = pos;
-                invertedErr = pis;
+                if (redirectErrorStream) {
+                    err = out;
+                    invertedErr = invertedOut;
+                } else {
+                    ChannelPipedInputStream pis = new ChannelPipedInputStream(this, wLocal);
+                    ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
+                    err = pos;
+                    invertedErr = pis;
+                }
             }
 
             if (in != null) {
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
index 7897ba7aa..80c255885 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
@@ -86,6 +86,14 @@ public interface ClientChannel extends Channel, StreamingChannel, ClientSessionH
 
     void setErr(OutputStream err);
 
+    /**
+     * @param redirectErrorStream If {@code true} then STDERR stream is set to be the same as STDOUT unless
+     *                            {@link #setErr(OutputStream)} was called. <B>Note:</B> the call must occur
+     *                            <U>before</U> channel is opened. Calling it afterwards has no effect - i.e., the last
+     *                            state before opening the stream determines the channel's behavior.
+     */
+    void setRedirectErrorStream(boolean redirectErrorStream);
+
     OpenFuture open() throws IOException;
 
     /**
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/session/ClientSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/client/session/ClientSessionTest.java
index d6c4cf25e..73a7a58db 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/session/ClientSessionTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/session/ClientSessionTest.java
@@ -19,18 +19,27 @@
 
 package org.apache.sshd.client.session;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.rmi.RemoteException;
 import java.rmi.ServerException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.channel.ClientChannelEvent;
 import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.common.AttributeRepository;
 import org.apache.sshd.common.AttributeRepository.AttributeKey;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.SessionListener;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.core.CoreModuleProperties;
 import org.apache.sshd.server.SshServer;
 import org.apache.sshd.server.auth.keyboard.KeyboardInteractiveAuthenticator;
@@ -298,4 +307,66 @@ public class ClientSessionTest extends BaseTestSupport {
             CoreModuleProperties.SERVER_EXTRA_IDENTIFICATION_LINES.set(sshd, null);
         }
     }
+
+    @Test   // SSHD-1276
+    public void testRedirectCommandErrorStream() throws Exception {
+        String expectedCommand = getCurrentTestName() + "-CMD";
+        String expectedStdout = getCurrentTestName() + "-STDOUT";
+        String expectedStderr = getCurrentTestName() + "-STDERR";
+        sshd.setCommandFactory((session, command) -> new CommandExecutionHelper(command) {
+            private boolean cmdProcessed;
+
+            @Override
+            protected boolean handleCommandLine(String command) throws Exception {
+                assertEquals("Mismatched incoming command", expectedCommand, command);
+                assertFalse("Duplicated command call", cmdProcessed);
+                writeResponse(getOutputStream(), expectedStdout);
+                writeResponse(getErrorStream(), expectedStderr);
+                cmdProcessed = true;
+                return false;
+            }
+
+            private void writeResponse(OutputStream out, String rsp) throws IOException {
+                out.write(rsp.getBytes(StandardCharsets.US_ASCII));
+                out.write((byte) '\n');
+                out.flush();
+            }
+        });
+
+        String response;
+        try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
+                .verify(CONNECT_TIMEOUT)
+                .getSession()) {
+            session.addPasswordIdentity(getCurrentTestName());
+            session.auth().verify(AUTH_TIMEOUT);
+
+            try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+                // NOTE !!! The LF is only because we are using a buffered reader on the server end to read the command
+                try (ClientChannel channel = session.createExecChannel(expectedCommand + "\n")) {
+                    channel.setOut(baos);
+                    channel.setRedirectErrorStream(true);
+
+                    channel.open().verify(OPEN_TIMEOUT);
+                    // Wait (up to CLOSE_TIMEOUT) for the channel to close - signalling command finished
+                    channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), CLOSE_TIMEOUT);
+                }
+
+                byte[] bytes = baos.toByteArray();
+                response = new String(bytes, StandardCharsets.US_ASCII);
+            }
+        }
+
+        String[] lines = GenericUtils.split(response, '\n');
+        assertEquals("Mismatched response lines count", 2, lines.length);
+
+        Collection<String> values = new ArrayList<>(Arrays.asList(lines));
+        // We don't rely on the order the strings were written
+        for (String expected : new String[] { expectedStdout, expectedStderr }) {
+            if (!values.remove(expected)) {
+                fail(expected + " not in response=" + values);
+            }
+        }
+
+        assertTrue("Unexpected response remainders: " + values, values.isEmpty());
+    }
 }