You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/10/21 06:48:29 UTC

mina-sshd git commit: [SSHD-570] NPE while executing a command while redirecting

Repository: mina-sshd
Updated Branches:
  refs/heads/master b0b8d3466 -> 2f95f7b9d


[SSHD-570] NPE while executing a command while redirecting


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/2f95f7b9
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/2f95f7b9
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/2f95f7b9

Branch: refs/heads/master
Commit: 2f95f7b9d41756385cba1b8fdfb20f3ec3bd987d
Parents: b0b8d34
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Wed Oct 21 07:48:15 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Wed Oct 21 07:48:15 2015 +0300

----------------------------------------------------------------------
 .../sshd/server/shell/InvertedShellWrapper.java |  85 +++++---
 .../apache/sshd/server/shell/ProcessShell.java  | 143 ++++++++++++
 .../sshd/server/shell/ProcessShellFactory.java  | 218 +------------------
 .../sshd/server/shell/TtyFilterInputStream.java |  97 +++++++++
 .../server/shell/TtyFilterOutputStream.java     |  66 ++++++
 .../apache/sshd/server/shell/TtyOptions.java    |  55 +++++
 .../server/shell/InvertedShellWrapperTest.java  | 125 ++++++++++-
 .../sshd/util/test/BogusExitCallback.java       |  17 +-
 8 files changed, 554 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2f95f7b9/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java b/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java
