You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/05/26 14:08:07 UTC

[1/2] mina-sshd git commit: [SSHD-476] Allow direct SCP file upload/download to/from stream

Repository: mina-sshd
Updated Branches:
  refs/heads/master d7939e253 -> 9b60dcc53


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpReceiveLineHandler.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpReceiveLineHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpReceiveLineHandler.java
new file mode 100644
index 0000000..3f38e01
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpReceiveLineHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.scp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ScpReceiveLineHandler {
+    /**
+     * @param line Received SCP input line
+     * @param isDir Does the input line refer to a directory
+     * @param time The received {@link ScpTimestamp} - may be {@code null}
+     * @throws IOException If failed to process the line
+     */
+    void process(String line, boolean isDir, ScpTimestamp time) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpSourceStreamResolver.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpSourceStreamResolver.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpSourceStreamResolver.java
new file mode 100644
index 0000000..6749a66
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpSourceStreamResolver.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.scp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Collection;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ScpSourceStreamResolver {
+    /**
+     * @return The uploaded file name
+     * @throws IOException If failed to resolve the name
+     */
+    String getFileName() throws IOException;
+
+    /**
+     * @return The {@link Path} to use when invoking the {@link ScpTransferEventListener}
+     */
+    Path getEventListenerFilePath();
+
+    /**
+     * @return The permissions to be used for uploading a file
+     * @throws IOException If failed to generate the required permissions
+     */
+    Collection<PosixFilePermission> getPermissions() throws IOException;
+
+    /**
+     * @return The {@link ScpTimestamp} to use for uploading the file
+     * if {@code null} then no need to send this information
+     * @throws IOException If failed to generate the required data
+     */
+    ScpTimestamp getTimestamp() throws IOException;
+    
+    /**
+     * @return An estimated size of the expected number of bytes to be uploaded.
+     * If non-positive then assumed to be unknown.
+     * @throws IOException If failed to generate an estimate
+     */
+    long getSize() throws IOException;
+    
+    /**
+     * @return The {@link InputStream} containing the data to be uploaded
+     * @throws IOException If failed to create the stream
+     */
+    InputStream resolveSourceStream() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTargetStreamResolver.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTargetStreamResolver.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTargetStreamResolver.java
new file mode 100644
index 0000000..f3db61f
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTargetStreamResolver.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.scp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Set;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ScpTargetStreamResolver {
+    /**
+     * Called when receiving a file in order to obtain an output stream
+     * for the incoming data
+     * @param name File name as received from remote site
+     * @param length Number of bytes expected to receive
+     * @param perms The {@link Set} of {@link PosixFilePermission} expected
+     * @return The {@link OutputStream} to write the incoming data
+     * @throws IOException If failed to create the stream
+     */
+    OutputStream resolveTargetStream(String name, long length, Set<PosixFilePermission> perms) throws IOException;
+
+    /**
+     * @return The {@link Path} to use when invoking the {@link ScpTransferEventListener}
+     */
+    Path getEventListenerFilePath();
+
+    /**
+     * Called after successful reception of the data (and after closing the stream)
+     * @param name File name as received from remote site
+     * @param preserve If {@code true} then the resolver should attempt to preserve
+     * the specified permissions and timestamp
+     * @param perms The {@link Set} of {@link PosixFilePermission} expected
+     * @param time If not {@code null} then the required timestamp(s) on the
+     * incoming data
+     * @throws IOException If failed to post-process the incoming data
+     */
+    void postProcessReceivedData(String name, boolean preserve, Set<PosixFilePermission> perms, ScpTimestamp time) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTimestamp.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTimestamp.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTimestamp.java
new file mode 100644
index 0000000..eccf71a
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpTimestamp.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.scp;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.common.util.GenericUtils;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class ScpTimestamp {
+    public final long lastModifiedTime;
+    public final long lastAccessTime;
+
+    public ScpTimestamp(long modTime, long accTime) {
+        lastModifiedTime = modTime;
+        lastAccessTime = accTime;
+    }
+    
+    @Override
+    public String toString() {
+        return "modified=" + new Date(lastModifiedTime)
+             + ";accessed=" + new Date(lastAccessTime)
+             ;
+    }
+
+    /**
+     * @param line The time specification - format:
+     * {@code T<mtime-sec> <mtime-micros> <atime-sec> <atime-micros>}
+     * where specified times are since UTC 
+     * @return The {@link ScpTimestamp} value with the timestamps converted to
+     * <U>milliseconds</U>
+     * @throws NumberFormatException if bad numerical values - <B>Note:</B>
+     * does not check if 1st character is 'T'.
+     */
+    public static ScpTimestamp parseTime(String line) throws NumberFormatException {
+        String[] numbers = GenericUtils.split(line.substring(1), ' ');
+        return new ScpTimestamp(TimeUnit.SECONDS.toMillis(Long.parseLong(numbers[0])),
+                                TimeUnit.SECONDS.toMillis(Long.parseLong(numbers[2])));
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 9df5f4c..98c3f6e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -598,7 +598,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
                     return requestResult.get();
                 }
             } catch (InterruptedException e) {
-                throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+                throw (InterruptedIOException) new InterruptedIOException("Interrupted while waiting for request result").initCause(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/util/io/LimitInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/io/LimitInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/util/io/LimitInputStream.java
new file mode 100644
index 0000000..b4af206
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/io/LimitInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.util.io;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channel;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Reads from another {@link InputStream} up to specified max. length
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class LimitInputStream extends FilterInputStream implements Channel {
+    private final AtomicBoolean open = new AtomicBoolean(true);
+    private long remaining;
+
+    public LimitInputStream(InputStream in, long length) {
+        super(in);
+        remaining = length;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open.get();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!isOpen()) {
+            throw new IOException("read() - stream is closed (remaining=" + remaining + ")");
+        }
+
+        if (remaining > 0) {
+            remaining--;
+            return super.read();
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("read(len=" + len + ") stream is closed (remaining=" + remaining + ")");
+        }
+
+        int nb = len;
+        if (nb > remaining) {
+            nb = (int) remaining;
+        }
+        if (nb > 0) {
+            int read = super.read(b, off, nb);
+            remaining -= read;
+            return read;
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (!isOpen()) {
+            throw new IOException("skip(" + n + ") stream is closed (remaining=" + remaining + ")");
+        }
+
+        long skipped = super.skip(n);
+        remaining -= skipped;
+        return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (!isOpen()) {
+            throw new IOException("available() stream is closed (remaining=" + remaining + ")");
+        }
+
+        int av = super.available();
+        if (av > remaining) {
+            return (int) remaining;
+        } else {
+            return av;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do not close the original input stream since it serves for ACK(s)
+        if (open.getAndSet(false)) {
+            return; // debug breakpoint
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java
index 1a96157..92ba0d5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/command/ScpCommandFactory.java
@@ -41,11 +41,6 @@ import org.apache.sshd.server.CommandFactory;
  */
 public class ScpCommandFactory implements CommandFactory, Cloneable, ExecutorServiceConfigurer {
     /**
-     * Command prefix used to identify SCP commands
-     */
-    public static final String SCP_COMMAND_PREFIX = "scp";
-
-    /**
      * A useful {@link ObjectBuilder} for {@link ScpCommandFactory}
      */
     public static class Builder implements ObjectBuilder<ScpCommandFactory> {
@@ -227,7 +222,7 @@ public class ScpCommandFactory implements CommandFactory, Cloneable, ExecutorSer
      */
     @Override
     public Command createCommand(String command) {
-        if (command.startsWith(SCP_COMMAND_PREFIX)) {
+        if (command.startsWith(ScpHelper.SCP_COMMAND_PREFIX)) {
             return new ScpCommand(command, getExecutorService(), isShutdownOnExit(), getSendBufferSize(), getReceiveBufferSize(), listenerProxy);
         }
 
@@ -236,7 +231,7 @@ public class ScpCommandFactory implements CommandFactory, Cloneable, ExecutorSer
             return factory.createCommand(command);
         }
 
-        throw new IllegalArgumentException("Unknown command, does not begin with '" + SCP_COMMAND_PREFIX + "': " + command);
+        throw new IllegalArgumentException("Unknown command, does not begin with '" + ScpHelper.SCP_COMMAND_PREFIX + "': " + command);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/test/java/org/apache/sshd/client/ScpTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ScpTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ScpTest.java
index 2fb871a..0a5406b 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ScpTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ScpTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sshd.client;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -31,6 +32,7 @@ import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -38,9 +40,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.SshClient;
 import org.apache.sshd.SshServer;
+import org.apache.sshd.client.scp.ScpClient;
 import org.apache.sshd.common.Session;
 import org.apache.sshd.common.file.FileSystemFactory;
 import org.apache.sshd.common.file.root.RootedFileSystemProvider;
+import org.apache.sshd.common.scp.ScpHelper;
 import org.apache.sshd.common.scp.ScpTransferEventListener;
 import org.apache.sshd.common.util.OsUtils;
 import org.apache.sshd.server.command.ScpCommandFactory;
@@ -51,7 +55,6 @@ import org.apache.sshd.util.JSchLogger;
 import org.apache.sshd.util.SimpleUserInfo;
 import org.apache.sshd.util.Utils;
 import org.junit.After;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -141,7 +144,7 @@ public class ScpTest extends BaseTestSupport {
                     ScpClient scp = createScpClient(session);
                     Path targetPath = detectTargetFolder().toPath();
                     Path parentPath = targetPath.getParent();
-                    Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+                    Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
                     Utils.deleteRecursive(scpRoot);
 
                     Path localDir = assertHierarchyTargetFolderExists(scpRoot.resolve("local"));
@@ -186,7 +189,7 @@ public class ScpTest extends BaseTestSupport {
 
                     Path targetPath = detectTargetFolder().toPath();
                     Path parentPath = targetPath.getParent();
-                    Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+                    Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
                     Utils.deleteRecursive(scpRoot);
 
                     Path localDir = assertHierarchyTargetFolderExists(scpRoot.resolve("local"));
@@ -210,7 +213,7 @@ public class ScpTest extends BaseTestSupport {
     @Test
     public void testScpUploadZeroLengthFile() throws Exception {
         Path targetPath = detectTargetFolder().toPath();
-        Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+        Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
         Path localDir = assertHierarchyTargetFolderExists(scpRoot.resolve("local"));
         Path remoteDir = assertHierarchyTargetFolderExists(scpRoot.resolve("remote"));
         Path zeroLocal = localDir.resolve(getCurrentTestName());
@@ -249,7 +252,7 @@ public class ScpTest extends BaseTestSupport {
     @Test
     public void testScpDownloadZeroLengthFile() throws Exception {
         Path targetPath = detectTargetFolder().toPath();
-        Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+        Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
         Path localDir = assertHierarchyTargetFolderExists(scpRoot.resolve("local"));
         Path remoteDir = assertHierarchyTargetFolderExists(scpRoot.resolve("remote"));
         Path zeroLocal = localDir.resolve(getCurrentTestName());
@@ -299,7 +302,7 @@ public class ScpTest extends BaseTestSupport {
 
                     Path targetPath = detectTargetFolder().toPath();
                     Path parentPath = targetPath.getParent();
-                    Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+                    Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
                     Utils.deleteRecursive(scpRoot);
 
                     Path localDir = assertHierarchyTargetFolderExists(scpRoot.resolve("local"));
@@ -350,7 +353,7 @@ public class ScpTest extends BaseTestSupport {
                     ScpClient scp = createScpClient(session);
                     Path targetPath = detectTargetFolder().toPath();
                     Path parentPath = targetPath.getParent();
-                    Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+                    Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
                     Utils.deleteRecursive(scpRoot);
 
                     Path localDir = assertHierarchyTargetFolderExists(scpRoot.resolve("local"));
@@ -432,7 +435,7 @@ public class ScpTest extends BaseTestSupport {
                 ScpClient scp = createScpClient(session);
                 Path targetPath = detectTargetFolder().toPath();
                 Path parentPath = targetPath.getParent();
-                Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+                Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
                 Utils.deleteRecursive(scpRoot);
 
                 Path localDir = scpRoot.resolve("local");
@@ -472,7 +475,7 @@ public class ScpTest extends BaseTestSupport {
                 ScpClient scp = createScpClient(session);
                 Path targetPath = detectTargetFolder().toPath();
                 Path parentPath = targetPath.getParent();
-                Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+                Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
                 Utils.deleteRecursive(scpRoot);
 
                 Path localDir = assertHierarchyTargetFolderExists(scpRoot.resolve("local"));
@@ -510,7 +513,7 @@ public class ScpTest extends BaseTestSupport {
                 ScpClient scp = createScpClient(session);
                 Path targetPath = detectTargetFolder().toPath();
                 Path parentPath = targetPath.getParent();
-                Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+                Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
                 Utils.deleteRecursive(scpRoot);
 
                 Path localDir = scpRoot.resolve("local");
@@ -547,13 +550,9 @@ public class ScpTest extends BaseTestSupport {
 
     @Test
     public void testScpNativePreserveAttributes() throws Exception {
-        // Ignore this test if running a Windows system
-        Assume.assumeFalse("Skip test for Windows", OsUtils.isWin32());
-
         try (SshClient client = SshClient.setUpDefaultClient()) {
             client.start();
 
-
             try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).await().getSession()) {
                 session.addPasswordIdentity(getCurrentTestName());
                 session.auth().verify(5L, TimeUnit.SECONDS);
@@ -561,12 +560,13 @@ public class ScpTest extends BaseTestSupport {
                 ScpClient scp = createScpClient(session);
                 Path targetPath = detectTargetFolder().toPath();
                 Path parentPath = targetPath.getParent();
-                Path scpRoot = Utils.resolve(targetPath, "scp", getClass().getSimpleName());
+                Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
                 Utils.deleteRecursive(scpRoot);
 
                 Path localDir = scpRoot.resolve("local");
                 Path localSubDir = assertHierarchyTargetFolderExists(localDir.resolve("dir"));
-                long lastMod = Files.getLastModifiedTime(localSubDir).toMillis() - TimeUnit.DAYS.toMillis(1);
+                // convert everything to seconds since this is the SCP timestamps granularity
+                long lastMod = TimeUnit.MILLISECONDS.toSeconds(Files.getLastModifiedTime(localSubDir).toMillis() - TimeUnit.DAYS.toMillis(1));
                 Path local1 = localDir.resolve(getCurrentTestName() + "-1.txt");
                 byte[] data = writeFile(local1, getCurrentTestName() + "\n");
                 File lclFile1 = local1.toFile();
@@ -587,29 +587,77 @@ public class ScpTest extends BaseTestSupport {
                 assertFileLength(remote1, data.length, 5000);
                 
                 File remFile1 = remote1.toFile();
-                assertEquals("Mismatched uploaded last-modified time for " + remFile1, lastMod, remFile1.lastModified());
+                assertLastModifiedTimeEquals(remFile1, lastMod);
 
                 Path remoteSubDir = remoteDir.resolve(localSubDir.getFileName());
                 Path remoteSub2 = remoteSubDir.resolve(localSub2.getFileName());
                 assertFileLength(remoteSub2, data.length, 5000);
 
                 File remSubFile2 = remoteSub2.toFile();
-                assertEquals("Mismatched uploaded last-modified time for " + remSubFile2, lastMod, remSubFile2.lastModified());
+                assertLastModifiedTimeEquals(remSubFile2, lastMod);
 
                 Utils.deleteRecursive(localDir);
                 Files.createDirectories(localDir);
 
                 scp.download(remotePath + "/*", localDir, ScpClient.Option.Recursive, ScpClient.Option.PreserveAttributes);
                 assertFileLength(local1, data.length, 5000);
-                assertEquals("Mismatched downloaded last-modified time for " + lclFile1, lastMod, lclFile1.lastModified());
+                assertLastModifiedTimeEquals(lclFile1, lastMod);
                 assertFileLength(localSub2, data.length, 5000);
-                assertEquals("Mismatched downloaded last-modified time for " + lclSubFile2, lastMod, lclSubFile2.lastModified());
+                assertLastModifiedTimeEquals(lclSubFile2, lastMod);
+            } finally {
+                client.stop();
+            }
+        }
+    }
+
+    @Test
+    public void testStreamsUploadAndDownload() throws Exception {
+        try (SshClient client = SshClient.setUpDefaultClient()) {
+            client.start();
+
+            try (ClientSession session = client.connect(getCurrentTestName(), "localhost", port).await().getSession()) {
+                session.addPasswordIdentity(getCurrentTestName());
+                session.auth().verify(5L, TimeUnit.SECONDS);
+
+                ScpClient scp = createScpClient(session);
+                Path targetPath = detectTargetFolder().toPath();
+                Path parentPath = targetPath.getParent();
+                Path scpRoot = Utils.resolve(targetPath, ScpHelper.SCP_COMMAND_PREFIX, getClass().getSimpleName());
+                Utils.deleteRecursive(scpRoot);
+
+                Path remoteDir = assertHierarchyTargetFolderExists(scpRoot.resolve("remote"));
+                Path remoteFile = remoteDir.resolve(getCurrentTestName() + ".txt");
+                String remotePath = Utils.resolveRelativeRemotePath(parentPath, remoteFile);
+                byte[] data = (getClass().getName() + "#" + getCurrentTestName()).getBytes();
+                scp.upload(data, remotePath, EnumSet.allOf(PosixFilePermission.class), null);
+
+                byte[] uploaded = Files.readAllBytes(remoteFile);
+                assertArrayEquals("Mismatched uploaded data", data, uploaded);
+
+                byte[] downloaded = scp.downloadBytes(remotePath);
+                assertArrayEquals("Mismatched downloaded data", uploaded, downloaded);
             } finally {
                 client.stop();
             }
         }
     }
 
+    // see http://stackoverflow.com/questions/2717936/file-createnewfile-creates-files-with-last-modified-time-before-actual-creatio
+    // See https://msdn.microsoft.com/en-us/library/ms724290(VS.85).aspx
+    // The NTFS file system delays updates to the last access time for a file by up to 1 hour after the last access
+    private static void assertLastModifiedTimeEquals(File file, long expectedSeconds) {
+        long actualSeconds = TimeUnit.MILLISECONDS.toSeconds(file.lastModified());
+        if (OsUtils.isWin32()) {
+            if (expectedSeconds != actualSeconds) {
+                System.err.append("Mismatched last modified time for ").append(file.getAbsolutePath())
+                          .append(" - expected=").append(String.valueOf(expectedSeconds))
+                          .append(", actual=").println(actualSeconds);
+            }
+        } else {
+            assertEquals("Mismatched last modified time for " + file.getAbsolutePath(), expectedSeconds, actualSeconds);
+        }
+    }
+
     private static byte[] writeFile(Path path, String data) throws IOException {
         try(OutputStream fos = Files.newOutputStream(path)) {
             byte[]  bytes = data.getBytes();
@@ -643,7 +691,7 @@ public class ScpTest extends BaseTestSupport {
             sendFile(unixDir, fileName, data);
             assertFileLength(target, data.length(), 5000);
     
-            sendFileError("target", "scp", data);
+            sendFileError("target", ScpHelper.SCP_COMMAND_PREFIX, data);
     
             readFileError(unixDir);
     
@@ -653,7 +701,7 @@ public class ScpTest extends BaseTestSupport {
             target.delete();
             root.delete();
     
-            sendDir("target", "scp", fileName, data);
+            sendDir("target", ScpHelper.SCP_COMMAND_PREFIX, fileName, data);
             assertFileLength(target, data.length(), 5000);
         } finally {
             session.disconnect();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/test/java/org/apache/sshd/common/util/io/LimitInputStreamTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/util/io/LimitInputStreamTest.java b/sshd-core/src/test/java/org/apache/sshd/common/util/io/LimitInputStreamTest.java
new file mode 100644
index 0000000..c873611
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/common/util/io/LimitInputStreamTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.util.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.sshd.util.BaseTestSupport;
+import org.junit.Test;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class LimitInputStreamTest extends BaseTestSupport {
+    public LimitInputStreamTest() {
+        super();
+    }
+
+    @Test
+    public void testReadLimit() throws IOException {
+        Path targetPath = detectTargetFolder().toPath();
+        Path rootFolder = assertHierarchyTargetFolderExists(targetPath.resolve(getClass().getSimpleName()));
+        Path inputFile = rootFolder.resolve(getCurrentTestName() + ".bin"); 
+        byte[] data = (getClass().getName() + "#" + getCurrentTestName()).getBytes();
+        Files.write(inputFile, data);
+
+        try(InputStream in = Files.newInputStream(inputFile)) {
+            int maxLen = data.length / 2;
+            byte[] expected = new byte[maxLen];
+            System.arraycopy(data, 0, expected, 0, expected.length);
+
+            byte[] actual = new byte[expected.length];
+            try(LimitInputStream limited = new LimitInputStream(in, expected.length)) {
+                assertTrue("Limited stream not marked as open", limited.isOpen());
+                assertEquals("Mismatched initial available data size", expected.length, limited.available());
+
+                int readLen = limited.read(actual);
+                assertEquals("Incomplete actual data read", actual.length, readLen);
+                assertArrayEquals("Mismatched read data", expected, actual);
+                assertEquals("Mismatched remaining available data size", 0, limited.available());
+                
+                readLen = limited.read();
+                assertTrue("Unexpected success to read one more byte: " + readLen, readLen < 0);
+
+                readLen = limited.read(actual);
+                assertTrue("Unexpected success to read extra buffer: " + readLen, readLen < 0);
+                
+                limited.close();
+                assertFalse("Limited stream still marked as open", limited.isOpen());
+                
+                try {
+                    readLen = limited.read();
+                    fail("Unexpected one byte read success after close");
+                } catch(IOException e) {
+                    // expected
+                }
+
+                try {
+                    readLen = limited.read(actual);
+                    fail("Unexpected buffer read success after close: " + readLen);
+                } catch(IOException e) {
+                    // expected
+                }
+
+                try {
+                    readLen = limited.read(actual);
+                    fail("Unexpected buffer read success after close: " + readLen);
+                } catch(IOException e) {
+                    // expected
+                }
+
+                try {
+                    readLen = (int) limited.skip(Byte.SIZE);
+                    fail("Unexpected skip success after close: " + readLen);
+                } catch(IOException e) {
+                    // expected
+                }
+
+                try {
+                    readLen = limited.available();
+                    fail("Unexpected available success after close: " + readLen);
+                } catch(IOException e) {
+                    // expected
+                }
+            }
+            
+            // make sure underlying stream not closed
+            int readLen = in.read(actual);
+            assertEquals("Incomplete extra data read", Math.min(actual.length, data.length - expected.length), readLen);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/test/java/org/apache/sshd/util/Utils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/Utils.java b/sshd-core/src/test/java/org/apache/sshd/util/Utils.java
index 5f53f3e..fd42e38 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/Utils.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/Utils.java
@@ -116,7 +116,15 @@ public class Utils {
             }
         }
 
-        file.delete();
+        // seems that if a file is not writable it cannot be deleted
+        if (!file.canWrite()) {
+            file.setWritable(true, false);
+        }
+        
+        if (!file.delete()) {
+            System.err.append("Failed to delete ").println(file.getAbsolutePath());
+        }
+
         return file;
     }
     
@@ -142,6 +150,10 @@ public class Utils {
         }
         
         try {
+            // seems that if a file is not writable it cannot be deleted
+            if (!Files.isWritable(path)) {
+                path.toFile().setWritable(true, false);
+            }
             Files.delete(path);
         } catch(IOException e) {
             // same logic as deleteRecursive(File) which does not check if deletion succeeded


[2/2] mina-sshd git commit: [SSHD-476] Allow direct SCP file upload/download to/from stream

Posted by lg...@apache.org.
[SSHD-476] Allow direct SCP file upload/download to/from stream


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/9b60dcc5
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/9b60dcc5
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/9b60dcc5

Branch: refs/heads/master
Commit: 9b60dcc53b4b2363b1923aa2e4bb033d3e49f03d
Parents: d7939e2
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Tue May 26 15:07:53 2015 +0300
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Tue May 26 15:07:53 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/sshd/ClientSession.java     |   2 +-
 .../sshd/agent/local/AgentForwardedChannel.java |   2 +-
 .../org/apache/sshd/agent/unix/AgentClient.java |   2 +-
 .../java/org/apache/sshd/client/ScpClient.java  |  58 ---
 .../sshd/client/scp/AbstractScpClient.java      | 128 ++++++-
 .../sshd/client/scp/DefaultScpClient.java       | 189 +++++-----
 .../org/apache/sshd/client/scp/ScpClient.java   |  73 ++++
 .../sshd/client/session/ClientSessionImpl.java  |   2 +-
 .../sshd/client/sftp/DefaultSftpClient.java     |   2 +-
 .../apache/sshd/client/sftp/SftpFileSystem.java |   4 +-
 .../client/sftp/SftpFileSystemProvider.java     |  59 +--
 .../common/channel/ChannelOutputStream.java     |   2 +-
 .../common/channel/ChannelPipedInputStream.java |   2 +-
 .../sshd/common/file/util/MockFileSystem.java   | 113 ++++++
 .../apache/sshd/common/file/util/MockPath.java  | 186 ++++++++++
 .../scp/LocalFileScpSourceStreamResolver.java   |  90 +++++
 .../scp/LocalFileScpTargetStreamResolver.java   | 150 ++++++++
 .../org/apache/sshd/common/scp/ScpHelper.java   | 355 ++++++++-----------
 .../sshd/common/scp/ScpReceiveLineHandler.java  |  35 ++
 .../common/scp/ScpSourceStreamResolver.java     |  68 ++++
 .../common/scp/ScpTargetStreamResolver.java     |  59 +++
 .../apache/sshd/common/scp/ScpTimestamp.java    |  60 ++++
 .../sshd/common/session/AbstractSession.java    |   2 +-
 .../sshd/common/util/io/LimitInputStream.java   | 111 ++++++
 .../sshd/server/command/ScpCommandFactory.java  |   9 +-
 .../java/org/apache/sshd/client/ScpTest.java    |  92 +++--
 .../common/util/io/LimitInputStreamTest.java    | 111 ++++++
 .../test/java/org/apache/sshd/util/Utils.java   |  14 +-
 28 files changed, 1565 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/ClientSession.java b/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
index 822e072..0f43cf1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
@@ -24,13 +24,13 @@ import java.security.KeyPair;
 import java.util.Map;
 
 import org.apache.sshd.client.ClientFactoryManager;
-import org.apache.sshd.client.ScpClient;
 import org.apache.sshd.client.UserInteraction;
 import org.apache.sshd.client.channel.ChannelDirectTcpip;
 import org.apache.sshd.client.channel.ChannelExec;
 import org.apache.sshd.client.channel.ChannelShell;
 import org.apache.sshd.client.channel.ChannelSubsystem;
 import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.scp.ScpClient;
 import org.apache.sshd.client.sftp.SftpClient;
 import org.apache.sshd.common.Session;
 import org.apache.sshd.common.SshdSocketAddress;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index 7901af2..ea0779a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -65,7 +65,7 @@ public class AgentForwardedChannel extends AbstractClientChannel {
                 }
                 return messages.poll();
             } catch (InterruptedException e) {
-                throw (IOException) new InterruptedIOException().initCause(e);
+                throw (IOException) new InterruptedIOException("Interrupted while polling for messages").initCause(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
index 45b33e1..a8a5271 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentClient.java
@@ -134,7 +134,7 @@ public class AgentClient extends AbstractAgentProxy implements Runnable {
                 }
                 return messages.poll();
             } catch (InterruptedException e) {
-                throw (IOException) new InterruptedIOException().initCause(e);
+                throw (IOException) new InterruptedIOException(authSocket + ": Interrupted while polling for messages").initCause(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/client/ScpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/ScpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/ScpClient.java
deleted file mode 100644
index ed273d3..0000000
--- a/sshd-core/src/main/java/org/apache/sshd/client/ScpClient.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd.client;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collection;
-
-/**
- */
-public interface ScpClient {
-
-    enum Option {
-        Recursive,
-        PreserveAttributes,
-        TargetIsDirectory
-    }
-
-    void download(String remote, String local, Option... options) throws IOException;
-    void download(String remote, String local, Collection<Option> options) throws IOException;
-
-    void download(String remote, Path local, Option... options) throws IOException;
-    void download(String remote, Path local, Collection<Option> options) throws IOException;
-
-    void download(String[] remote, String local, Option... options) throws IOException;
-    void download(String[] remote, String local, Collection<Option> options) throws IOException;
-
-    void download(String[] remote, Path local, Option... options) throws IOException;
-    void download(String[] remote, Path local, Collection<Option> options) throws IOException;
-
-    void upload(String local, String remote, Option... options) throws IOException;
-    void upload(String local, String remote, Collection<Option> options) throws IOException;
-
-    void upload(Path local, String remote, Option... options) throws IOException;
-    void upload(Path local, String remote, Collection<Option> options) throws IOException;
-
-    void upload(String[] local, String remote, Option... options) throws IOException;
-    void upload(String[] local, String remote, Collection<Option> options) throws IOException;
-    
-    void upload(Path[] local, String remote, Option... options) throws IOException;
-    void upload(Path[] local, String remote, Collection<Option> options) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/client/scp/AbstractScpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/scp/AbstractScpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/scp/AbstractScpClient.java
index f005d2f..d59000d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/scp/AbstractScpClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/scp/AbstractScpClient.java
@@ -19,20 +19,32 @@
 
 package org.apache.sshd.client.scp;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystem;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
 import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 
-import org.apache.sshd.client.ScpClient;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.scp.ScpHelper;
+import org.apache.sshd.common.scp.ScpTimestamp;
+import org.apache.sshd.common.util.AbstractLoggingBean;
 import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.IoUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public abstract class AbstractScpClient implements ScpClient {
+public abstract class AbstractScpClient extends AbstractLoggingBean implements ScpClient {
     protected AbstractScpClient() {
         super();
     }
@@ -81,12 +93,50 @@ public abstract class AbstractScpClient implements ScpClient {
 
     @Override
     public void download(String remote, Path local, Option... options) throws IOException {
-        download(remote, local, GenericUtils.isEmpty(options) ? Collections.<Option>emptySet() : GenericUtils.of(options));
+        download(remote, local, GenericUtils.of(options));
+    }
+
+    @Override
+    public void download(String remote, Path local, Collection<Option> options) throws IOException {
+        local = ValidateUtils.checkNotNull(local, "Invalid argument local: %s", local);
+        remote = ValidateUtils.checkNotNullAndNotEmpty(remote, "Invalid argument remote: %s", remote);
+
+        LinkOption[]    opts = IoUtils.getLinkOptions(false);
+        if (Files.isDirectory(local, opts)) {
+            options = addTargetIsDirectory(options);
+        }
+
+        if (options.contains(Option.TargetIsDirectory)) {
+            Boolean status = IoUtils.checkFileExists(local, opts);
+            if (status == null) {
+                throw new SshException("Target directory " + local.toString() + " is probaly inaccesible");
+            }
+
+            if (!status.booleanValue()) {
+                throw new SshException("Target directory " + local.toString() + " does not exist");
+            }
+
+            if (!Files.isDirectory(local, opts)) {
+                throw new SshException("Target directory " + local.toString() + " is not a directory");
+            }
+        }
+
+        download(remote, local.getFileSystem(), local, options);
+    }
+
+    protected abstract void download(String remote, FileSystem fs, Path local, Collection<Option> options) throws IOException;
+
+    @Override
+    public byte[] downloadBytes(String remote) throws IOException {
+        try(ByteArrayOutputStream local = new ByteArrayOutputStream()) {
+            download(remote, local);
+            return local.toByteArray();
+        }
     }
 
     @Override
     public void upload(String local, String remote, Option... options) throws IOException {
-        upload(local, remote, GenericUtils.isEmpty(options) ? Collections.<Option>emptySet() : GenericUtils.of(options));
+        upload(local, remote, GenericUtils.of(options));
     }
 
     @Override
@@ -114,6 +164,42 @@ public abstract class AbstractScpClient implements ScpClient {
         upload(local, remote, GenericUtils.isEmpty(options) ? Collections.<Option>emptySet() : GenericUtils.of(options));
     }
 
+    @Override
+    public void upload(byte[] data, String remote, Collection<PosixFilePermission> perms, ScpTimestamp time) throws IOException {
+        upload(data, 0, data.length, remote, perms, time);
+    }
+
+    @Override
+    public void upload(byte[] data, int offset, int len, String remote, Collection<PosixFilePermission> perms, ScpTimestamp time) throws IOException {
+        try(InputStream local = new ByteArrayInputStream(data, offset, len)) {
+            upload(local, remote, len, perms, time);
+        }
+    }
+
+    @Override
+    public void upload(String[] local, String remote, Collection<Option> options) throws IOException {
+        final Collection<String>    paths=Arrays.asList(ValidateUtils.checkNotNullAndNotEmpty(local, "Invalid argument local: %s", (Object) local));
+        runUpload(remote, options, paths, new ScpOperationExecutor<String>() {
+            @Override
+            public void execute(ScpHelper helper, Collection<String> local, Collection<Option> sendOptions) throws IOException {
+                helper.send(local, sendOptions.contains(Option.Recursive), sendOptions.contains(Option.PreserveAttributes), ScpHelper.DEFAULT_SEND_BUFFER_SIZE);
+            }
+        });
+    }
+
+    @Override
+    public void upload(Path[] local, String remote, Collection<Option> options) throws IOException {
+        final Collection<Path>    paths=Arrays.asList(ValidateUtils.checkNotNullAndNotEmpty(local, "Invalid argument local: %s", (Object) local));
+        runUpload(remote, options, paths, new ScpOperationExecutor<Path>() {
+            @Override
+            public void execute(ScpHelper helper, Collection<Path> local, Collection<Option> sendOptions) throws IOException {
+                helper.sendPaths(local, sendOptions.contains(Option.Recursive), sendOptions.contains(Option.PreserveAttributes), ScpHelper.DEFAULT_SEND_BUFFER_SIZE);
+            }
+        });
+    }
+
+    protected abstract <T> void runUpload(String remote, Collection<Option> options, Collection<T> local, AbstractScpClient.ScpOperationExecutor<T> executor) throws IOException;
+
     protected Collection<Option> addTargetIsDirectory(Collection<Option> options) {
         if (GenericUtils.isEmpty(options) || (!options.contains(Option.TargetIsDirectory))) {
             // create a copy in case the original collection is un-modifiable
@@ -123,4 +209,38 @@ public abstract class AbstractScpClient implements ScpClient {
         
         return options;
     }
+
+    public static String createSendCommand(String remote, Collection<Option> options) {
+        StringBuilder sb = new StringBuilder(remote.length() + Long.SIZE).append(ScpHelper.SCP_COMMAND_PREFIX);
+        if (options.contains(Option.Recursive)) {
+            sb.append(" -r");
+        }
+        if (options.contains(Option.TargetIsDirectory)) {
+            sb.append(" -d");
+        }
+        if (options.contains(Option.PreserveAttributes)) {
+            sb.append(" -p");
+        }
+
+        sb.append(" -t").append(" --").append(" ").append(remote);
+        return sb.toString();
+    }
+
+    public static String createReceiveCommand(String remote, Collection<Option> options) {
+        ValidateUtils.checkNotNullAndNotEmpty(remote, "No remote location specified", GenericUtils.EMPTY_OBJECT_ARRAY);
+        StringBuilder sb = new StringBuilder(remote.length() + Long.SIZE).append(ScpHelper.SCP_COMMAND_PREFIX);
+        if (options.contains(Option.Recursive)) {
+            sb.append(" -r");
+        }
+        if (options.contains(Option.PreserveAttributes)) {
+            sb.append(" -p");
+        }
+
+        sb.append(" -f").append(" --").append(' ').append(remote);
+        return sb.toString();
+    }
+
+    public static interface ScpOperationExecutor<T> {
+        void execute(ScpHelper helper, Collection<T> local, Collection<Option> options) throws IOException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/client/scp/DefaultScpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/scp/DefaultScpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/scp/DefaultScpClient.java
index 9c72dda..c4a977a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/scp/DefaultScpClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/scp/DefaultScpClient.java
@@ -19,21 +19,27 @@
 package org.apache.sshd.client.scp;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.nio.file.FileSystem;
-import java.nio.file.Files;
-import java.nio.file.LinkOption;
 import java.nio.file.Path;
-import java.util.Arrays;
+import java.nio.file.attribute.PosixFilePermission;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
 
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.client.channel.ChannelExec;
-import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.file.FileSystemFactory;
+import org.apache.sshd.common.file.util.MockFileSystem;
+import org.apache.sshd.common.file.util.MockPath;
 import org.apache.sshd.common.scp.ScpHelper;
+import org.apache.sshd.common.scp.ScpSourceStreamResolver;
+import org.apache.sshd.common.scp.ScpTimestamp;
 import org.apache.sshd.common.scp.ScpTransferEventListener;
-import org.apache.sshd.common.util.IoUtils;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 
 /**
@@ -57,7 +63,8 @@ public class DefaultScpClient extends AbstractScpClient {
     public void download(String remote, String local, Collection<Option> options) throws IOException {
         local = ValidateUtils.checkNotNullAndNotEmpty(local, "Invalid argument local: %s", local);
 
-        FileSystemFactory factory = clientSession.getFactoryManager().getFileSystemFactory();
+        FactoryManager manager = clientSession.getFactoryManager();
+        FileSystemFactory factory = manager.getFileSystemFactory();
         FileSystem fs = factory.createFileSystem(clientSession);
         try {
             download(remote, fs, fs.getPath(local), options);
@@ -71,56 +78,36 @@ public class DefaultScpClient extends AbstractScpClient {
     }
 
     @Override
-    public void download(String remote, Path local, Collection<Option> options) throws IOException {
-        local = ValidateUtils.checkNotNull(local, "Invalid argument local: %s", local);
-        download(remote, local.getFileSystem(), local, options);
-    }
-
-    protected void download(String remote, FileSystem fs, Path local, Collection<Option> options) throws IOException {
-        local = ValidateUtils.checkNotNull(local, "Invalid argument local: %s", local);
-        remote = ValidateUtils.checkNotNullAndNotEmpty(remote, "Invalid argument remote: %s", remote);
-
-        LinkOption[]    opts = IoUtils.getLinkOptions(false);
-        if (Files.isDirectory(local, opts)) {
-            options = addTargetIsDirectory(options);
-        }
-
-        if (options.contains(Option.TargetIsDirectory)) {
-            Boolean         status = IoUtils.checkFileExists(local, opts);
-            if (status == null) {
-                throw new SshException("Target directory " + local.toString() + " is probaly inaccesible");
-            }
-
-            if (!status.booleanValue()) {
-                throw new SshException("Target directory " + local.toString() + " does not exist");
-            }
-            if (!Files.isDirectory(local, opts)) {
-                throw new SshException("Target directory " + local.toString() + " is not a directory");
+    public void download(String remote, OutputStream local) throws IOException {
+        String cmd = createReceiveCommand(remote, Collections.<Option>emptyList());
+        ChannelExec channel = clientSession.createExecChannel(cmd);
+        try {
+            try {
+                channel.open().await();
+            } catch (InterruptedException e) {
+                throw (IOException) new InterruptedIOException("Interrupted while await channel open for download of " + remote).initCause(e);
             }
-        }
 
-        StringBuilder sb = new StringBuilder("scp");
-        if (options.contains(Option.Recursive)) {
-            sb.append(" -r");
-        }
-        if (options.contains(Option.PreserveAttributes)) {
-            sb.append(" -p");
+            // NOTE: we use a mock file system since we expect no invocations for it
+            ScpHelper helper = new ScpHelper(channel.getInvertedOut(), channel.getInvertedIn(), new MockFileSystem(remote), listener);
+            helper.receiveFileStream(local, ScpHelper.DEFAULT_RECEIVE_BUFFER_SIZE);
+        } finally {
+            channel.close(false);
         }
-        sb.append(" -f");
-        sb.append(" --");
-        sb.append(" ");
-        sb.append(remote);
+    }
 
-        ChannelExec channel = clientSession.createExecChannel(sb.toString());
+    @Override
+    protected void download(String remote, FileSystem fs, Path local, Collection<Option> options) throws IOException {
+        String cmd = createReceiveCommand(remote, options);
+        ChannelExec channel = clientSession.createExecChannel(cmd);
         try {
             try {
                 channel.open().await();
             } catch (InterruptedException e) {
-                throw (IOException) new InterruptedIOException().initCause(e);
+                throw (IOException) new InterruptedIOException("Interrupted while await channel open for download of " + remote + " to " + local).initCause(e);
             }
 
             ScpHelper helper = new ScpHelper(channel.getInvertedOut(), channel.getInvertedIn(), fs, listener);
-
             helper.receive(local,
                            options.contains(Option.Recursive),
                            options.contains(Option.TargetIsDirectory),
@@ -132,64 +119,84 @@ public class DefaultScpClient extends AbstractScpClient {
     }
 
     @Override
-    public void upload(String[] local, String remote, Collection<Option> options) throws IOException {
-        final Collection<String>    paths=Arrays.asList(ValidateUtils.checkNotNullAndNotEmpty(local, "Invalid argument local: %s", (Object) local));
-        runUpload(remote, options, paths, new ScpOperationExecutor<String>() {
-            @Override
-            public void execute(ScpHelper helper, Collection<String> local, Collection<Option> options) throws IOException {
-                helper.send(local,
-                        options.contains(Option.Recursive),
-                        options.contains(Option.PreserveAttributes),
-                        ScpHelper.DEFAULT_SEND_BUFFER_SIZE);
-            }
-        });
-    }
+    public void upload(final InputStream local, final String remote, final long size, final Collection<PosixFilePermission> perms, final ScpTimestamp time) throws IOException {
+        int namePos = ValidateUtils.checkNotNullAndNotEmpty(remote, "No remote location specified", GenericUtils.EMPTY_OBJECT_ARRAY).lastIndexOf('/');
+        final String name = (namePos < 0)
+                          ? remote
+                          : ValidateUtils.checkNotNullAndNotEmpty(remote.substring(namePos + 1), "No name value in remote=%s", remote)
+                          ;
+        final String cmd = createSendCommand(remote, (time != null) ? EnumSet.of(Option.PreserveAttributes) : Collections.<Option>emptySet());
+        ChannelExec channel = clientSession.createExecChannel(cmd);
+        try {
+            channel.open().await();
+        } catch (InterruptedException e) {
+            throw (IOException) new InterruptedIOException("Interrupted while await channel open for stream upload to " + remote).initCause(e);
+        }
 
-    @Override
-    public void upload(Path[] local, String remote, Collection<Option> options) throws IOException {
-        final Collection<Path>    paths=Arrays.asList(ValidateUtils.checkNotNullAndNotEmpty(local, "Invalid argument local: %s", (Object) local));
-        runUpload(remote, options, paths, new ScpOperationExecutor<Path>() {
-            @Override
-            public void execute(ScpHelper helper, Collection<Path> local, Collection<Option> options) throws IOException {
-                helper.sendPaths(local,
-                        options.contains(Option.Recursive),
-                        options.contains(Option.PreserveAttributes),
-                        ScpHelper.DEFAULT_SEND_BUFFER_SIZE);
-            }
-        });
+        try {
+            ScpHelper helper = new ScpHelper(channel.getInvertedOut(), channel.getInvertedIn(), new MockFileSystem(remote), listener);
+            final Path mockPath = new MockPath(remote);
+            helper.sendStream(new ScpSourceStreamResolver() {
+                                    @Override
+                                    public String getFileName() throws IOException {
+                                        return name;
+                                    }
+                    
+                                    @Override
+                                    public Path getEventListenerFilePath() {
+                                        return mockPath;
+                                    }
+                    
+                                    @Override
+                                    public Collection<PosixFilePermission> getPermissions() throws IOException {
+                                        return perms;
+                                    }
+                    
+                                    @Override
+                                    public ScpTimestamp getTimestamp() throws IOException {
+                                        return time;
+                                    }
+                    
+                                    @Override
+                                    public long getSize() throws IOException {
+                                        return size;
+                                    }
+                    
+                                    @Override
+                                    public InputStream resolveSourceStream() throws IOException {
+                                        return local;
+                                    }
+                                    
+                                    @Override
+                                    public String toString() {
+                                        return cmd;
+                                    }
+                              },
+                              (time != null), ScpHelper.DEFAULT_SEND_BUFFER_SIZE);
+        } finally {
+            channel.close(false);
+        }
     }
 
-    protected <T> void runUpload(String remote, Collection<Option> options, Collection<T> local, ScpOperationExecutor<T> executor) throws IOException {
+    @Override
+    protected <T> void runUpload(String remote, Collection<Option> options, Collection<T> local, AbstractScpClient.ScpOperationExecutor<T> executor) throws IOException {
         local = ValidateUtils.checkNotNullAndNotEmpty(local, "Invalid argument local: %s", local);
         remote = ValidateUtils.checkNotNullAndNotEmpty(remote, "Invalid argument remote: %s", remote);
         if (local.size() > 1) {
             options = addTargetIsDirectory(options);
         }
         
-        StringBuilder sb = new StringBuilder("scp");
-        if (options.contains(Option.Recursive)) {
-            sb.append(" -r");
-        }
-        if (options.contains(Option.TargetIsDirectory)) {
-            sb.append(" -d");
-        }
-        if (options.contains(Option.PreserveAttributes)) {
-            sb.append(" -p");
-        }
-        sb.append(" -t");
-        sb.append(" --");
-        sb.append(" ");
-        sb.append(remote);
-
-        ChannelExec channel = clientSession.createExecChannel(sb.toString());
+        String cmd = createSendCommand(remote, options);
+        ChannelExec channel = clientSession.createExecChannel(cmd);
         try {
             channel.open().await();
         } catch (InterruptedException e) {
-            throw (IOException) new InterruptedIOException().initCause(e);
+            throw (IOException) new InterruptedIOException("Interrupted while await channel open for upload to " + remote).initCause(e);
         }
 
         try {
-            FileSystemFactory factory = clientSession.getFactoryManager().getFileSystemFactory();
+            FactoryManager manager = clientSession.getFactoryManager();
+            FileSystemFactory factory = manager.getFileSystemFactory();
             FileSystem fs = factory.createFileSystem(clientSession);
             try {
                 ScpHelper helper = new ScpHelper(channel.getInvertedOut(), channel.getInvertedIn(), fs, listener);
@@ -197,7 +204,7 @@ public class DefaultScpClient extends AbstractScpClient {
             } finally {
                 try {
                     fs.close();
-                } catch (UnsupportedOperationException e) {
+                } catch(UnsupportedOperationException e) {
                     // Ignore
                 }
             }
@@ -205,8 +212,4 @@ public class DefaultScpClient extends AbstractScpClient {
             channel.close(false);
         }
     }
-    
-    public static interface ScpOperationExecutor<T> {
-        void execute(ScpHelper helper, Collection<T> local, Collection<Option> options) throws IOException;
-    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/client/scp/ScpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/scp/ScpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/scp/ScpClient.java
new file mode 100644
index 0000000..ae7bb1e
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/client/scp/ScpClient.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.scp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Collection;
+
+import org.apache.sshd.common.scp.ScpTimestamp;
+
+/**
+ */
+public interface ScpClient {
+
+    enum Option {
+        Recursive,
+        PreserveAttributes,
+        TargetIsDirectory
+    }
+
+    void download(String remote, String local, Option... options) throws IOException;
+    void download(String remote, String local, Collection<Option> options) throws IOException;
+
+    void download(String remote, Path local, Option... options) throws IOException;
+    void download(String remote, Path local, Collection<Option> options) throws IOException;
+
+    // NOTE: the remote location MUST be a file or an exception is generated
+    void download(String remote, OutputStream local) throws IOException;
+    byte[] downloadBytes(String remote) throws IOException;
+
+    void download(String[] remote, String local, Option... options) throws IOException;
+    void download(String[] remote, String local, Collection<Option> options) throws IOException;
+
+    void download(String[] remote, Path local, Option... options) throws IOException;
+    void download(String[] remote, Path local, Collection<Option> options) throws IOException;
+
+    void upload(String local, String remote, Option... options) throws IOException;
+    void upload(String local, String remote, Collection<Option> options) throws IOException;
+
+    void upload(Path local, String remote, Option... options) throws IOException;
+    void upload(Path local, String remote, Collection<Option> options) throws IOException;
+
+    void upload(String[] local, String remote, Option... options) throws IOException;
+    void upload(String[] local, String remote, Collection<Option> options) throws IOException;
+    
+    void upload(Path[] local, String remote, Option... options) throws IOException;
+    void upload(Path[] local, String remote, Collection<Option> options) throws IOException;
+    
+    // NOTE: due to SCP command limitations, the amount of data to be uploaded must be known a-priori
+    // To upload a dynamic amount of data use SFTP
+    void upload(byte[] data, String remote, Collection<PosixFilePermission> perms, ScpTimestamp time) throws IOException;
+    void upload(byte[] data, int offset, int len, String remote, Collection<PosixFilePermission> perms, ScpTimestamp time) throws IOException;
+    void upload(InputStream local, String remote, long size, Collection<PosixFilePermission> perms, ScpTimestamp time) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index c9a1656..09286d8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import org.apache.sshd.ClientChannel;
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.client.ClientFactoryManager;
-import org.apache.sshd.client.ScpClient;
 import org.apache.sshd.client.ServerKeyVerifier;
 import org.apache.sshd.client.UserInteraction;
 import org.apache.sshd.client.channel.ChannelDirectTcpip;
@@ -40,6 +39,7 @@ import org.apache.sshd.client.channel.ChannelSubsystem;
 import org.apache.sshd.client.future.AuthFuture;
 import org.apache.sshd.client.future.DefaultAuthFuture;
 import org.apache.sshd.client.scp.DefaultScpClient;
+import org.apache.sshd.client.scp.ScpClient;
 import org.apache.sshd.client.sftp.DefaultSftpClient;
 import org.apache.sshd.client.sftp.SftpClient;
 import org.apache.sshd.client.sftp.SftpFileSystem;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/client/sftp/DefaultSftpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/sftp/DefaultSftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/sftp/DefaultSftpClient.java
index 3ee81c2..ebf80ed 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/sftp/DefaultSftpClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/sftp/DefaultSftpClient.java
@@ -269,7 +269,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
                 try {
                     messages.wait();
                 } catch (InterruptedException e) {
-                    throw (IOException) new InterruptedIOException().initCause(e);
+                    throw (IOException) new InterruptedIOException("Interrupted while waiting for messages").initCause(e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystem.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystem.java b/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystem.java
index dfc88f4..820099c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystem.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystem.java
@@ -84,7 +84,9 @@ public class SftpFileSystem extends BaseFileSystem<SftpPath> {
 
     @Override
     public void close() throws IOException {
-        session.close(true);
+        if (isOpen()) {
+            session.close(true);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystemProvider.java b/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystemProvider.java
index fc53d04..6eb2e61 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystemProvider.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/sftp/SftpFileSystemProvider.java
@@ -75,14 +75,14 @@ import java.util.concurrent.TimeUnit;
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.SshBuilder;
 import org.apache.sshd.SshClient;
-import org.apache.sshd.client.sftp.SftpClient.Attributes;
 import org.apache.sshd.client.SftpException;
+import org.apache.sshd.client.sftp.SftpClient.Attributes;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.config.SshConfigFileReader;
 import org.apache.sshd.common.sftp.SftpConstants;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.IoUtils;
-import org.apache.sshd.server.sftp.SftpSubsystemFactory;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,27 +118,46 @@ public class SftpFileSystemProvider extends FileSystemProvider {
             if (fileSystem != null) {
                 throw new FileSystemAlreadyExistsException(authority);
             }
-            String host = uri.getHost();
-            String userInfo = uri.getUserInfo();
-            if (host == null) {
-                throw new IllegalArgumentException("Host not provided");
-            }
-            if (userInfo == null) {
-                throw new IllegalArgumentException("UserInfo not provided");
+            String host = ValidateUtils.checkNotNullAndNotEmpty(uri.getHost(), "Host not provided", GenericUtils.EMPTY_OBJECT_ARRAY);
+            String userInfo = ValidateUtils.checkNotNullAndNotEmpty(uri.getUserInfo(), "UserInfo not provided", GenericUtils.EMPTY_OBJECT_ARRAY);
+            String[] ui = GenericUtils.split(userInfo, ':');
+            int port = uri.getPort();
+            if (port <= 0) {
+                port = SshConfigFileReader.DEFAULT_PORT;
             }
-            String[] ui = userInfo.split(":");
-            ClientSession session;
+
+            ClientSession session=null;
             try {
-                session = client.connect(ui[0], host, uri.getPort() > 0 ? uri.getPort() : SshConfigFileReader.DEFAULT_PORT)
-                        .await().getSession();
-            } catch (InterruptedException e) {
-                throw new InterruptedIOException();
+                session = client.connect(ui[0], host, port).await().getSession();
+                session.addPasswordIdentity(ui[1]);
+                session.auth().verify();
+                fileSystem = new SftpFileSystem(this, session);
+                fileSystems.put(authority, fileSystem);
+                return fileSystem;
+            } catch(Exception e) {
+                if (session != null) {
+                    try {
+                        session.close();
+                    } catch(IOException t) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Failed (" + t.getClass().getSimpleName() + ")"
+                                    + " to close session for new file system on " + host + ":" + port
+                                    + " due to " + e.getClass().getSimpleName() + "[" + e.getMessage() + "]"
+                                    + ": " + t.getMessage());
+                        }
+                    }
+                }
+                
+                if (e instanceof IOException) {
+                    throw (IOException) e;
+                } else if (e instanceof InterruptedException) {
+                    throw (IOException) new InterruptedIOException("Interrupted while waiting for connection to " + host + ":" + port).initCause(e);
+                } else if (e instanceof RuntimeException) {
+                    throw (RuntimeException) e;
+                } else {
+                    throw new IOException(e);
+                }
             }
-            session.addPasswordIdentity(ui[1]);
-            session.auth().verify();
-            fileSystem = new SftpFileSystem(this, session);
-            fileSystems.put(authority, fileSystem);
-            return fileSystem;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 92fee57..83aaaff 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -98,7 +98,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
                         closed = true;
                         throw e;
                     } catch (InterruptedException e) {
-                        throw (IOException)new InterruptedIOException().initCause(e);
+                        throw (IOException)new InterruptedIOException("Interrupted while waiting for remote space").initCause(e);
                     }
                 }
                 continue;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
index e42b12c..f9fd7e1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelPipedInputStream.java
@@ -119,7 +119,7 @@ public class ChannelPipedInputStream extends InputStream implements ChannelPiped
                         dataAvailable.await();
                     }
                 } catch (InterruptedException e) {
-                    throw (IOException) new InterruptedIOException().initCause(e);
+                    throw (IOException) new InterruptedIOException("Interrupted while waiting for data to become available").initCause(e);
                 }
             }
             if (len > buffer.available()) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/file/util/MockFileSystem.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/file/util/MockFileSystem.java b/sshd-core/src/main/java/org/apache/sshd/common/file/util/MockFileSystem.java
new file mode 100644
index 0000000..db5fab8
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/file/util/MockFileSystem.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.file.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.nio.file.WatchService;
+import java.nio.file.attribute.UserPrincipalLookupService;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class MockFileSystem extends FileSystem {
+    private final AtomicBoolean open = new AtomicBoolean(true);
+    private final String name;
+
+    public MockFileSystem(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public FileSystemProvider provider() {
+        throw new UnsupportedOperationException("provider() N/A");
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (open.getAndSet(false)) {
+            return; // debug breakpoint
+        }
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open.get();
+    }
+
+    @Override
+    public boolean isReadOnly() {
+        return true;
+    }
+
+    @Override
+    public String getSeparator() {
+        return File.separator;
+    }
+
+    @Override
+    public Iterable<Path> getRootDirectories() {
+        return Collections.<Path>emptyList();
+    }
+
+    @Override
+    public Iterable<FileStore> getFileStores() {
+        return Collections.<FileStore>emptyList();
+    }
+
+    @Override
+    public Set<String> supportedFileAttributeViews() {
+        return Collections.<String>emptySet();
+    }
+
+    @Override
+    public Path getPath(String first, String... more) {
+        throw new UnsupportedOperationException("getPath(" + first + ") " + Arrays.toString(more));
+    }
+
+    @Override
+    public PathMatcher getPathMatcher(String syntaxAndPattern) {
+        throw new UnsupportedOperationException("getPathMatcher(" + syntaxAndPattern + ")");
+    }
+
+    @Override
+    public UserPrincipalLookupService getUserPrincipalLookupService() {
+        throw new UnsupportedOperationException("getUserPrincipalLookupService() N/A");
+    }
+
+    @Override
+    public WatchService newWatchService() throws IOException {
+        throw new IOException("newWatchService() N/A");
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/file/util/MockPath.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/file/util/MockPath.java b/sshd-core/src/main/java/org/apache/sshd/common/file/util/MockPath.java
new file mode 100644
index 0000000..85711d4
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/file/util/MockPath.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.file.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchEvent.Modifier;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class MockPath implements Path {
+    private final String path;
+    private final FileSystem fs;
+
+    public MockPath(String path) {
+        this.path = path;
+        this.fs = new MockFileSystem(path);
+    }
+
+    @Override
+    public FileSystem getFileSystem() {
+        return fs;
+    }
+
+    @Override
+    public boolean isAbsolute() {
+        return true;
+    }
+
+    @Override
+    public Path getRoot() {
+        return this;
+    }
+
+    @Override
+    public Path getFileName() {
+        return this;
+    }
+
+    @Override
+    public Path getParent() {
+        return null;
+    }
+
+    @Override
+    public int getNameCount() {
+        return 0;
+    }
+
+    @Override
+    public Path getName(int index) {
+        if (index == 0) {
+            return this;
+        } else {
+            throw new IllegalArgumentException("getName - bad index: " + index);
+        }
+    }
+
+    @Override
+    public Path subpath(int beginIndex, int endIndex) {
+        throw new UnsupportedOperationException("subPath(" + beginIndex + "," + endIndex + ") N/A");
+    }
+
+    @Override
+    public boolean startsWith(Path other) {
+        return startsWith(other.toString());
+    }
+
+    @Override
+    public boolean startsWith(String other) {
+        return path.startsWith(other);
+    }
+
+    @Override
+    public boolean endsWith(Path other) {
+        return endsWith(other.toString());
+    }
+
+    @Override
+    public boolean endsWith(String other) {
+        return path.endsWith(other);
+    }
+
+    @Override
+    public Path normalize() {
+        return this;
+    }
+
+    @Override
+    public Path resolve(Path other) {
+        return resolve(other.toString());
+    }
+
+    @Override
+    public Path resolve(String other) {
+        throw new UnsupportedOperationException("resolve(" + other + ") N/A");
+    }
+
+    @Override
+    public Path resolveSibling(Path other) {
+        return resolveSibling(other.toString());
+    }
+
+    @Override
+    public Path resolveSibling(String other) {
+        throw new UnsupportedOperationException("resolveSibling(" + other + ") N/A");
+    }
+
+    @Override
+    public Path relativize(Path other) {
+        throw new UnsupportedOperationException("relativize(" + other + ") N/A");
+    }
+
+    @Override
+    public URI toUri() {
+        throw new UnsupportedOperationException("toUri() N/A");
+    }
+
+    @Override
+    public Path toAbsolutePath() {
+        return this;
+    }
+
+    @Override
+    public Path toRealPath(LinkOption... options) throws IOException {
+        return this;
+    }
+
+    @Override
+    public File toFile() {
+        throw new UnsupportedOperationException("toFile() N/A");
+    }
+
+    @Override
+    public WatchKey register(WatchService watcher, Kind<?>... events) throws IOException {
+        return register(watcher, events, (Modifier[]) null);
+    }
+
+    @Override
+    public WatchKey register(WatchService watcher, Kind<?>[] events, Modifier... modifiers) throws IOException {
+        throw new IOException("register(" + path + ") N/A");
+    }
+
+    @Override
+    public Iterator<Path> iterator() {
+        return Collections.<Path>singleton(this).iterator();
+    }
+
+    @Override
+    public int compareTo(Path other) {
+        return path.compareTo(other.toString());
+    }
+
+    @Override
+    public String toString() {
+        return path;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/scp/LocalFileScpSourceStreamResolver.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/LocalFileScpSourceStreamResolver.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/LocalFileScpSourceStreamResolver.java
new file mode 100644
index 0000000..7c2311e
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/LocalFileScpSourceStreamResolver.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.scp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributeView;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.sshd.common.util.AbstractLoggingBean;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.IoUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class LocalFileScpSourceStreamResolver extends AbstractLoggingBean implements ScpSourceStreamResolver {
+    private final Path path, name;
+    private final Set<PosixFilePermission> perms;
+    private final long size;
+    private final ScpTimestamp time;
+
+    public LocalFileScpSourceStreamResolver(Path path) throws IOException {
+        this.path = ValidateUtils.checkNotNull(path, "No path specified", GenericUtils.EMPTY_OBJECT_ARRAY);
+        this.name = path.getFileName();
+        this.perms = IoUtils.getPermissions(path);
+
+        BasicFileAttributes basic = Files.getFileAttributeView(path, BasicFileAttributeView.class).readAttributes();
+        this.size = basic.size();
+        this.time = new ScpTimestamp(basic.lastModifiedTime().toMillis(), basic.lastAccessTime().toMillis());
+    }
+
+    @Override
+    public String getFileName() throws IOException {
+        return name.toString();
+    }
+
+    @Override
+    public Collection<PosixFilePermission> getPermissions() throws IOException {
+        return perms;
+    }
+
+    @Override
+    public ScpTimestamp getTimestamp() throws IOException {
+        return time;
+    }
+
+    @Override
+    public long getSize() throws IOException {
+        return size;
+    }
+
+    @Override
+    public Path getEventListenerFilePath() {
+        return path;
+    }
+
+    @Override
+    public InputStream resolveSourceStream() throws IOException {
+        return Files.newInputStream(getEventListenerFilePath());
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(getEventListenerFilePath());
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/scp/LocalFileScpTargetStreamResolver.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/LocalFileScpTargetStreamResolver.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/LocalFileScpTargetStreamResolver.java
new file mode 100644
index 0000000..efd7f46
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/LocalFileScpTargetStreamResolver.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.scp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.StreamCorruptedException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributeView;
+import java.nio.file.attribute.FileTime;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.common.util.AbstractLoggingBean;
+import org.apache.sshd.common.util.IoUtils;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class LocalFileScpTargetStreamResolver extends AbstractLoggingBean implements ScpTargetStreamResolver {
+    private final Path path;
+    private final Boolean status;
+    private Path file;
+
+    public LocalFileScpTargetStreamResolver(Path path) throws IOException {
+        LinkOption[] options = IoUtils.getLinkOptions(false);
+        this.status = IoUtils.checkFileExists(path, options);
+        if (status == null) {
+            throw new AccessDeniedException("Receive target file path existence status cannot be determined: " + path);
+        }
+
+        this.path = path;
+    }
+
+    @Override
+    public OutputStream resolveTargetStream(String name, long length, Set<PosixFilePermission> perms) throws IOException {
+        if (file != null) {
+            throw new StreamCorruptedException("resolveTargetStream(" + name + ")[" + perms + "] already resolved: " + file);
+        }
+
+        LinkOption[] options = IoUtils.getLinkOptions(false);
+        if (status.booleanValue() && Files.isDirectory(path, options)) {
+            String localName = name.replace('/', File.separatorChar);   // in case we are running on Windows
+            file = path.resolve(localName);
+        } else if (status.booleanValue() && Files.isRegularFile(path, options)) {
+            file = path;
+        } else if (!status.booleanValue()) {
+            Path parent = path.getParent();
+            
+            Boolean parentStatus = IoUtils.checkFileExists(parent, options);
+            if (parentStatus == null) {
+                throw new AccessDeniedException("Receive file parent (" + parent + ") existence status cannot be determined for " + path);
+            }
+
+            if (parentStatus.booleanValue() && Files.isDirectory(parent, options)) {
+                file = path;
+            }
+        }
+        
+        if (file == null) {
+            throw new IOException("Can not write to " + path);
+        }
+        
+        Boolean fileStatus = IoUtils.checkFileExists(file, options);
+        if (fileStatus == null) {
+            throw new AccessDeniedException("Receive file existence status cannot be determined: " + file);
+        }
+
+        if (fileStatus.booleanValue()) {
+            if (Files.isDirectory(file, options)) {
+                throw new IOException("File is a directory: " + file);
+            }
+
+            if (!Files.isWritable(file)) {
+                throw new IOException("Can not write to file: " + file);
+            }
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("resolveTargetStream(" + name + "): " + file);
+        }
+
+        return Files.newOutputStream(file);
+    }
+
+    @Override
+    public Path getEventListenerFilePath() {
+        if (file == null) {
+            return path;
+        } else {
+            return file;
+        }
+    }
+
+    @Override
+    public void postProcessReceivedData(String name, boolean preserve, Set<PosixFilePermission> perms, ScpTimestamp time) throws IOException {
+        if (file == null) {
+            throw new StreamCorruptedException("postProcessReceivedData(" + name + ")[" + perms + "] No currently resolved data");
+        }
+
+        if (preserve) {
+            updateFileProperties(name, file, perms, time);
+        }
+    }
+
+    protected void updateFileProperties(String name, Path path, Set<PosixFilePermission> perms, ScpTimestamp time) throws IOException {
+        if (log.isTraceEnabled()) {
+            log.trace("updateFileProperties(" + name + ")[" + path + "] permissions: " + perms);
+        }
+        IoUtils.setPermissions(path, perms);
+
+        if (time != null) {
+            BasicFileAttributeView view = Files.getFileAttributeView(path, BasicFileAttributeView.class);
+            FileTime lastModified = FileTime.from(time.lastModifiedTime, TimeUnit.MILLISECONDS);
+            FileTime lastAccess = FileTime.from(time.lastAccessTime, TimeUnit.MILLISECONDS);
+            if (log.isTraceEnabled()) {
+                log.trace("updateFileProperties(" + name + ")[" + path + "] last-modified=" + lastModified + ", last-access=" + lastAccess);
+            }
+
+            view.setTimes(lastModified, lastAccess, null);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(getEventListenerFilePath());
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9b60dcc5/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpHelper.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpHelper.java b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpHelper.java
index d1ee197..b7d244e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpHelper.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/scp/ScpHelper.java
@@ -21,10 +21,10 @@ package org.apache.sshd.common.scp;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.StreamCorruptedException;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.FileSystem;
@@ -41,16 +41,23 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.file.util.MockPath;
 import org.apache.sshd.common.scp.ScpTransferEventListener.FileOperation;
 import org.apache.sshd.common.util.AbstractLoggingBean;
 import org.apache.sshd.common.util.DirectoryScanner;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.IoUtils;
+import org.apache.sshd.common.util.io.LimitInputStream;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class ScpHelper extends AbstractLoggingBean {
+    /**
+     * Command prefix used to identify SCP commands
+     */
+    public static final String SCP_COMMAND_PREFIX = "scp";
+
     public static final int OK = 0;
     public static final int WARNING = 1;
     public static final int ERROR = 2;
@@ -91,46 +98,98 @@ public class ScpHelper extends AbstractLoggingBean {
         this.listener = (eventListener == null) ? ScpTransferEventListener.EMPTY : eventListener;
     }
 
-    public void receive(Path path, boolean recursive, boolean shouldBeDir, boolean preserve, int bufferSize) throws IOException {
+    public void receiveFileStream(final OutputStream local, final int bufferSize) throws IOException {
+        receive(new ScpReceiveLineHandler() {
+            @Override
+            public void process(final String line, boolean isDir, ScpTimestamp timestamp) throws IOException {
+                if (isDir) {
+                    throw new StreamCorruptedException("Cannot download a directory into a file stream: " + line);
+                }
+
+                final Path path = new MockPath(line);
+                receiveStream(line, new ScpTargetStreamResolver() {
+                    @SuppressWarnings("synthetic-access")
+                    @Override
+                    public OutputStream resolveTargetStream(String name, long length, Set<PosixFilePermission> perms) throws IOException {
+                        if (log.isDebugEnabled()) {
+                            log.debug("resolveTargetStream(" + name + ")[" + perms + "][len=" + length + "] started local stream download");
+                        }
+                        return local;
+                    }
+
+                    @Override
+                    public Path getEventListenerFilePath() {
+                        return path;
+                    }
+
+                    @Override
+                    @SuppressWarnings("synthetic-access")
+                    public void postProcessReceivedData(String name, boolean preserve, Set<PosixFilePermission> perms, ScpTimestamp time) throws IOException {
+                        if (log.isDebugEnabled()) {
+                            log.debug("postProcessReceivedData(" + name + ")[" + perms + "][time=" + time + "] ended local stream download");
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return line;
+                    }
+                }, timestamp, false, bufferSize);
+            }
+        });
+    }
+
+    public void receive(final Path path, final boolean recursive, boolean shouldBeDir, final boolean preserve, final int bufferSize) throws IOException {
         if (shouldBeDir) {
-            LinkOption[]    options=IoUtils.getLinkOptions(false);
-            Boolean         status=IoUtils.checkFileExists(path, options);
+            LinkOption[] options = IoUtils.getLinkOptions(false);
+            Boolean      status = IoUtils.checkFileExists(path, options);
             if (status == null) {
-                throw new SshException("Target directory " + path.toString() + " is most like inaccessible");
+                throw new SshException("Target directory " + path + " is most like inaccessible");
             }
             if (!status.booleanValue()) {
-                throw new SshException("Target directory " + path.toString() + " does not exist");
+                throw new SshException("Target directory " + path + " does not exist");
             }
             if (!Files.isDirectory(path, options)) {
-                throw new SshException("Target directory " + path.toString() + " is not a directory");
+                throw new SshException("Target directory " + path + " is not a directory");
             }
         }
 
+        receive(new ScpReceiveLineHandler() {
+            @Override
+            public void process(String line, boolean isDir, ScpTimestamp time) throws IOException {
+                if (recursive && isDir) {
+                    receiveDir(line, path, time, preserve, bufferSize);
+                } else {
+                    receiveFile(line, path, time, preserve, bufferSize);
+                }
+            }
+        });
+    }
+
+    protected void receive(ScpReceiveLineHandler handler) throws IOException {
         ack();
-        long[] time = null;
-        for (;;)
-        {
+        ScpTimestamp time = null;
+        for (;;) {
             String line;
             boolean isDir = false;
             int c = readAck(true);
-            switch (c)
-            {
+            switch (c) {
                 case -1:
                     return;
                 case 'D':
                     isDir = true;
                 case 'C':
-                    line = ((char) c) + readLine();
+                    line = String.valueOf((char) c) + readLine();
                     log.debug("Received header: " + line);
                     break;
                 case 'T':
-                    line = ((char) c) + readLine();
+                    line = String.valueOf((char) c) + readLine();
                     log.debug("Received header: " + line);
-                    time = parseTime(line);
+                    time = ScpTimestamp.parseTime(line);
                     ack();
                     continue;
                 case 'E':
-                    line = ((char) c) + readLine();
+                    line = String.valueOf((char) c) + readLine();
                     log.debug("Received header: " + line);
                     ack();
                     return;
@@ -139,21 +198,14 @@ public class ScpHelper extends AbstractLoggingBean {
                     continue;
             }
 
-            if (recursive && isDir)
-            {
-                receiveDir(line, path, time, preserve, bufferSize);
-                time = null;
-            }
-            else
-            {
-                receiveFile(line, path, time, preserve, bufferSize);
+            try {
+                handler.process(line, isDir, time);
+            } finally {
                 time = null;
             }
         }
     }
-
-
-    public void receiveDir(String header, Path path, long[] time, boolean preserve, int bufferSize) throws IOException {
+    public void receiveDir(String header, Path path, ScpTimestamp time, boolean preserve, int bufferSize) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("Receiving directory {}", path);
         }
@@ -169,8 +221,8 @@ public class ScpHelper extends AbstractLoggingBean {
             throw new IOException("Expected 0 length for directory but got " + length);
         }
 
-        LinkOption[]    options=IoUtils.getLinkOptions(false);
-        Boolean         status=IoUtils.checkFileExists(path, options);
+        LinkOption[] options = IoUtils.getLinkOptions(false);
+        Boolean status = IoUtils.checkFileExists(path, options);
         if (status == null) {
             throw new AccessDeniedException("Receive directory existence status cannot be determined: " + path);
         }
@@ -180,7 +232,7 @@ public class ScpHelper extends AbstractLoggingBean {
             String localName = name.replace('/', File.separatorChar);
             file = path.resolve(localName);
         } else if (!status.booleanValue()) {
-            Path    parent=path.getParent();
+            Path parent = path.getParent();
 
             status = IoUtils.checkFileExists(parent, options);
             if (status == null) {
@@ -206,13 +258,7 @@ public class ScpHelper extends AbstractLoggingBean {
         }
 
         if (preserve) {
-            IoUtils.setPermissions(path, perms);
-            if (time != null) {
-                Files.getFileAttributeView(file, BasicFileAttributeView.class)
-                        .setTimes(FileTime.from(time[0], TimeUnit.SECONDS),
-                                FileTime.from(time[1], TimeUnit.SECONDS),
-                                null);
-            }
+            updateFileProperties(file, perms, time);
         }
 
         ack();
@@ -235,7 +281,7 @@ public class ScpHelper extends AbstractLoggingBean {
                     ack();
                     break;
                 } else if (header.startsWith("T")) {
-                    time = parseTime(header);
+                    time = ScpTimestamp.parseTime(header);
                     ack();
                 } else {
                     throw new IOException("Unexpected message: '" + header + "'");
@@ -247,30 +293,35 @@ public class ScpHelper extends AbstractLoggingBean {
         }
     }
 
-    public void receiveFile(String header, Path path, long[] time, boolean preserve, int bufferSize) throws IOException {
+    public void receiveFile(String header, Path path, ScpTimestamp time, boolean preserve, int bufferSize) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("Receiving file {}", path);
         }
+
+        receiveStream(header, new LocalFileScpTargetStreamResolver(path), time, preserve, bufferSize);
+    }
+
+    public void receiveStream(String header, ScpTargetStreamResolver resolver, ScpTimestamp time, boolean preserve, int bufferSize) throws IOException {
         if (!header.startsWith("C")) {
-            throw new IOException("Expected a C message but got '" + header + "'");
+            throw new IOException("receiveStream(" + resolver + ") Expected a C message but got '" + header + "'");
         }
 
         if (bufferSize < MIN_RECEIVE_BUFFER_SIZE) {
-            throw new IOException("receiveFile(" + path + ") buffer size (" + bufferSize + ") below minimum (" + MIN_RECEIVE_BUFFER_SIZE + ")");
+            throw new IOException("receiveStream(" + resolver + ") buffer size (" + bufferSize + ") below minimum (" + MIN_RECEIVE_BUFFER_SIZE + ")");
         }
 
         Set<PosixFilePermission> perms = parseOctalPerms(header.substring(1, 5));
         final long length = Long.parseLong(header.substring(6, header.indexOf(' ', 6)));
         String name = header.substring(header.indexOf(' ', 6) + 1);
         if (length < 0L) { // TODO consider throwing an exception...
-            log.warn("receiveFile(" + path + ") bad length in header: " + header);
+            log.warn("receiveStream(" + resolver + ") bad length in header: " + header);
         }
 
         // if file size is less than buffer size allocate only expected file size
         int bufSize;
         if (length == 0L) {
             if (log.isDebugEnabled()) {
-                log.debug("receiveFile(" + path + ") zero file size (perhaps special file) using copy buffer size=" + MIN_RECEIVE_BUFFER_SIZE);
+                log.debug("receiveStream(" + resolver + ") zero file size (perhaps special file) using copy buffer size=" + MIN_RECEIVE_BUFFER_SIZE);
             }
             bufSize = MIN_RECEIVE_BUFFER_SIZE;
         } else {
@@ -278,60 +329,17 @@ public class ScpHelper extends AbstractLoggingBean {
         }
 
         if (bufSize < 0) { // TODO consider throwing an exception
-            log.warn("receiveFile(" + path + ") bad buffer size (" + bufSize + ") using default (" + MIN_RECEIVE_BUFFER_SIZE + ")");
+            log.warn("receiveFile(" + resolver + ") bad buffer size (" + bufSize + ") using default (" + MIN_RECEIVE_BUFFER_SIZE + ")");
             bufSize = MIN_RECEIVE_BUFFER_SIZE;
         }
 
-        LinkOption[]    options=IoUtils.getLinkOptions(false);
-        Boolean         status=IoUtils.checkFileExists(path, options);
-        if (status == null) {
-            throw new AccessDeniedException("Receive target file path existence status cannot be determined: " + path);
-        }
-
-        Path file=null;
-        if (status.booleanValue() && Files.isDirectory(path, options)) {
-            String localName = name.replace('/', File.separatorChar);
-            file = path.resolve(localName);
-        } else if (status.booleanValue() && Files.isRegularFile(path, options)) {
-            file = path;
-        } else if (!status.booleanValue()) {
-            Path    parent=path.getParent();
-            
-            status = IoUtils.checkFileExists(parent, options);
-            if (status == null) {
-                throw new AccessDeniedException("Receive file parent (" + parent + ") existence status cannot be determined for " + path);
-            }
-
-            if (status.booleanValue() && Files.isDirectory(parent, options)) {
-                file = path;
-            }
-        }
-        
-        if (file == null) {
-            throw new IOException("Can not write to " + path);
-        }
-        
-        status = IoUtils.checkFileExists(file, options);
-        if (status == null) {
-            throw new AccessDeniedException("Receive file existence status cannot be determined: " + file);
-        }
-
-        if (status.booleanValue()) {
-            if (Files.isDirectory(file, options)) {
-                throw new IOException("File is a directory: " + file);
-            }
-
-            if (!Files.isWritable(file)) {
-                throw new IOException("Can not write to file: " + file);
-            }
-        }
-
         try (
                 InputStream is = new LimitInputStream(this.in, length);
-                OutputStream os = Files.newOutputStream(file)
+                OutputStream os = resolver.resolveTargetStream(name, length, perms)
         ) {
             ack();
 
+            Path file = resolver.getEventListenerFilePath();
             try {
                 listener.startFileEvent(FileOperation.RECEIVE, file, length, perms);
                 IoUtils.copy(is, os, bufSize);
@@ -342,18 +350,28 @@ public class ScpHelper extends AbstractLoggingBean {
             }
         }
 
-        if (preserve) {
-            IoUtils.setPermissions(file, perms);
-            if (time != null) {
-                Files.getFileAttributeView(file, BasicFileAttributeView.class)
-                        .setTimes(FileTime.from(time[0], TimeUnit.SECONDS),
-                                FileTime.from(time[1], TimeUnit.SECONDS),
-                                null);
-            }
-        }
+        resolver.postProcessReceivedData(name, preserve, perms, time);
 
         ack();
         readAck(false);
+
+    }
+
+    protected void updateFileProperties(Path file, Set<PosixFilePermission> perms, ScpTimestamp time) throws IOException {
+        if (log.isTraceEnabled()) {
+            log.trace("updateFileProperties(" + file + ") permissions: " + perms);
+        }
+        IoUtils.setPermissions(file, perms);
+
+        if (time != null) {
+            BasicFileAttributeView view = Files.getFileAttributeView(file, BasicFileAttributeView.class);
+            FileTime lastModified = FileTime.from(time.lastModifiedTime, TimeUnit.MILLISECONDS);
+            FileTime lastAccess = FileTime.from(time.lastAccessTime, TimeUnit.MILLISECONDS);
+            if (log.isTraceEnabled()) {
+                log.trace("updateFileProperties(" + file + ") last-modified=" + lastModified + ", last-access=" + lastAccess);
+            }
+            view.setTimes(lastModified, lastAccess, null);
+        }
     }
 
     public String readLine() throws IOException {
@@ -472,50 +490,20 @@ public class ScpHelper extends AbstractLoggingBean {
             log.debug("Sending file {}", path);
         }
 
-        if (bufferSize < MIN_SEND_BUFFER_SIZE) {
-            throw new IOException("sendFile(" + path + ") buffer size (" + bufferSize + ") below minimum (" + MIN_SEND_BUFFER_SIZE + ")");
-        }
-
-        BasicFileAttributes basic = Files.getFileAttributeView(path, BasicFileAttributeView.class).readAttributes();
-        if (preserve) {
-            StringBuilder buf = new StringBuilder();
-            buf.append("T");
-            buf.append(basic.lastModifiedTime().to(TimeUnit.SECONDS));
-            buf.append(" ");
-            buf.append("0");
-            buf.append(" ");
-            buf.append(basic.lastAccessTime().to(TimeUnit.SECONDS));
-            buf.append(" ");
-            buf.append("0");
-            buf.append("\n");
-            out.write(buf.toString().getBytes());
-            out.flush();
-            readAck(false);
-        }
-
-        Set<PosixFilePermission> perms = IoUtils.getPermissions(path);
-        StringBuilder buf = new StringBuilder();
-        buf.append("C");
-        buf.append(preserve ? getOctalPerms(perms) : "0644");
-        buf.append(" ");
-        buf.append(basic.size()); // length
-        buf.append(" ");
-        buf.append(path.getFileName().toString());
-        buf.append("\n");
-        out.write(buf.toString().getBytes());
-        out.flush();
-        readAck(false);
+        sendStream(new LocalFileScpSourceStreamResolver(path), preserve, bufferSize);
+    }
 
-        long fileSize = Files.size(path);
-        if (fileSize < 0L) { // TODO consider throwing an exception...
-            log.warn("sendFile(" + path + ") bad file size: " + fileSize);
+    public void sendStream(ScpSourceStreamResolver resolver, boolean preserve, int bufferSize) throws IOException {
+        if (bufferSize < MIN_SEND_BUFFER_SIZE) {
+            throw new IOException("sendStream(" + resolver + ") buffer size (" + bufferSize + ") below minimum (" + MIN_SEND_BUFFER_SIZE + ")");
         }
 
+        long fileSize = resolver.getSize();
         // if file size is less than buffer size allocate only expected file size
         int bufSize;
-        if (fileSize == 0L) {
+        if (fileSize <= 0L) {
             if (log.isDebugEnabled()) {
-                log.debug("sendFile(" + path + ") zero file size (perhaps special file) using copy buffer size=" + MIN_SEND_BUFFER_SIZE);
+                log.debug("sendStream(" + resolver + ") unknown file size (" + fileSize + ")  perhaps special file - using copy buffer size=" + MIN_SEND_BUFFER_SIZE);
             }
             bufSize = MIN_SEND_BUFFER_SIZE;
         } else {
@@ -523,11 +511,37 @@ public class ScpHelper extends AbstractLoggingBean {
         }
 
         if (bufSize < 0) { // TODO consider throwing an exception
-            log.warn("sendFile(" + path + ") bad buffer size (" + bufSize + ") using default (" + MIN_SEND_BUFFER_SIZE + ")");
+            log.warn("sendStream(" + resolver + ") bad buffer size (" + bufSize + ") using default (" + MIN_SEND_BUFFER_SIZE + ")");
             bufSize = MIN_SEND_BUFFER_SIZE;
         }
 
-        try (InputStream in = Files.newInputStream(path)) {
+        ScpTimestamp time = resolver.getTimestamp();
+        if (preserve && (time != null)) {
+            String cmd = new StringBuilder(Long.SIZE)
+                    .append('T').append(TimeUnit.MILLISECONDS.toSeconds(time.lastModifiedTime)).append(' ').append('0')
+                    .append(' ').append(TimeUnit.MILLISECONDS.toSeconds(time.lastAccessTime)).append(' ').append('0')
+                    .append('\n')
+                    .toString();
+            out.write(cmd.getBytes());
+            out.flush();
+            readAck(false);
+        }
+
+        Set<PosixFilePermission> perms = EnumSet.copyOf(resolver.getPermissions());
+        String octalPerms = preserve ? getOctalPerms(perms) : "0644";
+        String fileName = resolver.getFileName();
+        String cmd = new StringBuilder(octalPerms.length() + fileName.length() + Long.SIZE /* some extra delimiters */)
+            .append('C').append(octalPerms)
+            .append(' ').append(fileSize)
+            .append(' ').append(fileName)
+            .append('\n')
+            .toString();
+        out.write(cmd.getBytes());
+        out.flush();
+        readAck(false);
+
+        try (InputStream in = resolver.resolveSourceStream()) {
+            Path path = resolver.getEventListenerFilePath();
             try {
                 listener.startFileEvent(FileOperation.SEND, path, fileSize, perms);
                 IoUtils.copy(in, out, bufSize);
@@ -545,6 +559,7 @@ public class ScpHelper extends AbstractLoggingBean {
         if (log.isDebugEnabled()) {
             log.debug("Sending directory {}", path);
         }
+
         BasicFileAttributes basic = Files.getFileAttributeView(path, BasicFileAttributeView.class).readAttributes();
         if (preserve) {
             StringBuilder buf = new StringBuilder();
@@ -579,7 +594,7 @@ public class ScpHelper extends AbstractLoggingBean {
             listener.startFolderEvent(FileOperation.SEND, path, perms);
 
             try {
-                LinkOption[]    options = IoUtils.getLinkOptions(false);
+                LinkOption[] options = IoUtils.getLinkOptions(false);
                 for (Path child : children) {
                     if (Files.isRegularFile(child, options)) {
                         sendFile(child, preserve, bufferSize);
@@ -600,11 +615,6 @@ public class ScpHelper extends AbstractLoggingBean {
         readAck(false);
     }
 
-    private long[] parseTime(String line) {
-        String[] numbers = line.substring(1).split(" ");
-        return new long[]{Long.parseLong(numbers[0]), Long.parseLong(numbers[2])};
-    }
-
     public static String getOctalPerms(Path path) throws IOException {
         return getOctalPerms(IoUtils.getPermissions(path));
     }
@@ -713,61 +723,4 @@ public class ScpHelper extends AbstractLoggingBean {
         }
         return c;
     }
-
-    private static class LimitInputStream extends FilterInputStream {
-
-        private long remaining;
-
-        public LimitInputStream(InputStream in, long length) {
-            super(in);
-            remaining = length;
-        }
-
-        @Override
-        public int read() throws IOException {
-            if (remaining > 0) {
-                remaining--;
-                return super.read();
-            } else {
-                return -1;
-            }
-        }
-
-        @Override
-        public int read(byte[] b, int off, int len) throws IOException {
-            int nb = len;
-            if (nb > remaining) {
-                nb = (int) remaining;
-            }
-            if (nb > 0) {
-                int read = super.read(b, off, nb);
-                remaining -= read;
-                return read;
-            } else {
-                return -1;
-            }
-        }
-
-        @Override
-        public long skip(long n) throws IOException {
-            long skipped = super.skip(n);
-            remaining -= skipped;
-            return skipped;
-        }
-
-        @Override
-        public int available() throws IOException {
-            int av = super.available();
-            if (av > remaining) {
-                return (int) remaining;
-            } else {
-                return av;
-            }
-        }
-
-        @Override
-        public void close() throws IOException {
-            // do not close the original input stream since it serves for ACK(s)
-        }
-    }
 }