index f3ee901..5f167df 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/shell/InvertedShellWrapper.java
@@ -24,6 +24,10 @@ import java.io.OutputStream;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.io.IoUtils;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.Environment;
@@ -40,12 +44,24 @@ import org.apache.sshd.server.session.ServerSession;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class InvertedShellWrapper implements Command, SessionAware {
+public class InvertedShellWrapper extends AbstractLoggingBean implements Command, SessionAware {
 
     /**
-     * default buffer size for the IO pumps.
+     * Default buffer size for the IO pumps.
      */
-    public static final int DEFAULT_BUFFER_SIZE = 8192;
+    public static final int DEFAULT_BUFFER_SIZE = IoUtils.DEFAULT_COPY_SIZE;
+
+    /**
+     * Value used to control the &quot;busy-wait&quot; sleep time (millis) on
+     * the pumping loop if nothing was pumped - must be <U>positive</U>
+     * @see #DEFAULT_PUMP_SLEEP_TIME
+     */
+    public static final String PUMP_SLEEP_TIME = "inverted-shell-wrapper-pump-sleep";
+
+    /**
+     * Default value for {@link #PUMP_SLEEP_TIME} if none set
+     */
+    public static final long DEFAULT_PUMP_SLEEP_TIME = 1L;
 
     private final InvertedShell shell;
     private final Executor executor;
@@ -58,31 +74,44 @@ public class InvertedShellWrapper implements Command, SessionAware {
     private InputStream shellErr;
     private ExitCallback callback;
     private boolean shutdownExecutor;
+    private long pumpSleepTime = DEFAULT_PUMP_SLEEP_TIME;
 
+    /**
+     * Auto-allocates an {@link Executor} in order to create the streams pump thread
+     * and uses the {@link #DEFAULT_BUFFER_SIZE}
+     *
+     * @param shell The {@link InvertedShell}
+     * @see #InvertedShellWrapper(InvertedShell, int)
+     */
     public InvertedShellWrapper(InvertedShell shell) {
         this(shell, DEFAULT_BUFFER_SIZE);
     }
 
-    public InvertedShellWrapper(InvertedShell shell, Executor executor) {
-        this(shell, executor, DEFAULT_BUFFER_SIZE);
-    }
-
+    /**
+     * Auto-allocates an {@link Executor} in order to create the streams pump thread
+     *
+     * @param shell      The {@link InvertedShell}
+     * @param bufferSize Buffer size to use - must be above min. size ({@link Byte#SIZE})
+     * @see #InvertedShellWrapper(InvertedShell, Executor, boolean, int)
+     */
     public InvertedShellWrapper(InvertedShell shell, int bufferSize) {
-        this(shell,
-                ThreadUtils.newSingleThreadExecutor("shell[" + Integer.toHexString(shell.hashCode()) + "]"),
-                true,
-                bufferSize);
-    }
-
-    public InvertedShellWrapper(InvertedShell shell, Executor executor, int bufferSize) {
-        this(shell, executor, false, bufferSize);
+        this(shell, null, true, bufferSize);
     }
 
+    /**
+     * @param shell            The {@link InvertedShell}
+     * @param executor         The {@link Executor} to use in order to create the streams pump thread.
+     *                         If {@code null} one is auto-allocated and shutdown when wrapper is {@link #destroy()}-ed.
+     * @param shutdownExecutor If {@code true} the executor is shut down when shell wrapper is {@link #destroy()}-ed.
+     *                         Ignored if executor service auto-allocated
+     * @param bufferSize       Buffer size to use - must be above min. size ({@link Byte#SIZE})
+     */
     public InvertedShellWrapper(InvertedShell shell, Executor executor, boolean shutdownExecutor, int bufferSize) {
-        this.shell = shell;
-        this.executor = executor;
+        this.shell = ValidateUtils.checkNotNull(shell, "No shell");
+        this.executor = (executor == null) ? ThreadUtils.newSingleThreadExecutor("shell[0x" + Integer.toHexString(shell.hashCode()) + "]") : executor;
+        ValidateUtils.checkTrue(bufferSize > Byte.SIZE, "Copy buffer size too small: %d", bufferSize);
         this.bufferSize = bufferSize;
-        this.shutdownExecutor = shutdownExecutor;
+        this.shutdownExecutor = (executor == null) ? true : shutdownExecutor;
     }
 
     @Override
@@ -107,6 +136,9 @@ public class InvertedShellWrapper implements Command, SessionAware {
 
     @Override
     public void setSession(ServerSession session) {
+        pumpSleepTime = PropertyResolverUtils.getLongProperty(session, PUMP_SLEEP_TIME, DEFAULT_PUMP_SLEEP_TIME);
+        ValidateUtils.checkTrue(pumpSleepTime > 0L, "Invalid " + PUMP_SLEEP_TIME + ": %d", pumpSleepTime);
+
         if (shell instanceof SessionAware) {
             ((SessionAware) shell).setSession(session);
         }
@@ -130,7 +162,7 @@ public class InvertedShellWrapper implements Command, SessionAware {
     @Override
     public synchronized void destroy() {
         shell.destroy();
-        if (shutdownExecutor && executor instanceof ExecutorService) {
+        if (shutdownExecutor && (executor instanceof ExecutorService)) {
             ((ExecutorService) executor).shutdown();
         }
     }
@@ -140,8 +172,7 @@ public class InvertedShellWrapper implements Command, SessionAware {
             // Use a single thread to correctly sequence the output and error streams.
             // If any bytes are available from the output stream, send them first, then
             // check the error stream, or wait until more data is available.
-            byte[] buffer = new byte[bufferSize];
-            for (;;) {
+            for (byte[] buffer = new byte[bufferSize];;) {
                 if (pumpStream(in, shellIn, buffer)) {
                     continue;
                 }
@@ -158,15 +189,20 @@ public class InvertedShellWrapper implements Command, SessionAware {
                 // Sleep a bit.  This is not very good, as it consumes CPU, but the
                 // input streams are not selectable for nio, and any other blocking
                 // method would consume at least two threads
-                Thread.sleep(1);
+                Thread.sleep(pumpSleepTime);
             }
         } catch (Exception e) {
             shell.destroy();
-            callback.onExit(shell.exitValue());
+
+            int exitValue = shell.exitValue();
+            if (log.isDebugEnabled()) {
+                log.debug(e.getClass().getSimpleName() + " while pumping the streams (exit=" + exitValue + "): " + e.getMessage(), e);
+            }
+            callback.onExit(exitValue, e.getClass().getSimpleName());
         }
     }
 
-    private boolean pumpStream(InputStream in, OutputStream out, byte[] buffer) throws IOException {
+    protected boolean pumpStream(InputStream in, OutputStream out, byte[] buffer) throws IOException {
         int available = in.available();
         if (available > 0) {
             int len = in.read(buffer);
@@ -180,5 +216,4 @@ public class InvertedShellWrapper implements Command, SessionAware {
         }
         return false;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2f95f7b9/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShell.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShell.java b/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShell.java
new file mode 100644
index 0000000..1fa9b6f
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShell.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.shell;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.io.IoUtils;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+
+/**
+ * Bridges the I/O streams between the SSH command and the process that executes it
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class ProcessShell extends AbstractLoggingBean implements InvertedShell {
+    private final Set<TtyOptions> ttyOptions;
+    private final String[] command;
+    private String cmdValue;
+    private Process process;
+    private TtyFilterOutputStream in;
+    private TtyFilterInputStream out;
+    private TtyFilterInputStream err;
+
+    public ProcessShell(Collection<TtyOptions> ttyOptions, String ... command) {
+        // we create a copy of the options so as to avoid concurrent modifications
+        this.ttyOptions = GenericUtils.of(ttyOptions);
+        // we clone the original array so as not to change it
+        this.command = ValidateUtils.checkNotNullAndNotEmpty(command, "No process shell command(s)").clone();
+        this.cmdValue = GenericUtils.join(command, ' ');
+    }
+
+    @Override
+    public void start(Map<String, String> env) throws IOException {
+        for (int i = 0; i < command.length; i++) {
+            String cmd = command[i];
+            if ("$USER".equals(cmd)) {
+                cmd = env.get("USER");
+                command[i] = cmd;
+                cmdValue = GenericUtils.join(command, ' ');
+            }
+        }
+
+        ProcessBuilder builder = new ProcessBuilder(command);
+        if (GenericUtils.size(env) > 0) {
+            try {
+                Map<String, String> procEnv = builder.environment();
+                procEnv.putAll(env);
+            } catch (Exception e) {
+                log.warn("Could not set environment for command=" + cmdValue, e);
+            }
+        }
+
+        log.info("Starting shell with command: '{}' and env: {}", builder.command(), builder.environment());
+        process = builder.start();
+        out = new TtyFilterInputStream(process.getInputStream(), ttyOptions);
+        err = new TtyFilterInputStream(process.getErrorStream(), ttyOptions);
+        in = new TtyFilterOutputStream(process.getOutputStream(), err, ttyOptions);
+    }
+
+    @Override
+    public OutputStream getInputStream() {
+        return in;
+    }
+
+    @Override
+    public InputStream getOutputStream() {
+        return out;
+    }
+
+    @Override
+    public InputStream getErrorStream() {
+        return err;
+    }
+
+    @Override
+    public boolean isAlive() {
+        // TODO in JDK-8 call process.isAlive()
+        try {
+            process.exitValue();
+            return false;
+        } catch (IllegalThreadStateException e) {
+            return true;
+        }
+    }
+
+    @Override
+    public int exitValue() {
+        // TODO in JDK-8 call process.isAlive()
+        if (!isAlive()) {
+            return process.exitValue(); // already terminated - no need to block
+        }
+        try {
+            return process.waitFor(); // still running - block until the process exits
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt status before wrapping
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        // NOTE !!! DO NOT NULL-IFY THE PROCESS SINCE "exitValue" is called subsequently
+        if (process != null) {
+            log.debug("Destroy process for " + cmdValue);
+            process.destroy();
+        }
+
+        IOException e = IoUtils.closeQuietly(getInputStream(), getOutputStream(), getErrorStream());
+        if (e != null) {
+            if (log.isDebugEnabled()) {
+                log.debug(e.getClass().getSimpleName() + " while destroy streams of '" + cmdValue + "': " + e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return GenericUtils.isEmpty(cmdValue) ? super.toString() : cmdValue;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2f95f7b9/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShellFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShellFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShellFactory.java
index 94755fc..b29c46b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShellFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/shell/ProcessShellFactory.java
@@ -18,22 +18,10 @@
  */
 package org.apache.sshd.server.shell;
 
-import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
 import java.util.Set;
-
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.util.GenericUtils;
-import org.apache.sshd.common.util.OsUtils;
-import org.apache.sshd.common.util.buffer.Buffer;
-import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
 import org.apache.sshd.server.Command;
 
@@ -44,33 +32,6 @@ import org.apache.sshd.server.Command;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class ProcessShellFactory extends AbstractLoggingBean implements Factory<Command> {
-
-    public enum TtyOptions {
-        Echo,
-        INlCr,
-        ICrNl,
-        ONlCr,
-        OCrNl;
-
-        public static final Set<TtyOptions> LINUX_OPTIONS =
-                Collections.unmodifiableSet(EnumSet.of(TtyOptions.ONlCr));
-
-        public static final Set<TtyOptions> WINDOWS_OPTIONS =
-                Collections.unmodifiableSet(EnumSet.of(TtyOptions.Echo, TtyOptions.ICrNl, TtyOptions.ONlCr));
-
-        public static Set<TtyOptions> resolveDefaultTtyOptions() {
-            return resolveTtyOptions(OsUtils.isWin32());
-        }
-
-        public static Set<TtyOptions> resolveTtyOptions(boolean isWin32) {
-            if (isWin32) {
-                return WINDOWS_OPTIONS;
-            } else {
-                return LINUX_OPTIONS;
-            }
-        }
-    }
-
     private String[] command;
     private final Set<TtyOptions> ttyOptions;
 
@@ -84,7 +45,7 @@ public class ProcessShellFactory extends AbstractLoggingBean implements Factory<
 
     public ProcessShellFactory(String[] command, Collection<TtyOptions> ttyOptions) {
         this.command = command;
-        this.ttyOptions = GenericUtils.isEmpty(ttyOptions) ? Collections.<TtyOptions>emptySet() : GenericUtils.of(ttyOptions);
+        this.ttyOptions = GenericUtils.of(ttyOptions);
     }
 
     public String[] getCommand() {
@@ -97,181 +58,10 @@ public class ProcessShellFactory extends AbstractLoggingBean implements Factory<
 
     @Override
     public Command create() {
-        return new InvertedShellWrapper(new ProcessShell());
+        return new InvertedShellWrapper(createInvertedShell());
     }
 
-    public class ProcessShell implements InvertedShell {
-
-        private Process process;
-        private TtyFilterOutputStream in;
-        private TtyFilterInputStream out;
-        private TtyFilterInputStream err;
-
-        public ProcessShell() {
-            super();
-        }
-
-        @SuppressWarnings("synthetic-access")
-        @Override
-        public void start(Map<String, String> env) throws IOException {
-            String[] cmds = new String[command.length];
-            for (int i = 0; i < cmds.length; i++) {
-                if ("$USER".equals(command[i])) {
-                    cmds[i] = env.get("USER");
-                } else {
-                    cmds[i] = command[i];
-                }
-            }
-            ProcessBuilder builder = new ProcessBuilder(cmds);
-            if (GenericUtils.size(env) > 0) {
-                try {
-                    Map<String, String> procEnv = builder.environment();
-                    procEnv.putAll(env);
-                } catch (Exception e) {
-                    log.warn("Could not set environment for command=" + GenericUtils.join(cmds, ' '), e);
-                }
-            }
-
-            log.info("Starting shell with command: '{}' and env: {}", builder.command(), builder.environment());
-            process = builder.start();
-            out = new TtyFilterInputStream(process.getInputStream());
-            err = new TtyFilterInputStream(process.getErrorStream());
-            in = new TtyFilterOutputStream(process.getOutputStream(), err);
-        }
-
-        @Override
-        public OutputStream getInputStream() {
-            return in;
-        }
-
-        @Override
-        public InputStream getOutputStream() {
-            return out;
-        }
-
-        @Override
-        public InputStream getErrorStream() {
-            return err;
-        }
-
-        @Override
-        public boolean isAlive() {
-            try {
-                process.exitValue();
-                return false;
-            } catch (IllegalThreadStateException e) {
-                return true;
-            }
-        }
-
-        @Override
-        public int exitValue() {
-            try {
-                return process.waitFor();
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public void destroy() {
-            if (process != null) {
-                try {
-                    process.destroy();
-                } finally {
-                    process = null;
-                }
-            }
-        }
-
-        protected class TtyFilterInputStream extends FilterInputStream {
-            private Buffer buffer;
-            private int lastChar;
-
-            public TtyFilterInputStream(InputStream in) {
-                super(in);
-                buffer = new ByteArrayBuffer(32);
-            }
-
-            synchronized void write(int c) {
-                buffer.putByte((byte) c);
-            }
-
-            synchronized void write(byte[] buf, int off, int len) {
-                buffer.putBytes(buf, off, len);
-            }
-
-            @Override
-            public int available() throws IOException {
-                return super.available() + buffer.available();
-            }
-
-            @SuppressWarnings("synthetic-access")
-            @Override
-            public synchronized int read() throws IOException {
-                int c;
-                if (buffer.available() > 0) {
-                    c = buffer.getByte();
-                    buffer.compact();
-                } else {
-                    c = super.read();
-                }
-                if (c == '\n' && ttyOptions.contains(TtyOptions.ONlCr) && lastChar != '\r') {
-                    c = '\r';
-                    Buffer buf = new ByteArrayBuffer();
-                    buf.putByte((byte) '\n');
-                    buf.putBuffer(buffer);
-                    buffer = buf;
-                } else if (c == '\r' && ttyOptions.contains(TtyOptions.OCrNl)) {
-                    c = '\n';
-                }
-                lastChar = c;
-                return c;
-            }
-
-            @Override
-            public synchronized int read(byte[] b, int off, int len) throws IOException {
-                if (buffer.available() == 0) {
-                    int nb = super.read(b, off, len);
-                    buffer.putRawBytes(b, off, nb);
-                }
-                int nb = 0;
-                while (nb < len && buffer.available() > 0) {
-                    b[off + nb++] = (byte) read();
-                }
-                return nb;
-            }
-        }
-
-        protected class TtyFilterOutputStream extends FilterOutputStream {
-            private TtyFilterInputStream echo;
-
-            public TtyFilterOutputStream(OutputStream out, TtyFilterInputStream echo) {
-                super(out);
-                this.echo = echo;
-            }
-
-            @SuppressWarnings("synthetic-access")
-            @Override
-            public void write(int c) throws IOException {
-                if (c == '\n' && ttyOptions.contains(TtyOptions.INlCr)) {
-                    c = '\r';
-                } else if (c == '\r' && ttyOptions.contains(TtyOptions.ICrNl)) {
-                    c = '\n';
-                }
-                super.write(c);
-                if (ttyOptions.contains(TtyOptions.Echo)) {
-                    echo.write(c);
-                }
-            }
-
-            @Override
-            public void write(byte[] b, int off, int len) throws IOException {
-                for (int i = off; i < len; i++) {
-                    write(b[i]);
-                }
-            }
-        }
+    protected InvertedShell createInvertedShell() {
+        return new ProcessShell(ttyOptions, getCommand());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2f95f7b9/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyFilterInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyFilterInputStream.java b/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyFilterInputStream.java
new file mode 100644
index 0000000..fd5ffcb
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyFilterInputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.shell;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+
+/**
+ * Handles the input while taking into account the {@link TtyOptions} for
+ * handling CR / LF
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class TtyFilterInputStream extends FilterInputStream {
+    private final Set<TtyOptions> ttyOptions;
+    private Buffer buffer;
+    private int lastChar;
+
+    public TtyFilterInputStream(InputStream in, Collection<TtyOptions> ttyOptions) {
+        super(in);
+        // we create a copy of the options so as to avoid concurrent modifications
+        this.ttyOptions = GenericUtils.of(ttyOptions);
+        this.buffer = new ByteArrayBuffer(32);
+    }
+
+    public synchronized void write(int c) {
+        buffer.putByte((byte) c);
+    }
+
+    public synchronized void write(byte[] buf, int off, int len) {
+        buffer.putBytes(buf, off, len);
+    }
+
+    @Override
+    public int available() throws IOException {
+        return super.available() + buffer.available();
+    }
+
+    @Override
+    public synchronized int read() throws IOException {
+        int c;
+        if (buffer.available() > 0) {
+            c = buffer.getByte() & 0xFF; // mask to 0..255 - a sign-extended 0xFF would be returned as -1 (EOF)
+            buffer.compact();
+        } else {
+            c = super.read();
+        }
+
+        if ((c == '\n') && ttyOptions.contains(TtyOptions.ONlCr) && (lastChar != '\r')) {
+            c = '\r'; // return CR now, the LF is queued below ahead of any pending bytes
+            Buffer buf = new ByteArrayBuffer();
+            buf.putByte((byte) '\n');
+            buf.putBuffer(buffer);
+            buffer = buf;
+        } else if ((c == '\r') && ttyOptions.contains(TtyOptions.OCrNl)) {
+            c = '\n';
+        }
+        lastChar = c; // remembered so that an LF following a CR is not expanded again
+        return c;
+    }
+
+    @Override
+    public synchronized int read(byte[] b, int off, int len) throws IOException {
+        int nb = (buffer.available() > 0) ? 0 : super.read(b, off, len);
+        if (nb < 0) {
+            return nb; // EOF - report it instead of pushing a negative count into the buffer
+        }
+        buffer.putRawBytes(b, off, nb); // stage the fresh bytes so read() applies the CR/LF mapping
+        for (nb = 0; (nb < len) && (buffer.available() > 0); nb++) {
+            b[off + nb] = (byte) read();
+        }
+        return nb;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2f95f7b9/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyFilterOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyFilterOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyFilterOutputStream.java
new file mode 100644
index 0000000..6461ef0
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyFilterOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.shell;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+
+/**
+ * Handles the output stream while taking care of the {@link TtyOptions} for CR / LF
+ * and ECHO settings
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class TtyFilterOutputStream extends FilterOutputStream {
+    private final Set<TtyOptions> ttyOptions;
+    private TtyFilterInputStream echo;
+
+    public TtyFilterOutputStream(OutputStream out, TtyFilterInputStream echo, Collection<TtyOptions> ttyOptions) {
+        super(out);
+        // we create a copy of the options so as to avoid concurrent modifications
+        this.ttyOptions = GenericUtils.of(ttyOptions);
+        this.echo = this.ttyOptions.contains(TtyOptions.Echo) ? ValidateUtils.checkNotNull(echo, "No echo stream") : echo;
+    }
+
+    @Override
+    public void write(int c) throws IOException {
+        if ((c == '\n') && ttyOptions.contains(TtyOptions.INlCr)) {
+            c = '\r';
+        } else if ((c == '\r') && ttyOptions.contains(TtyOptions.ICrNl)) {
+            c = '\n';
+        }
+        super.write(c);
+        if (ttyOptions.contains(TtyOptions.Echo)) {
+            echo.write(c);
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        for (int i = off, end = off + len; i < end; i++) { // len is a count, not an end index
+            write(b[i]); // byte-by-byte so the CR/LF mapping and echo of write(int) apply
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2f95f7b9/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyOptions.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyOptions.java b/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyOptions.java
new file mode 100644
index 0000000..0334631
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/server/shell/TtyOptions.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.shell;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.sshd.common.util.OsUtils;
+
+/**
+ * Options that control the behavior of the I/O streams (CR / LF handling and echo)
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public enum TtyOptions {
+    Echo,
+    INlCr,
+    ICrNl,
+    ONlCr,
+    OCrNl;
+
+    public static final Set<TtyOptions> LINUX_OPTIONS =
+            Collections.unmodifiableSet(EnumSet.of(ONlCr));
+
+    public static final Set<TtyOptions> WINDOWS_OPTIONS =
+            Collections.unmodifiableSet(EnumSet.of(Echo, ICrNl, ONlCr));
+
+    /** @return The options selected according to {@link OsUtils#isWin32()} */
+    public static Set<TtyOptions> resolveDefaultTtyOptions() {
+        boolean win32 = OsUtils.isWin32();
+        return resolveTtyOptions(win32);
+    }
+
+    /** @return {@link #WINDOWS_OPTIONS} if {@code isWin32}, otherwise {@link #LINUX_OPTIONS} */
+    public static Set<TtyOptions> resolveTtyOptions(boolean isWin32) {
+        return isWin32 ? WINDOWS_OPTIONS : LINUX_OPTIONS;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2f95f7b9/sshd-core/src/test/java/org/apache/sshd/server/shell/InvertedShellWrapperTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/shell/InvertedShellWrapperTest.java b/sshd-core/src/test/java/org/apache/sshd/server/shell/InvertedShellWrapperTest.java
index fec3bda..53955da 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/shell/InvertedShellWrapperTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/shell/InvertedShellWrapperTest.java
@@ -20,7 +20,12 @@ package org.apache.sshd.server.shell;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
 import org.apache.sshd.util.test.BaseTestSupport;
 import org.apache.sshd.util.test.BogusEnvironment;
@@ -32,28 +37,124 @@ import org.junit.runners.MethodSorters;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class InvertedShellWrapperTest extends BaseTestSupport {
+    public InvertedShellWrapperTest() {
+        super();
+    }
 
     @Test
     public void testStreamsAreFlushedBeforeClosing() throws Exception {
         BogusInvertedShell shell = newShell("out", "err");
         shell.setAlive(false);
+
         try (ByteArrayInputStream in = new ByteArrayInputStream("in".getBytes(StandardCharsets.UTF_8));
              ByteArrayOutputStream out = new ByteArrayOutputStream(50);
              ByteArrayOutputStream err = new ByteArrayOutputStream()) {
 
             InvertedShellWrapper wrapper = new InvertedShellWrapper(shell);
-            wrapper.setInputStream(in);
-            wrapper.setOutputStream(out);
-            wrapper.setErrorStream(err);
-            wrapper.setExitCallback(new BogusExitCallback());
-            wrapper.start(new BogusEnvironment());
-
-            wrapper.pumpStreams();
-
-            // check the streams were flushed before exiting
-            assertEquals("in", shell.getInputStream().toString());
-            assertEquals("out", out.toString());
-            assertEquals("err", err.toString());
+            try {
+                wrapper.setInputStream(in);
+                wrapper.setOutputStream(out);
+                wrapper.setErrorStream(err);
+                wrapper.setExitCallback(new BogusExitCallback());
+                wrapper.start(new BogusEnvironment());
+
+                wrapper.pumpStreams();
+
+                // check the streams were flushed before exiting
+                assertEquals("stdin", "in", shell.getInputStream().toString());
+                assertEquals("stdout", "out", out.toString());
+                assertEquals("stderr", "err", err.toString());
+            } finally {
+                wrapper.destroy();
+            }
+        }
+    }
+
+    @Test   // see SSHD-570
+    public void testExceptionWhilePumpStreams() throws Exception {
+        final BogusInvertedShell bogusShell = newShell("out", "err");
+        bogusShell.setAlive(false);
+
+        final int DESTROYED_EXIT_VALUE = 7365;
+        InvertedShell shell = new InvertedShell() {
+            private boolean destroyed;
+
+            @Override
+            public void start(Map<String, String> env) throws IOException {
+                bogusShell.start(env);
+            }
+
+            @Override
+            public boolean isAlive() {
+                return bogusShell.isAlive();
+            }
+
+            @Override
+            public InputStream getOutputStream() {
+                return bogusShell.getOutputStream();
+            }
+
+            @Override
+            public OutputStream getInputStream() {
+                return bogusShell.getInputStream();
+            }
+
+            @Override
+            public InputStream getErrorStream() {
+                return bogusShell.getErrorStream();
+            }
+
+            @Override
+            public int exitValue() {
+                return destroyed ? DESTROYED_EXIT_VALUE : bogusShell.exitValue();   // marker value once destroyed
+            }
+
+            @Override
+            public void destroy() {
+                bogusShell.destroy();
+                bogusShell.setAlive(false);
+                destroyed = true;
+            }
+        };
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+             ByteArrayOutputStream err = new ByteArrayOutputStream();
+             InputStream stdin = new InputStream() {
+                private final byte[] data = getCurrentTestName().getBytes(StandardCharsets.UTF_8);
+                private int readPos;
+
+                @Override
+                public int read() throws IOException {
+                    if (readPos >= data.length) {
+                        throw new EOFException("Data exhausted");   // the failure this test relies on
+                    }
+
+                    return data[readPos++] & 0xFF;  // read() must return 0..255, never a negative byte value
+                }
+
+                @Override
+                public int available() throws IOException {
+                    return data.length; // constant (not the remaining count) - presumably so read() keeps being called
+                }
+             }) {
+
+            BogusExitCallback exitCallback = new BogusExitCallback();
+            InvertedShellWrapper wrapper = new InvertedShellWrapper(shell);
+            try {
+                wrapper.setInputStream(stdin);
+                wrapper.setOutputStream(out);
+                wrapper.setErrorStream(err);
+
+                wrapper.setExitCallback(exitCallback);
+                wrapper.start(new BogusEnvironment());
+
+                wrapper.pumpStreams();
+            } finally {
+                wrapper.destroy();
+            }
+
+            assertEquals("Mismatched exit value", DESTROYED_EXIT_VALUE, exitCallback.getExitValue());
+            assertEquals("Mismatched exit message", EOFException.class.getSimpleName(), exitCallback.getExitMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2f95f7b9/sshd-core/src/test/java/org/apache/sshd/util/test/BogusExitCallback.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusExitCallback.java b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusExitCallback.java
index 7e2a88f..a4f1ff6 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusExitCallback.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusExitCallback.java
@@ -23,19 +23,34 @@ import org.apache.sshd.server.ExitCallback;
 public class BogusExitCallback implements ExitCallback {
 
     private boolean exited;
+    private int exitValue;
+    private String exitMessage;
+
+    public BogusExitCallback() {
+        super();
+    }
 
     @Override
     public void onExit(int exitValue) {
-        this.exited = true;
+        onExit(exitValue, String.valueOf(exitValue));
     }
 
     @Override
     public void onExit(int exitValue, String exitMessage) {
         this.exited = true;
+        this.exitValue = exitValue;
+        this.exitMessage = exitMessage;
     }
 
     public boolean isExited() {
         return exited;
     }
 
+    public int getExitValue() {
+        return exitValue;
+    }
+
+    public String getExitMessage() {
+        return exitMessage;
+    }
 }