You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/04/16 11:47:53 UTC

[06/30] mina-sshd git commit: [SSHD-815] Extract SFTP in its own module

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
new file mode 100644
index 0000000..3743477
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.UnknownServiceException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystemLoopException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.NotDirectoryException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.sshd.common.Factory;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.digest.BuiltinDigests;
+import org.apache.sshd.common.digest.DigestFactory;
+import org.apache.sshd.common.file.FileSystemAware;
+import org.apache.sshd.common.random.Random;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpHelper;
+import org.apache.sshd.common.subsystem.sftp.extensions.openssh.FsyncExtensionParser;
+import org.apache.sshd.common.subsystem.sftp.extensions.openssh.HardLinkExtensionParser;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.BufferUtils;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.IoUtils;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.session.ServerSession;
+
+/**
+ * SFTP subsystem
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class SftpSubsystem
+        extends AbstractSftpSubsystemHelper
+        implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier {
+
+    /**
+     * Properties key for the maximum of available open handles per session.
+     */
+    public static final String MAX_OPEN_HANDLES_PER_SESSION = "max-open-handles-per-session";
+    // Integer.MAX_VALUE - presumably the fallback used when MAX_OPEN_HANDLES_PER_SESSION is not configured
+    public static final int DEFAULT_MAX_OPEN_HANDLES = Integer.MAX_VALUE;
+
+    /**
+     * Size in bytes of the opaque handle value
+     *
+     * @see #DEFAULT_FILE_HANDLE_SIZE
+     */
+    public static final String FILE_HANDLE_SIZE = "sftp-handle-size";
+    public static final int MIN_FILE_HANDLE_SIZE = 4;  // ~uint32
+    public static final int DEFAULT_FILE_HANDLE_SIZE = 16;
+    public static final int MAX_FILE_HANDLE_SIZE = 64;  // ~sha512
+
+    /**
+     * Max. rounds to attempt to create a unique file handle - if all handles
+     * already in use after these many rounds, then an exception is thrown
+     *
+     * @see #generateFileHandle(Path)
+     * @see #DEFAULT_FILE_HANDLE_ROUNDS
+     */
+    public static final String MAX_FILE_HANDLE_RAND_ROUNDS = "sftp-handle-rand-max-rounds";
+    public static final int MIN_FILE_HANDLE_ROUNDS = 1;
+    // NOTE(review): the rounds default/maximum are defined via the handle *size* constants (4 and 64) -
+    // looks like a deliberate reuse of the values rather than a semantic link; confirm
+    public static final int DEFAULT_FILE_HANDLE_ROUNDS = MIN_FILE_HANDLE_SIZE;
+    public static final int MAX_FILE_HANDLE_ROUNDS = MAX_FILE_HANDLE_SIZE;
+
+    /**
+     * Maximum amount of data allocated for listing the contents of a directory
+     * in any single invocation of {@link #doReadDir(Buffer, int)}
+     *
+     * @see #DEFAULT_MAX_READDIR_DATA_SIZE
+     */
+    public static final String MAX_READDIR_DATA_SIZE_PROP = "sftp-max-readdir-data-size";
+    public static final int DEFAULT_MAX_READDIR_DATA_SIZE = 16 * 1024;
+
+    // Values handed over through the Command setters / start(Environment)
+    protected ExitCallback callback;
+    protected InputStream in;
+    protected OutputStream out;
+    protected OutputStream err;
+    protected Environment env;
+    // Created in setSession from the session's factory manager random factory
+    protected Random randomizer;
+    // Both values are re-read from the session properties (and range checked) in setSession
+    protected int fileHandleSize = DEFAULT_FILE_HANDLE_SIZE;
+    protected int maxFileHandleRounds = DEFAULT_FILE_HANDLE_ROUNDS;
+    // Result of submitting this Runnable to the executor in start(Environment)
+    protected Future<?> pendingFuture;
+    // Scratch buffer: passed to BufferUtils.readInt by run(), re-allocated by setSession
+    // if it is smaller than the configured file handle size
+    protected byte[] workBuf = new byte[Math.max(DEFAULT_FILE_HANDLE_SIZE, Integer.BYTES)];
+    protected FileSystem fileSystem = FileSystems.getDefault();
+    // Starts as the "user.dir" location; setFileSystem replaces it with the first root of the new file system
+    protected Path defaultDir = fileSystem.getPath(System.getProperty("user.dir"));
+    // Incremented by process() for every request except SSH_FXP_INIT; checked by doVersionSelect
+    protected long requestsCount;
+    protected int version;
+    protected final Map<String, byte[]> extensions = new TreeMap<>(Comparator.naturalOrder());
+    // Handles looked up by their string identifier; all of them are closed when run() terminates
+    protected final Map<String, Handle> handles = new HashMap<>();
+
+    private ServerSession serverSession;
+    // When true, run() does not log the exception that terminated its loop
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private ExecutorService executorService;
+    private boolean shutdownOnExit;
+
+    /**
+     * @param executorService The {@link ExecutorService} used by the {@link SftpSubsystem}
+     *                        command to run itself once started. When {@code null} an ad-hoc
+     *                        single-threaded service is created instead.
+     * @param shutdownOnExit  Whether {@link ExecutorService#shutdownNow()} should be invoked
+     *                        when the subsystem terminates. Ignored for the ad-hoc service,
+     *                        which is always shut down.
+     * @param policy          The {@link UnsupportedAttributePolicy} applied when some local
+     *                        file attributes cannot be accessed
+     * @param accessor        The {@link SftpFileSystemAccessor} used to open files and directories
+     * @param errorStatusDataHandler The (never {@code null}) {@link SftpErrorStatusDataHandler}
+     *                        used to generate the error messages of failed commands
+     * @see ThreadUtils#newSingleThreadExecutor(String)
+     */
+    public SftpSubsystem(ExecutorService executorService, boolean shutdownOnExit, UnsupportedAttributePolicy policy,
+            SftpFileSystemAccessor accessor, SftpErrorStatusDataHandler errorStatusDataHandler) {
+        super(policy, accessor, errorStatusDataHandler);
+
+        boolean adHocService = executorService == null;
+        this.executorService = adHocService
+                ? ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName())
+                : executorService;
+        // the ad-hoc executor belongs to this instance, so it is closed regardless of the flag
+        this.shutdownOnExit = adHocService || shutdownOnExit;
+    }
+
+    /**
+     * @return The current value of the {@code version} field - e.g., as assigned by
+     * {@code doVersionSelect} once a proposed version has been accepted
+     */
+    @Override
+    public int getVersion() {
+        return version;
+    }
+
+    /**
+     * @return The current default directory - initially the {@code user.dir} location of the
+     * default file system, replaced by the first root directory whenever
+     * {@link #setFileSystem(FileSystem)} installs a different file system
+     */
+    @Override
+    public Path getDefaultDirectory() {
+        return defaultDir;
+    }
+
+    /**
+     * @return The {@link ExecutorService} selected by the constructor - either the one
+     * provided by the caller or the ad-hoc single-threaded one
+     */
+    @Override
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    /**
+     * @return The shutdown flag decided by the constructor - always {@code true}
+     * when the ad-hoc executor service is used
+     */
+    @Override
+    public boolean isShutdownOnExit() {
+        return shutdownOnExit;
+    }
+
+    @Override
+    public void setSession(ServerSession session) {
+        this.serverSession = Objects.requireNonNull(session, "No session");
+
+        FactoryManager manager = session.getFactoryManager();
+        Factory<? extends Random> factory = manager.getRandomFactory();
+        this.randomizer = factory.create();
+
+        this.fileHandleSize = session.getIntProperty(FILE_HANDLE_SIZE, DEFAULT_FILE_HANDLE_SIZE);
+        ValidateUtils.checkTrue(this.fileHandleSize >= MIN_FILE_HANDLE_SIZE, "File handle size too small: %d", this.fileHandleSize);
+        ValidateUtils.checkTrue(this.fileHandleSize <= MAX_FILE_HANDLE_SIZE, "File handle size too big: %d", this.fileHandleSize);
+
+        this.maxFileHandleRounds = session.getIntProperty(MAX_FILE_HANDLE_RAND_ROUNDS, DEFAULT_FILE_HANDLE_ROUNDS);
+        ValidateUtils.checkTrue(this.maxFileHandleRounds >= MIN_FILE_HANDLE_ROUNDS, "File handle rounds too small: %d", this.maxFileHandleRounds);
+        ValidateUtils.checkTrue(this.maxFileHandleRounds <= MAX_FILE_HANDLE_ROUNDS, "File handle rounds too big: %d", this.maxFileHandleRounds);
+
+        if (workBuf.length < this.fileHandleSize) {
+            workBuf = new byte[this.fileHandleSize];
+        }
+    }
+
+    /**
+     * @return The session stored by {@link #setSession(ServerSession)} - {@code null}
+     * if that method has not been invoked yet
+     */
+    @Override
+    public ServerSession getServerSession() {
+        return serverSession;
+    }
+
+    /**
+     * Replaces the file system in use. When the instance actually changes, the default
+     * directory is reset to the first root directory reported by the new file system.
+     *
+     * @param fileSystem The {@link FileSystem} to use from now on
+     */
+    @Override
+    public void setFileSystem(FileSystem fileSystem) {
+        if (fileSystem == this.fileSystem) {
+            return; // same instance - keep the current default directory
+        }
+
+        this.fileSystem = fileSystem;
+
+        Iterable<Path> rootDirs = Objects.requireNonNull(fileSystem.getRootDirectories(), "No root directories");
+        Iterator<Path> rootsIterator = Objects.requireNonNull(rootDirs.iterator(), "No roots iterator");
+        ValidateUtils.checkTrue(rootsIterator.hasNext(), "No available root");
+        this.defaultDir = rootsIterator.next();
+    }
+
+    /**
+     * @param callback The {@link ExitCallback} whose {@code onExit(0)} is invoked
+     * by {@link #run()} when the command loop terminates
+     */
+    @Override
+    public void setExitCallback(ExitCallback callback) {
+        this.callback = callback;
+    }
+
+    /**
+     * @param in The {@link InputStream} from which {@link #run()} reads the incoming packets
+     */
+    @Override
+    public void setInputStream(InputStream in) {
+        this.in = in;
+    }
+
+    /**
+     * @param out The {@link OutputStream} to store in the {@code out} field -
+     * presumably where the responses are written (the writing code is not part of this section)
+     */
+    @Override
+    public void setOutputStream(OutputStream out) {
+        this.out = out;
+    }
+
+    /**
+     * @param err The {@link OutputStream} to store in the {@code err} field
+     */
+    @Override
+    public void setErrorStream(OutputStream err) {
+        this.err = err;
+    }
+
+    @Override
+    public void start(Environment env) throws IOException {
+        this.env = env;
+        try {
+            ExecutorService executor = getExecutorService();
+            pendingFuture = executor.submit(this);
+        } catch (RuntimeException e) {    // e.g., RejectedExecutionException
+            log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.toString(), e);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Command loop: repeatedly reads one length-prefixed packet from the input stream and
+     * passes it to {@link #process(Buffer)}. The loop has no exit condition of its own - it
+     * ends only when something is thrown. On the way out all handles are closed and the
+     * exit callback is invoked with a zero exit value.
+     */
+    @Override
+    public void run() {
+        try {
+            for (long count = 1L;; count++) {
+                // presumably reads the 4-byte packet length using workBuf as scratch space - confirm against BufferUtils#readInt
+                int length = BufferUtils.readInt(in, workBuf, 0, workBuf.length);
+                ValidateUtils.checkTrue(length >= (Integer.BYTES + 1 /* command */), "Bad length to read: %d", length);
+
+                // NOTE(review): Long.SIZE is a bit count (64) - here it only serves as spare capacity
+                Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES + Long.SIZE /* a bit extra */, false);
+                // the length is put back at the start of the buffer since process() reads it first
+                buffer.putInt(length);
+                // a single read may return fewer bytes than requested - keep reading until the packet is complete
+                for (int remainLen = length; remainLen > 0;) {
+                    int l = in.read(buffer.array(), buffer.wpos(), remainLen);
+                    if (l < 0) {
+                        // not an EOFException, so the catch clause below does log it
+                        throw new IllegalArgumentException("Premature EOF at buffer #" + count + " while read length=" + length + " and remain=" + remainLen);
+                    }
+                    buffer.wpos(buffer.wpos() + l);
+                    remainLen -= l;
+                }
+
+                process(buffer);
+            }
+        } catch (Throwable t) {
+            // nothing is logged if the subsystem is marked as closed or the failure is an EOFException
+            if ((!closed.get()) && (!(t instanceof EOFException))) { // Ignore
+                log.error("run({}) {} caught in SFTP subsystem: {}",
+                          getServerSession(), t.getClass().getSimpleName(), t.getMessage());
+                if (log.isDebugEnabled()) {
+                    log.debug("run(" + getServerSession() + ") caught exception details", t);
+                }
+            }
+        } finally {
+            boolean debugEnabled = log.isDebugEnabled();
+            // best effort - a handle that fails to close is logged and the remaining ones are still closed
+            handles.forEach((id, handle) -> {
+                try {
+                    handle.close();
+                    if (debugEnabled) {
+                        log.debug("run({}) closed pending handle {} [{}]", getServerSession(), id, handle);
+                    }
+                } catch (IOException ioe) {
+                    log.error("run({}) failed ({}) to close handle={}[{}]: {}",
+                          getServerSession(), ioe.getClass().getSimpleName(), id, handle, ioe.getMessage());
+                }
+            });
+
+            // the exit value is zero regardless of why the loop ended
+            callback.onExit(0);
+        }
+    }
+
+    /**
+     * Decodes the packet header (length, command type, request id) and dispatches
+     * the rest of the buffer to the handler matching the command type. Unknown types
+     * are answered with an {@code SSH_FX_OP_UNSUPPORTED} status.
+     *
+     * @param buffer The {@link Buffer} holding the entire packet - including its length
+     * @throws IOException If the invoked handler fails
+     */
+    @Override
+    protected void process(Buffer buffer) throws IOException {
+        int length = buffer.getInt();
+        int type = buffer.getUByte();
+        int id = buffer.getInt();
+        if (log.isDebugEnabled()) {
+            log.debug("process({})[length={}, type={}, id={}] processing",
+                      getServerSession(), length, SftpConstants.getCommandMessageName(type), id);
+        }
+
+        switch (type) {
+            case SftpConstants.SSH_FXP_INIT:
+                doInit(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_OPEN:
+                doOpen(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_CLOSE:
+                doClose(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_READ:
+                doRead(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_WRITE:
+                doWrite(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_LSTAT:
+                doLStat(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_FSTAT:
+                doFStat(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_SETSTAT:
+                doSetStat(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_FSETSTAT:
+                doFSetStat(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_OPENDIR:
+                doOpenDir(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_READDIR:
+                doReadDir(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_REMOVE:
+                doRemove(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_MKDIR:
+                doMakeDirectory(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_RMDIR:
+                doRemoveDirectory(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_REALPATH:
+                doRealPath(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_STAT:
+                doStat(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_RENAME:
+                doRename(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_READLINK:
+                doReadLink(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_SYMLINK:
+                doSymLink(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_LINK:
+                doLink(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_BLOCK:
+                doBlock(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_UNBLOCK:
+                doUnblock(buffer, id);
+                break;
+            case SftpConstants.SSH_FXP_EXTENDED:
+                doExtended(buffer, id);
+                break;
+            default:
+            {
+                String name = SftpConstants.getCommandMessageName(type);
+                log.warn("process({})[length={}, type={}, id={}] unknown command",
+                         getServerSession(), length, name, id);
+                // the request buffer is cleared and handed over for building the status reply
+                sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command " + name + " is unsupported or not implemented");
+            }
+        }
+
+        // SSH_FXP_INIT is not counted - doVersionSelect relies on this counter
+        // to detect whether it is the first request
+        if (type != SftpConstants.SSH_FXP_INIT) {
+            requestsCount++;
+        }
+    }
+
+    /**
+     * Dispatches an {@code SSH_FXP_EXTENDED} request to the handler of the named extension.
+     * Extensions without a handler are answered with an {@code SSH_FX_OP_UNSUPPORTED} status.
+     *
+     * @param buffer    The {@link Buffer} holding the request data
+     * @param id        The request identifier
+     * @param extension The extension name - matched exactly (case sensitive) by the switch
+     * @throws IOException If the invoked handler fails
+     */
+    @Override
+    protected void executeExtendedCommand(Buffer buffer, int id, String extension) throws IOException {
+        switch (extension) {
+            case SftpConstants.EXT_TEXT_SEEK:
+                doTextSeek(buffer, id);
+                break;
+            case SftpConstants.EXT_VERSION_SELECT:
+                doVersionSelect(buffer, id);
+                break;
+            case SftpConstants.EXT_COPY_FILE:
+                doCopyFile(buffer, id);
+                break;
+            case SftpConstants.EXT_COPY_DATA:
+                doCopyData(buffer, id);
+                break;
+            // the handle and name variants share a handler that receives the extension name
+            case SftpConstants.EXT_MD5_HASH:
+            case SftpConstants.EXT_MD5_HASH_HANDLE:
+                doMD5Hash(buffer, id, extension);
+                break;
+            case SftpConstants.EXT_CHECK_FILE_HANDLE:
+            case SftpConstants.EXT_CHECK_FILE_NAME:
+                doCheckFileHash(buffer, id, extension);
+                break;
+            case FsyncExtensionParser.NAME:
+                doOpenSSHFsync(buffer, id);
+                break;
+            case SftpConstants.EXT_SPACE_AVAILABLE:
+                doSpaceAvailable(buffer, id);
+                break;
+            case HardLinkExtensionParser.NAME:
+                doOpenSSHHardLink(buffer, id);
+                break;
+            default:
+                if (log.isDebugEnabled()) {
+                    log.debug("executeExtendedCommand({}) received unsupported SSH_FXP_EXTENDED({})", getServerSession(), extension);
+                }
+                sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command SSH_FXP_EXTENDED(" + extension + ") is unsupported or not implemented");
+                break;
+        }
+    }
+
+    /**
+     * Creates a symbolic or a hard link and reports the attempt and its outcome
+     * to the SFTP event listener.
+     *
+     * @param id           The request identifier - used here only for logging
+     * @param existingPath The link target as received - converted using the file system's
+     *                     {@code getPath} (i.e., not passed through {@code resolveFile})
+     * @param linkPath     The location of the link to create - resolved via {@code resolveFile}
+     * @param symLink      {@code true} to create a symbolic link, {@code false} for a hard link
+     * @throws IOException If failed to create the link - the listener is notified of the
+     * failure before the exception is re-thrown
+     */
+    @Override
+    protected void createLink(int id, String existingPath, String linkPath, boolean symLink) throws IOException {
+        Path link = resolveFile(linkPath);
+        Path existing = fileSystem.getPath(existingPath);
+        if (log.isDebugEnabled()) {
+            // arguments follow the order of the labels: existing=<name>[<path>], link=<name>[<path>]
+            log.debug("createLink({})[id={}], existing={}[{}], link={}[{}], symlink={})",
+                      getServerSession(), id, existingPath, existing, linkPath, link, symLink);
+        }
+
+        SftpEventListener listener = getSftpEventListenerProxy();
+        ServerSession session = getServerSession();
+        listener.linking(session, link, existing, symLink);
+        try {
+            if (symLink) {
+                Files.createSymbolicLink(link, existing);
+            } else {
+                Files.createLink(link, existing);
+            }
+        } catch (IOException | RuntimeException e) {
+            listener.linked(session, link, existing, symLink, e);
+            throw e;
+        }
+        // kept outside the try block so that a listener failure is not reported back to the listener
+        listener.linked(session, link, existing, symLink, null);
+    }
+
+    /**
+     * Handler of the &quot;text-seek&quot; extension - not implemented: once the handle
+     * has been validated as a file handle an {@link UnknownServiceException} is thrown.
+     *
+     * @param id     The request identifier
+     * @param handle The handle identifier
+     * @param line   The requested line number
+     * @throws IOException Always - either due to a bad handle or the missing implementation
+     */
+    @Override
+    protected void doTextSeek(int id, String handle, long line) throws IOException {
+        Handle candidate = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doTextSeek({})[id={}] SSH_FXP_EXTENDED(text-seek) (handle={}[{}], line={})",
+                      getServerSession(), id, handle, candidate, line);
+        }
+
+        // validation comes first, so an invalid handle is reported as such
+        FileHandle seekTarget = validateHandle(handle, candidate, FileHandle.class);
+        String description = "doTextSeek(" + seekTarget + ")";
+        throw new UnknownServiceException(description);
+    }
+
+    /**
+     * Handler of the OpenSSH &quot;fsync&quot; extension: validates that the handle is a file
+     * handle and delegates the synchronization to the file system accessor.
+     *
+     * @param id     The request identifier
+     * @param handle The handle identifier
+     * @throws IOException If bad handle or the accessor failed
+     */
+    @Override
+    protected void doOpenSSHFsync(int id, String handle) throws IOException {
+        Handle candidate = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doOpenSSHFsync({})[id={}] {}[{}]", getServerSession(), id, handle, candidate);
+        }
+
+        FileHandle syncTarget = validateHandle(handle, candidate, FileHandle.class);
+        SftpFileSystemAccessor fsAccessor = getFileSystemAccessor();
+        fsAccessor.syncFileData(
+                getServerSession(), this,
+                syncTarget.getFile(), syncTarget.getFileHandle(), syncTarget.getFileChannel());
+    }
+
+    /**
+     * Resolves the target of a &quot;check-file&quot; request (by handle or by name), selects the
+     * first supported digest out of the requested algorithms and delegates the actual
+     * hash calculation to the path-based overload.
+     *
+     * @param id          The request identifier
+     * @param targetType  The extension name - decides whether {@code target} is a handle or a path
+     * @param target      The handle identifier or the file path
+     * @param algos       The requested hash algorithms - in order of preference
+     * @param startOffset The offset to start hashing from
+     * @param length      The number of bytes to hash
+     * @param blockSize   The hash block size
+     * @param buffer      The {@link Buffer} passed on to the path-based overload
+     * @throws Exception If bad handle/path, no read access or none of the algorithms is supported
+     */
+    @Override
+    protected void doCheckFileHash(
+            int id, String targetType, String target, Collection<String> algos,
+            long startOffset, long length, int blockSize, Buffer buffer)
+                    throws Exception {
+        Path path;
+        if (SftpConstants.EXT_CHECK_FILE_HANDLE.equalsIgnoreCase(targetType)) {
+            Handle h = handles.get(target);
+            FileHandle fileHandle = validateHandle(target, h, FileHandle.class);
+            path = fileHandle.getFile();
+
+            /*
+             * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section 9.1.2:
+             *
+             *       If ACE4_READ_DATA was not included when the file was opened,
+             *       the server MUST return STATUS_PERMISSION_DENIED.
+             */
+            int access = fileHandle.getAccessMask();
+            if ((access & SftpConstants.ACE4_READ_DATA) == 0) {
+                throw new AccessDeniedException(path.toString(), path.toString(), "File not opened for read");
+            }
+        } else {
+            path = resolveFile(target);
+
+            /*
+             * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section 9.1.2:
+             *
+             *      If 'check-file-name' refers to a SSH_FILEXFER_TYPE_SYMLINK, the
+             *      target should be opened.
+             */
+            for (int index = 0; Files.isSymbolicLink(path) && (index < Byte.MAX_VALUE /* TODO make this configurable */); index++) {
+                path = Files.readSymbolicLink(path);
+            }
+
+            // still a link after the max. allowed hops - treat it as a loop
+            if (Files.isSymbolicLink(path)) {
+                throw new FileSystemLoopException(target);
+            }
+
+            // NOTE(review): NotDirectoryException is thrown when the path IS a directory -
+            // presumably chosen for the status it is translated to; confirm
+            if (Files.isDirectory(path, IoUtils.getLinkOptions(false))) {
+                throw new NotDirectoryException(path.toString());
+            }
+        }
+
+        ValidateUtils.checkNotNullAndNotEmpty(algos, "No hash algorithms specified");
+
+        // Only a candidate that is both known and supported may be selected - otherwise
+        // the factory must stay null so that the check below rejects the request
+        DigestFactory factory = null;
+        for (String a : algos) {
+            DigestFactory candidate = BuiltinDigests.fromFactoryName(a);
+            if ((candidate != null) && candidate.isSupported()) {
+                factory = candidate;
+                break;
+            }
+        }
+        ValidateUtils.checkNotNull(factory, "No matching digest factory found for %s", algos);
+
+        doCheckFileHash(id, path, factory, startOffset, length, blockSize, buffer);
+    }
+
+    /**
+     * Resolves the target of an &quot;md5-hash&quot; request (by handle or by name), works out
+     * the effective number of bytes to hash and delegates to the path-based overload.
+     *
+     * @param id             The request identifier
+     * @param targetType     The extension name - decides whether {@code target} is a handle or a path
+     * @param target         The handle identifier or the file path
+     * @param startOffset    The offset to start hashing from
+     * @param length         The requested number of bytes - zero together with a zero
+     *                       offset means the entire file
+     * @param quickCheckHash The quick-check hash provided by the client
+     * @return The result of the path-based overload
+     * @throws Exception If bad handle/path or no read access
+     */
+    @Override
+    protected byte[] doMD5Hash(
+            int id, String targetType, String target, long startOffset, long length, byte[] quickCheckHash)
+                    throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("doMD5Hash({})({})[{}] offset={}, length={}, quick-hash={}",
+                      getServerSession(), targetType, target, startOffset, length,
+                      BufferUtils.toHex(':', quickCheckHash));
+        }
+
+        boolean handleTarget = SftpConstants.EXT_MD5_HASH_HANDLE.equalsIgnoreCase(targetType);
+        Path path;
+        if (!handleTarget) {
+            path = resolveFile(target);
+            if (Files.isDirectory(path, IoUtils.getLinkOptions(true))) {
+                throw new NotDirectoryException(path.toString());
+            }
+        } else {
+            FileHandle openFile = validateHandle(target, handles.get(target), FileHandle.class);
+            path = openFile.getFile();
+
+            /*
+             * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section 9.1.1:
+             *
+             *      The handle MUST be a file handle, and ACE4_READ_DATA MUST
+             *      have been included in the desired-access when the file
+             *      was opened
+             */
+            if ((openFile.getAccessMask() & SftpConstants.ACE4_READ_DATA) == 0) {
+                throw new AccessDeniedException(path.toString(), path.toString(), "File not opened for read");
+            }
+        }
+
+        /*
+         * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section 9.1.1:
+         *
+         *      If both start-offset and length are zero, the entire file should be included
+         */
+        long totalSize = Files.size(path);
+        long effectiveLength;
+        if ((startOffset == 0L) && (length == 0L)) {
+            effectiveLength = totalSize;
+        } else if ((startOffset + length) > totalSize) {
+            // requested range exceeds the file - trim it to whatever follows the offset
+            effectiveLength = totalSize - startOffset;
+        } else {
+            effectiveLength = length;
+        }
+
+        return doMD5Hash(id, path, startOffset, effectiveLength, quickCheckHash);
+    }
+
+    /**
+     * Handler of the &quot;version-select&quot; extension. The session is closed unless the
+     * proposal is the first (non-INIT) request and the proposed version is accepted.
+     *
+     * @param buffer The {@link Buffer} holding the proposed version string
+     * @param id     The request identifier
+     * @throws IOException If failed to send the response
+     */
+    protected void doVersionSelect(Buffer buffer, int id) throws IOException {
+        String proposed = buffer.getString();
+        ServerSession session = getServerSession();
+        /*
+         * The 'version-select' MUST be the first request from the client to the
+         * server; if it is not, the server MUST fail the request and close the
+         * channel.
+         */
+        boolean firstRequest = requestsCount <= 0L;
+        if (!firstRequest) {
+            sendStatus(BufferUtils.clear(buffer), id,
+                       SftpConstants.SSH_FX_FAILURE,
+                       "Version selection not the 1st request for proposal = " + proposed);
+            session.close(true);
+            return;
+        }
+
+        Boolean verdict = validateProposedVersion(buffer, id, proposed);
+        if (Boolean.TRUE.equals(verdict)) {
+            version = Integer.parseInt(proposed);
+            sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+            return;
+        }
+
+        // a null verdict means that the response has already been sent internally
+        if (verdict != null) {
+            sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed);
+        }
+
+        /*
+         * "MUST then close the channel without processing any further requests"
+         */
+        session.close(true);
+    }
+
+    /**
+     * Locks a range of an open file and reports the attempt and its outcome
+     * to the SFTP event listener.
+     *
+     * @param id     The request identifier
+     * @param handle The handle identifier - must refer to a file handle
+     * @param offset The start of the range to lock
+     * @param length The length of the range to lock
+     * @param mask   The lock mask
+     * @throws IOException If bad handle or failed to lock - the listener is notified
+     * of the failure before the exception is re-thrown
+     */
+    @Override
+    protected void doBlock(int id, String handle, long offset, long length, int mask) throws IOException {
+        Handle candidate = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doBlock({})[id={}] SSH_FXP_BLOCK (handle={}[{}], offset={}, length={}, mask=0x{})",
+                      getServerSession(), id, handle, candidate, offset, length, Integer.toHexString(mask));
+        }
+
+        FileHandle lockTarget = validateHandle(handle, candidate, FileHandle.class);
+        SftpEventListener eventListener = getSftpEventListenerProxy();
+        ServerSession owner = getServerSession();
+
+        eventListener.blocking(owner, handle, lockTarget, offset, length, mask);
+        try {
+            lockTarget.lock(offset, length, mask);
+        } catch (IOException | RuntimeException failure) {
+            eventListener.blocked(owner, handle, lockTarget, offset, length, mask, failure);
+            throw failure;
+        }
+
+        // success notification - null signals that no error occurred
+        eventListener.blocked(owner, handle, lockTarget, offset, length, mask, null);
+    }
+
+    /**
+     * Unlocks a range of an open file and reports the attempt and its outcome
+     * to the SFTP event listener.
+     *
+     * @param id     The request identifier
+     * @param handle The handle identifier - must refer to a file handle
+     * @param offset The start of the range to unlock
+     * @param length The length of the range to unlock
+     * @throws IOException If bad handle or failed to unlock - the listener is notified
+     * of the failure before the exception is re-thrown
+     */
+    @Override
+    protected void doUnblock(int id, String handle, long offset, long length) throws IOException {
+        Handle candidate = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doUnblock({})[id={}] SSH_FXP_UNBLOCK (handle={}[{}], offset={}, length={})",
+                      getServerSession(), id, handle, candidate, offset, length);
+        }
+
+        FileHandle unlockTarget = validateHandle(handle, candidate, FileHandle.class);
+        SftpEventListener eventListener = getSftpEventListenerProxy();
+        ServerSession owner = getServerSession();
+
+        eventListener.unblocking(owner, handle, unlockTarget, offset, length);
+        try {
+            unlockTarget.unlock(offset, length);
+        } catch (IOException | RuntimeException failure) {
+            eventListener.unblocked(owner, handle, unlockTarget, offset, length, failure);
+            throw failure;
+        }
+
+        // success notification - null signals that no error occurred
+        eventListener.unblocked(owner, handle, unlockTarget, offset, length, null);
+    }
+
+    /**
+     * Handler of the &quot;copy-data&quot; extension: copies a range of bytes from one open file
+     * to another (or to a different, non-overlapping range of the same open file).
+     *
+     * @param id          The request identifier
+     * @param readHandle  The handle identifier of the source - must have been opened for read
+     * @param readOffset  The (non-negative) offset to start reading from
+     * @param readLength  The (non-negative) number of bytes to copy - zero means up to the
+     *                    end of the source file. A value exceeding the available data is
+     *                    trimmed to the available data.
+     * @param writeHandle The handle identifier of the destination - must have been opened for write
+     * @param writeOffset The (non-negative) offset to start writing to
+     * @throws IOException If bad handles, missing access rights or failed to read/write
+     * @throws IllegalArgumentException If invalid offsets/length, nothing to copy or
+     * the read and write ranges of an in-place copy overlap
+     */
+    @Override
+    @SuppressWarnings("resource")
+    protected void doCopyData(int id, String readHandle, long readOffset, long readLength, String writeHandle, long writeOffset) throws IOException {
+        boolean inPlaceCopy = readHandle.equals(writeHandle);
+        Handle rh = handles.get(readHandle);
+        Handle wh = inPlaceCopy ? rh : handles.get(writeHandle);
+        if (log.isDebugEnabled()) {
+            log.debug("doCopyData({})[id={}] SSH_FXP_EXTENDED[{}] read={}[{}], read-offset={}, read-length={}, write={}[{}], write-offset={})",
+                      getServerSession(), id, SftpConstants.EXT_COPY_DATA,
+                      readHandle, rh, readOffset, readLength,
+                      writeHandle, wh, writeOffset);
+        }
+
+        FileHandle srcHandle = validateHandle(readHandle, rh, FileHandle.class);
+        Path srcPath = srcHandle.getFile();
+        int srcAccess = srcHandle.getAccessMask();
+        if ((srcAccess & SftpConstants.ACE4_READ_DATA) != SftpConstants.ACE4_READ_DATA) {
+            throw new AccessDeniedException(srcPath.toString(), srcPath.toString(), "Source file not opened for read");
+        }
+
+        ValidateUtils.checkTrue(readLength >= 0L, "Invalid read length: %d", readLength);
+        ValidateUtils.checkTrue(readOffset >= 0L, "Invalid read offset: %d", readOffset);
+
+        // Compare against the available data instead of adding offset + length, which may overflow
+        long totalSize = Files.size(srcPath);
+        long available = totalSize - readOffset;
+        long effectiveLength = ((readLength == 0L) || (readLength > available)) ? available : readLength;
+        ValidateUtils.checkTrue(effectiveLength > 0L, "Non-positive effective copy data length: %d", effectiveLength);
+
+        FileHandle dstHandle = inPlaceCopy ? srcHandle : validateHandle(writeHandle, wh, FileHandle.class);
+        Path dstPath = dstHandle.getFile();
+        int dstAccess = dstHandle.getAccessMask();
+        if ((dstAccess & SftpConstants.ACE4_WRITE_DATA) != SftpConstants.ACE4_WRITE_DATA) {
+            throw new AccessDeniedException(dstPath.toString(), dstPath.toString(), "Destination file not opened for write");
+        }
+
+        ValidateUtils.checkTrue(writeOffset >= 0L, "Invalid write offset: %d", writeOffset);
+        // check if overlapping ranges as per the draft
+        if (inPlaceCopy) {
+            // effectiveLength is already limited to the available data, so maxRead <= totalSize
+            long maxRead = readOffset + effectiveLength;
+            long maxWrite = writeOffset + effectiveLength;
+            // two half-open ranges overlap only if each one starts before the other one ends
+            if ((readOffset < maxWrite) && (writeOffset < maxRead)) {
+                throw new IllegalArgumentException("Write range end [" + writeOffset + "-" + maxWrite + "]"
+                        + " overlaps with read range [" + readOffset + "-" + maxRead + "]");
+            }
+        }
+
+        // take the minimum as a long BEFORE narrowing - casting a length above Integer.MAX_VALUE
+        // to int first would yield a truncated (possibly negative) size
+        byte[] copyBuf = new byte[(int) Math.min(IoUtils.DEFAULT_COPY_SIZE, effectiveLength)];
+        while (effectiveLength > 0L) {
+            int remainLength = (int) Math.min(copyBuf.length, effectiveLength);
+            int readLen = srcHandle.read(copyBuf, 0, remainLength, readOffset);
+            if (readLen < 0) {
+                throw new EOFException("Premature EOF while still remaining " + effectiveLength + " bytes");
+            }
+            dstHandle.write(copyBuf, 0, readLen, writeOffset);
+
+            effectiveLength -= readLen;
+            readOffset += readLen;
+            writeOffset += readLen;
+        }
+    }
+
+    /**
+     * Serves an {@code SSH_FXP_READDIR} request. The handle name is read from
+     * the request buffer and must resolve to a {@link DirectoryHandle}. The
+     * reply is either an {@code SSH_FXP_NAME} packet with the next batch of
+     * entries, or a status packet ({@code SSH_FX_EOF} once the listing is
+     * exhausted, or an error status if validation / listing fails).
+     *
+     * @param buffer The request {@link Buffer} - it is cleared and re-used for the reply
+     * @param id     The request identifier echoed back in the reply
+     * @throws IOException If failed to send the reply
+     */
+    @Override
+    protected void doReadDir(Buffer buffer, int id) throws IOException {
+        String handle = buffer.getString();
+        Handle h = handles.get(handle);
+        boolean debugEnabled = log.isDebugEnabled();
+        if (debugEnabled) {
+            log.debug("doReadDir({})[id={}] SSH_FXP_READDIR (handle={}[{}])",
+                      getServerSession(), id, handle, h);
+        }
+
+        Buffer reply = null;
+        try {
+            DirectoryHandle dirHandle = validateHandle(handle, h, DirectoryHandle.class);
+            if (dirHandle.isDone()) {
+                sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Directory reading is done");
+                return;
+            }
+
+            // re-validate the directory on every request since it may have changed since it was opened
+            Path dir = dirHandle.getFile();
+            LinkOption[] linkOptions =
+                getPathResolutionLinkOption(SftpConstants.SSH_FXP_READDIR, "", dir);
+            Boolean exists = IoUtils.checkFileExists(dir, linkOptions);
+            if (exists == null) {
+                throw new AccessDeniedException(dir.toString(), dir.toString(), "Cannot determine existence of read-dir");
+            }
+            if (!exists) {
+                throw new NoSuchFileException(dir.toString(), dir.toString(), "Non-existent directory");
+            }
+            if (!Files.isDirectory(dir, linkOptions)) {
+                throw new NotDirectoryException(dir.toString());
+            }
+            if (!Files.isReadable(dir)) {
+                throw new AccessDeniedException(dir.toString(), dir.toString(), "Not readable");
+            }
+
+            boolean nothingToSend = (!dirHandle.isSendDot()) && (!dirHandle.isSendDotDot()) && (!dirHandle.hasNext());
+            if (nothingToSend) {
+                // empty directory
+                dirHandle.markDone();
+                sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Empty directory");
+                return;
+            }
+
+            // There is at least one file in the directory or we need to send the "." / "..".
+            // Send only a few files at a time to avoid creating overly large
+            // packets or causing a timeout.
+            reply = BufferUtils.clear(buffer);
+            reply.putByte((byte) SftpConstants.SSH_FXP_NAME);
+            reply.putInt(id);
+
+            // reserve room for the entries count - patched once the actual count is known
+            int countPos = reply.wpos();
+            reply.putInt(0);
+
+            ServerSession session = getServerSession();
+            int maxDataSize = session.getIntProperty(MAX_READDIR_DATA_SIZE_PROP, DEFAULT_MAX_READDIR_DATA_SIZE);
+            int numEntries = doReadDir(id, handle, dirHandle, reply, maxDataSize, IoUtils.getLinkOptions(false));
+            BufferUtils.updateLengthPlaceholder(reply, countPos, numEntries);
+
+            boolean exhausted = (!dirHandle.isSendDot()) && (!dirHandle.isSendDotDot()) && (!dirHandle.hasNext());
+            if (exhausted) {
+                dirHandle.markDone();
+            }
+
+            Boolean indicator =
+                SftpHelper.indicateEndOfNamesList(reply, getVersion(), session, dirHandle.isDone());
+            if (debugEnabled) {
+                log.debug("doReadDir({})({})[{}] - seding {} entries - eol={}", session, handle, h, numEntries, indicator);
+            }
+
+            Objects.requireNonNull(reply, "No reply buffer created");
+        } catch (IOException | RuntimeException e) {
+            sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READDIR, handle);
+            return;
+        }
+
+        send(reply);
+    }
+
+    /**
+     * Validates that the target is an existing, readable directory and
+     * registers a new {@link DirectoryHandle} for it.
+     *
+     * @param id      The request identifier
+     * @param path    The path as received from the client - used in some error messages
+     * @param p       The resolved local {@link Path}
+     * @param options The {@link LinkOption}s used when checking the path
+     * @return The generated handle name under which the directory was registered
+     * @throws IOException If the path cannot be accessed, does not exist or is not a directory
+     */
+    @Override
+    protected String doOpenDir(int id, String path, Path p, LinkOption... options) throws IOException {
+        Boolean exists = IoUtils.checkFileExists(p, options);
+        if (exists == null) {
+            throw new AccessDeniedException(p.toString(), p.toString(), "Cannot determine open-dir existence");
+        }
+
+        if (!exists) {
+            throw new NoSuchFileException(path, path, "Referenced target directory N/A");
+        }
+        if (!Files.isDirectory(p, options)) {
+            throw new NotDirectoryException(path);
+        }
+        if (!Files.isReadable(p)) {
+            throw new AccessDeniedException(p.toString(), p.toString(), "Not readable");
+        }
+
+        String handle = generateFileHandle(p);
+        handles.put(handle, new DirectoryHandle(this, p, handle));
+        return handle;
+    }
+
+    /**
+     * Applies the given attributes to the file referenced by an open handle.
+     *
+     * @param id     The request identifier (used for logging only)
+     * @param handle The handle name
+     * @param attrs  The attributes to apply
+     * @throws IOException If the handle is invalid or the attributes cannot be set
+     */
+    @Override
+    protected void doFSetStat(int id, String handle, Map<String, ?> attrs) throws IOException {
+        Handle h = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doFsetStat({})[id={}] SSH_FXP_FSETSTAT (handle={}[{}], attrs={})",
+                      getServerSession(), id, handle, h, attrs);
+        }
+
+        Handle target = validateHandle(handle, h, Handle.class);
+        doSetAttributes(target.getFile(), attrs);
+    }
+
+    /**
+     * Resolves the attributes of the file referenced by an open handle.
+     *
+     * @param id     The request identifier (used for logging only)
+     * @param handle The handle name
+     * @param flags  The attribute flags forwarded to the attributes resolver
+     * @return The resolved attributes map
+     * @throws IOException If the handle is invalid or the attributes cannot be resolved
+     */
+    @Override
+    protected Map<String, Object> doFStat(int id, String handle, int flags) throws IOException {
+        Handle h = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doFStat({})[id={}] SSH_FXP_FSTAT (handle={}[{}], flags=0x{})",
+                      getServerSession(), id, handle, h, Integer.toHexString(flags));
+        }
+
+        Path target = validateHandle(handle, h, Handle.class).getFile();
+        return resolveFileAttributes(target, flags, IoUtils.getLinkOptions(true));
+    }
+
+    /**
+     * Writes (or appends, if the handle was opened in append mode) data to the
+     * file referenced by the handle, notifying the registered listeners before
+     * the write and after it - whether it succeeded or failed.
+     *
+     * @param id        The request identifier (used for logging only)
+     * @param handle    The handle name
+     * @param offset    The file offset to write to - not used when appending
+     * @param length    The number of bytes to write - must be non-negative
+     * @param data      The array holding the data
+     * @param doff      The offset of the data in the array
+     * @param remaining The number of bytes available in the array - must be at least {@code length}
+     * @throws IOException If the handle is invalid or the write failed
+     */
+    @Override
+    protected void doWrite(int id, String handle, long offset, int length, byte[] data, int doff, int remaining) throws IOException {
+        Handle h = handles.get(handle);
+        if (log.isTraceEnabled()) {
+            log.trace("doWrite({})[id={}] SSH_FXP_WRITE (handle={}[{}], offset={}, data=byte[{}])",
+                      getServerSession(), id, handle, h, offset, length);
+        }
+
+        FileHandle target = validateHandle(handle, h, FileHandle.class);
+        if (length < 0) {
+            throw new IllegalStateException("Bad length (" + length + ") for writing to " + target);
+        }
+
+        if (remaining < length) {
+            throw new IllegalStateException("Not enough buffer data for writing to " + target + ": required=" + length + ", available=" + remaining);
+        }
+
+        SftpEventListener events = getSftpEventListenerProxy();
+        events.writing(getServerSession(), handle, target, offset, data, doff, length);
+        try {
+            if (!target.isOpenAppend()) {
+                target.write(data, doff, length, offset);
+            } else {
+                target.append(data, doff, length);
+            }
+        } catch (IOException | RuntimeException e) {
+            // report the failure to the listeners before propagating it
+            events.written(getServerSession(), handle, target, offset, data, doff, length, e);
+            throw e;
+        }
+        events.written(getServerSession(), handle, target, offset, data, doff, length, null);
+    }
+
+    /**
+     * Reads data from the file referenced by the handle, notifying the
+     * registered listeners before the read and after it - whether it
+     * succeeded or failed.
+     *
+     * @param id     The request identifier (used for logging only)
+     * @param handle The handle name
+     * @param offset The file offset to read from
+     * @param length The maximum number of bytes to read - must be positive
+     * @param data   The array to read into
+     * @param doff   The offset in the array to start placing the data
+     * @return The value returned by the underlying {@code FileHandle#read} call
+     * @throws IOException If the handle is invalid or the read failed
+     */
+    @Override
+    protected int doRead(int id, String handle, long offset, int length, byte[] data, int doff) throws IOException {
+        Handle h = handles.get(handle);
+        if (log.isTraceEnabled()) {
+            log.trace("doRead({})[id={}] SSH_FXP_READ (handle={}[{}], offset={}, length={})",
+                      getServerSession(), id, handle, h, offset, length);
+        }
+
+        ValidateUtils.checkTrue(length > 0L, "Invalid read length: %d", length);
+        FileHandle source = validateHandle(handle, h, FileHandle.class);
+        SftpEventListener events = getSftpEventListenerProxy();
+        ServerSession session = getServerSession();
+        events.reading(session, handle, source, offset, data, doff, length);
+
+        int numRead;
+        try {
+            numRead = source.read(data, doff, length, offset);
+        } catch (IOException | RuntimeException e) {
+            // report the failure to the listeners before propagating it
+            events.read(session, handle, source, offset, data, doff, length, -1, e);
+            throw e;
+        }
+
+        events.read(session, handle, source, offset, data, doff, length, numRead, null);
+        return numRead;
+    }
+
+    /**
+     * Un-registers the handle, closes it and then notifies the registered listeners.
+     *
+     * @param id     The request identifier (used for logging only)
+     * @param handle The handle name
+     * @throws IOException If the handle is invalid or failed to close it
+     */
+    @Override
+    protected void doClose(int id, String handle) throws IOException {
+        // the handle is removed from the map even if closing it fails below
+        Handle h = handles.remove(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doClose({})[id={}] SSH_FXP_CLOSE (handle={}[{}])",
+                      getServerSession(), id, handle, h);
+        }
+
+        Handle validated = validateHandle(handle, h, Handle.class);
+        validated.close();
+
+        getSftpEventListenerProxy().close(getServerSession(), handle, h);
+    }
+
+    /**
+     * Opens a file and registers a new {@link FileHandle} for it, provided the
+     * session has not yet reached its configured maximum of open handles.
+     *
+     * @param id     The request identifier (used for logging only)
+     * @param path   The path as received from the client
+     * @param pflags The open flags passed on to the {@link FileHandle}
+     * @param access The access mask passed on to the {@link FileHandle}
+     * @param attrs  The initial attributes passed on to the {@link FileHandle}
+     * @return The generated handle name under which the file was registered
+     * @throws IOException If failed to resolve or open the file
+     * @throws IllegalStateException If the maximum number of open handles has been reached
+     */
+    @Override
+    protected String doOpen(int id, String path, int pflags, int access, Map<String, Object> attrs) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("doOpen({})[id={}] SSH_FXP_OPEN (path={}, access=0x{}, pflags=0x{}, attrs={})",
+                      getServerSession(), id, path, Integer.toHexString(access), Integer.toHexString(pflags), attrs);
+        }
+        int curHandleCount = handles.size();
+        int maxHandleCount = getServerSession().getIntProperty(MAX_OPEN_HANDLES_PER_SESSION, DEFAULT_MAX_OPEN_HANDLES);
+        // use ">=" since opening one more handle when already at the limit would exceed it
+        if (curHandleCount >= maxHandleCount) {
+            throw new IllegalStateException("Too many open handles: current=" + curHandleCount + ", max.=" + maxHandleCount);
+        }
+
+        Path file = resolveFile(path);
+        String handle = generateFileHandle(file);
+        FileHandle fileHandle = new FileHandle(this, file, handle, pflags, access, attrs);
+        handles.put(handle, fileHandle);
+        return handle;
+    }
+
+    /**
+     * Generates a random handle name that is not currently in use. Handles
+     * are kept as (hex) strings since that makes them easier to use as map keys,
+     * and are decoded as such as well.
+     *
+     * @param file The {@link Path} the handle is generated for - used for logging
+     * and error reporting only
+     * @return A handle name that is not a key of the open handles map
+     * @throws IllegalStateException If no unused handle was generated within
+     * the configured number of rounds
+     */
+    protected String generateFileHandle(Path file) {
+        boolean traceEnabled = log.isTraceEnabled();
+        // retry several times - if the handle size is relatively small, conflicts are possible
+        for (int round = 0; round < maxFileHandleRounds; round++) {
+            randomizer.fill(workBuf, 0, fileHandleSize);
+            String candidate = BufferUtils.toHex(workBuf, 0, fileHandleSize, BufferUtils.EMPTY_HEX_SEPARATOR);
+            if (!handles.containsKey(candidate)) {
+                if (traceEnabled) {
+                    log.trace("generateFileHandle({})[{}] {}", getServerSession(), file, candidate);
+                }
+                return candidate;
+            }
+
+            if (traceEnabled) {
+                log.trace("generateFileHandle({})[{}] handle={} in use at round {}",
+                          getServerSession(), file, candidate, round);
+            }
+        }
+
+        throw new IllegalStateException("Failed to generate a unique file handle for " + file);
+    }
+
+    /**
+     * Serves the {@code SSH_FXP_INIT} request: validates the version proposed by
+     * the client, records the extensions it sent and replies with an
+     * {@code SSH_FXP_VERSION} packet.
+     *
+     * @param buffer The request {@link Buffer} positioned at the (optional)
+     * extensions data - it is cleared and re-used for the reply
+     * @param id     The version proposed by the client (for this request the
+     * identifier field carries the version)
+     * @throws IOException If failed to send the reply
+     */
+    protected void doInit(Buffer buffer, int id) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("doInit({})[id={}] SSH_FXP_INIT (version={})", getServerSession(), id, id);
+        }
+
+        String supported = checkVersionCompatibility(buffer, id, id, SftpConstants.SSH_FX_OP_UNSUPPORTED);
+        if (GenericUtils.isEmpty(supported)) { // i.e. validation failed
+            return;
+        }
+
+        version = id;
+
+        // whatever follows the version is a sequence of (name, data) extension pairs
+        while (buffer.available() > 0) {
+            String extName = buffer.getString();
+            byte[] extData = buffer.getBytes();
+            extensions.put(extName, extData);
+        }
+
+        buffer.clear();
+
+        buffer.putByte((byte) SftpConstants.SSH_FXP_VERSION);
+        buffer.putInt(version);
+        appendExtensions(buffer, supported);
+
+        // listeners are notified before the reply is actually sent
+        getSftpEventListenerProxy().initialized(getServerSession(), version);
+
+        send(buffer);
+    }
+
+    /**
+     * Writes the available buffer data to the output stream, preceded by
+     * its length, and flushes the stream.
+     *
+     * @param buffer The {@link Buffer} whose available data is to be sent
+     * @throws IOException If failed to write to the output stream
+     */
+    @Override
+    protected void send(Buffer buffer) throws IOException {
+        int payloadLength = buffer.available();
+        // length prefix first (the work buffer is passed to the encoder), then the payload itself
+        BufferUtils.writeInt(out, payloadLength, workBuf, 0, workBuf.length);
+        out.write(buffer.array(), buffer.rpos(), payloadLength);
+        out.flush();
+    }
+
+    /**
+     * Releases the resources held by this subsystem: announces the destruction
+     * to the registered listeners, cancels the pending future (if any and not
+     * done), shuts down the executor service (if so configured) and closes
+     * the file system. Only the first invocation has any effect - subsequent
+     * ones are ignored. Failures along the way are logged rather than propagated.
+     */
+    @Override
+    public void destroy() {
+        // atomically mark as closed so that only the first caller proceeds
+        if (closed.getAndSet(true)) {
+            return; // ignore if already closed
+        }
+
+        ServerSession session = getServerSession();
+        boolean debugEnabled = log.isDebugEnabled();
+        if (debugEnabled) {
+            log.debug("destroy({}) - mark as closed", session);
+        }
+
+        // a misbehaving listener must not prevent the rest of the cleanup
+        try {
+            SftpEventListener listener = getSftpEventListenerProxy();
+            listener.destroying(session);
+        } catch (Exception e) {
+            log.warn("destroy({}) Failed ({}) to announce destruction event: {}",
+                session, e.getClass().getSimpleName(), e.getMessage());
+            if (debugEnabled) {
+                log.debug("destroy(" + session + ") destruction announcement failure details", e);
+            }
+        }
+
+        // if thread has not completed, cancel it
+        if ((pendingFuture != null) && (!pendingFuture.isDone())) {
+            boolean result = pendingFuture.cancel(true);
+            // TODO consider waiting some reasonable (?) amount of time for cancellation
+            if (debugEnabled) {
+                log.debug("destroy(" + session + ") - cancel pending future=" + result);
+            }
+        }
+
+        pendingFuture = null;
+
+        // the executor is shut down only if it is still running and shutdown-on-exit was requested
+        ExecutorService executors = getExecutorService();
+        if ((executors != null) && (!executors.isShutdown()) && isShutdownOnExit()) {
+            Collection<Runnable> runners = executors.shutdownNow();
+            if (debugEnabled) {
+                log.debug("destroy(" + session + ") - shutdown executor service - runners count=" + runners.size());
+            }
+        }
+        this.executorService = null;
+
+        // best-effort - some file systems do not support being closed
+        try {
+            fileSystem.close();
+        } catch (UnsupportedOperationException e) {
+            if (debugEnabled) {
+                log.debug("destroy(" + session + ") closing the file system is not supported");
+            }
+        } catch (IOException e) {
+            if (debugEnabled) {
+                log.debug("destroy(" + session + ")"
+                        + " failed (" + e.getClass().getSimpleName() + ")"
+                        + " to close file system: " + e.getMessage(), e);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemEnvironment.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemEnvironment.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemEnvironment.java
new file mode 100644
index 0000000..493a450
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemEnvironment.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.nio.file.Path;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.server.session.ServerSessionHolder;
+
+/**
+ * Exposes the settings that an SFTP subsystem instance operates under: the
+ * protocol version, the file system accessor, the policy for unsupported
+ * attributes and the default directory.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface SftpSubsystemEnvironment extends ServerSessionHolder {
+    /**
+     * Force the use of a given sftp version
+     */
+    String SFTP_VERSION = "sftp-version";
+
+    /** Lowest supported version - working implementation from v3 */
+    int LOWER_SFTP_IMPL = SftpConstants.SFTP_V3;
+
+    /** Highest supported version (inclusive) */
+    int HIGHER_SFTP_IMPL = SftpConstants.SFTP_V6;
+
+    /**
+     * Comma separated list of all the versions between {@link #LOWER_SFTP_IMPL}
+     * and {@link #HIGHER_SFTP_IMPL} (inclusive)
+     */
+    String ALL_SFTP_IMPL =
+        IntStream.rangeClosed(LOWER_SFTP_IMPL, HIGHER_SFTP_IMPL)
+            .mapToObj(Integer::toString)
+            .collect(Collectors.joining(","));
+
+    /**
+     * @return The negotiated version
+     */
+    int getVersion();
+
+    /**
+     * @return The {@link SftpFileSystemAccessor} used to access effective
+     * server-side paths
+     */
+    SftpFileSystemAccessor getFileSystemAccessor();
+
+    /**
+     * @return The selected behavior in case some unsupported attributes are requested
+     */
+    UnsupportedAttributePolicy getUnsupportedAttributePolicy();
+
+    /**
+     * @return The default root directory used to resolve relative paths
+     * - a.k.a. the {@code chroot} location
+     */
+    Path getDefaultDirectory();
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemFactory.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemFactory.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemFactory.java
new file mode 100644
index 0000000..4e4aa77
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ObjectBuilder;
+import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.subsystem.SubsystemFactory;
+
+/**
+ * A {@link SubsystemFactory} that creates {@link SftpSubsystem} commands. Each
+ * created subsystem receives the factory's current executor service settings,
+ * unsupported attributes policy, file system accessor, error status data
+ * handler and registered event listeners.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class SftpSubsystemFactory
+        extends AbstractSftpEventListenerManager
+        implements SubsystemFactory, ExecutorServiceConfigurer, SftpEventListenerManager, SftpFileSystemAccessorManager {
+    public static final String NAME = SftpConstants.SFTP_SUBSYSTEM_NAME;
+    public static final UnsupportedAttributePolicy DEFAULT_POLICY = UnsupportedAttributePolicy.Warn;
+
+    /**
+     * Fluent builder for {@link SftpSubsystemFactory} instances. Listeners
+     * registered on the builder are added to every factory it builds.
+     */
+    public static class Builder extends AbstractSftpEventListenerManager implements ObjectBuilder<SftpSubsystemFactory> {
+        private ExecutorService service;
+        private boolean shutdownOnExit;
+        private UnsupportedAttributePolicy attributePolicy = DEFAULT_POLICY;
+        private SftpFileSystemAccessor accessor = SftpFileSystemAccessor.DEFAULT;
+        private SftpErrorStatusDataHandler statusHandler = SftpErrorStatusDataHandler.DEFAULT;
+
+        public Builder() {
+            super();
+        }
+
+        /**
+         * @param service The {@link ExecutorService} to pass to the built factory - may be {@code null}
+         * @return Self reference
+         */
+        public Builder withExecutorService(ExecutorService service) {
+            this.service = service;
+            return this;
+        }
+
+        /**
+         * @param shutdown The shutdown-on-exit setting to pass to the built factory
+         * @return Self reference
+         */
+        public Builder withShutdownOnExit(boolean shutdown) {
+            this.shutdownOnExit = shutdown;
+            return this;
+        }
+
+        /**
+         * @param p The {@link UnsupportedAttributePolicy} - never {@code null}
+         * @return Self reference
+         */
+        public Builder withUnsupportedAttributePolicy(UnsupportedAttributePolicy p) {
+            this.attributePolicy = Objects.requireNonNull(p, "No policy");
+            return this;
+        }
+
+        /**
+         * @param accessor The {@link SftpFileSystemAccessor} - never {@code null}
+         * @return Self reference
+         */
+        public Builder withFileSystemAccessor(SftpFileSystemAccessor accessor) {
+            this.accessor = Objects.requireNonNull(accessor, "No accessor");
+            return this;
+        }
+
+        /**
+         * @param handler The {@link SftpErrorStatusDataHandler} - never {@code null}
+         * @return Self reference
+         */
+        public Builder withSftpErrorStatusDataHandler(SftpErrorStatusDataHandler handler) {
+            this.statusHandler = Objects.requireNonNull(handler, "No error status handler");
+            return this;
+        }
+
+        /**
+         * @return A new {@link SftpSubsystemFactory} configured with the
+         * builder's current settings and registered listeners
+         */
+        @Override
+        public SftpSubsystemFactory build() {
+            SftpSubsystemFactory result = new SftpSubsystemFactory();
+            result.setExecutorService(this.service);
+            result.setShutdownOnExit(this.shutdownOnExit);
+            result.setUnsupportedAttributePolicy(this.attributePolicy);
+            result.setFileSystemAccessor(this.accessor);
+            result.setErrorStatusDataHandler(this.statusHandler);
+            GenericUtils.forEach(getRegisteredListeners(), result::addSftpEventListener);
+            return result;
+        }
+    }
+
+    private ExecutorService service;
+    private boolean shutdownOnExit;
+    private UnsupportedAttributePolicy attributePolicy = DEFAULT_POLICY;
+    private SftpFileSystemAccessor accessor = SftpFileSystemAccessor.DEFAULT;
+    private SftpErrorStatusDataHandler statusHandler = SftpErrorStatusDataHandler.DEFAULT;
+
+    public SftpSubsystemFactory() {
+        super();
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public ExecutorService getExecutorService() {
+        return this.service;
+    }
+
+    /**
+     * @param service The {@link ExecutorService} to be used by the {@link SftpSubsystem}
+     * command when starting execution. If {@code null} then a single-threaded ad-hoc service is used.
+     */
+    @Override
+    public void setExecutorService(ExecutorService service) {
+        this.service = service;
+    }
+
+    @Override
+    public boolean isShutdownOnExit() {
+        return this.shutdownOnExit;
+    }
+
+    /**
+     * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
+     * will be called when subsystem terminates - unless it is the ad-hoc service, which
+     * will be shutdown regardless
+     */
+    @Override
+    public void setShutdownOnExit(boolean shutdownOnExit) {
+        this.shutdownOnExit = shutdownOnExit;
+    }
+
+    /**
+     * @return The {@link UnsupportedAttributePolicy} passed to created subsystems
+     */
+    public UnsupportedAttributePolicy getUnsupportedAttributePolicy() {
+        return this.attributePolicy;
+    }
+
+    /**
+     * @param p The {@link UnsupportedAttributePolicy} to use if failed to access
+     * some local file attributes - never {@code null}
+     */
+    public void setUnsupportedAttributePolicy(UnsupportedAttributePolicy p) {
+        this.attributePolicy = Objects.requireNonNull(p, "No policy");
+    }
+
+    @Override
+    public SftpFileSystemAccessor getFileSystemAccessor() {
+        return this.accessor;
+    }
+
+    @Override
+    public void setFileSystemAccessor(SftpFileSystemAccessor accessor) {
+        this.accessor = Objects.requireNonNull(accessor, "No accessor");
+    }
+
+    /**
+     * @return The {@link SftpErrorStatusDataHandler} passed to created subsystems
+     */
+    public SftpErrorStatusDataHandler getErrorStatusDataHandler() {
+        return this.statusHandler;
+    }
+
+    /**
+     * @param handler The {@link SftpErrorStatusDataHandler} to pass to created
+     * subsystems - never {@code null}
+     */
+    public void setErrorStatusDataHandler(SftpErrorStatusDataHandler handler) {
+        this.statusHandler = Objects.requireNonNull(handler, "No error status data handler provided");
+    }
+
+    /**
+     * @return A new {@link SftpSubsystem} initialized with this factory's
+     * current settings and registered listeners
+     */
+    @Override
+    public Command create() {
+        SftpSubsystem created =
+            new SftpSubsystem(
+                getExecutorService(), isShutdownOnExit(),
+                getUnsupportedAttributePolicy(), getFileSystemAccessor(),
+                getErrorStatusDataHandler());
+        GenericUtils.forEach(getRegisteredListeners(), created::addSftpEventListener);
+        return created;
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/TreeLockExecutor.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/TreeLockExecutor.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/TreeLockExecutor.java
new file mode 100644
index 0000000..b0ed061
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/TreeLockExecutor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.io.Closeable;
+import java.nio.file.Path;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Runs submitted work items sequentially, in submission order, on a single
+ * worker task that is started on the provided {@link ExecutorService} at
+ * construction time. The path-related inputs (the resolver and the paths
+ * passed to {@link #submit(Runnable, String...)}) are currently stored /
+ * accepted but not used by this implementation.
+ */
+public class TreeLockExecutor implements Closeable {
+
+    /** Sentinel work item that tells the worker loop to exit */
+    private static final Runnable CLOSE = () -> { };
+
+    private final ExecutorService executor;
+    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    private final Future<?> future;
+    private final Function<String, Path> resolver;
+
+    /**
+     * @param executor The {@link ExecutorService} on which the worker loop is started
+     * @param resolver A function resolving path strings into {@link Path}s - currently unused
+     */
+    public TreeLockExecutor(ExecutorService executor, Function<String, Path> resolver) {
+        this.executor = executor;
+        this.resolver = resolver;
+        this.future = executor.submit(this::run);
+    }
+
+    /**
+     * Queues a work item for execution by the worker loop.
+     *
+     * @param work  The work to run
+     * @param paths The paths associated with the work - currently ignored
+     */
+    public void submit(Runnable work, String... paths) {
+        queue.add(work);
+    }
+
+    /**
+     * The worker loop: takes work items off the queue and runs them one at a
+     * time until the close sentinel is encountered or the thread is interrupted.
+     * A failure of an individual work item does not stop the loop.
+     */
+    protected void run() {
+        while (true) {
+            Runnable work;
+            try {
+                work = queue.take();
+            } catch (InterruptedException e) {
+                // interrupted (e.g., via Future#cancel(true)) - restore the flag and stop
+                // instead of swallowing the interrupt and going back to waiting
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+            if (work == CLOSE) {
+                break;
+            }
+
+            try {
+                work.run();
+            } catch (Throwable t) {
+                // ignore - a failed work item must not terminate the worker loop
+            }
+        }
+    }
+
+    /**
+     * Discards any work that has not started yet, signals the worker loop to
+     * exit, waits up to 5 seconds for it to do so and then cancels
+     * (interrupts) it if it is still running.
+     */
+    @Override
+    public void close() {
+        queue.clear();
+        queue.add(CLOSE);
+        try {
+            future.get(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // preserve the caller's interrupt status
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            // Ignore - timeout or worker failure; the worker is cancelled below
+        }
+        future.cancel(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnixDateFormat.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnixDateFormat.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnixDateFormat.java
new file mode 100644
index 0000000..3ce474a
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnixDateFormat.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.nio.file.attribute.FileTime;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+/**
+ * Formats timestamps as short date strings: month name, day of month and
+ * either the time of day or the year - depending on how far the timestamp
+ * is from the current time.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public final class UnixDateFormat {
+
+    /**
+     * A {@link List} of <U>short</U> months names where Jan=0, Feb=1, etc.
+     */
+    public static final List<String> MONTHS =
+        Collections.unmodifiableList(Arrays.asList(
+            "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"
+        ));
+
+    /**
+     * Six months duration in msec.
+     */
+    public static final long SIX_MONTHS = 183L * 24L * 60L * 60L * 1000L;
+
+    private UnixDateFormat() {
+        throw new UnsupportedOperationException("No instance allowed");
+    }
+
+    /**
+     * Get unix style date string.
+     *
+     * @param time The {@link FileTime} to format - ignored if {@code null}
+     * @return The formatted date string
+     * @see #getUnixDate(long)
+     */
+    public static String getUnixDate(FileTime time) {
+        long millis = (time == null) ? -1L : time.toMillis();
+        return getUnixDate(millis);
+    }
+
+    /**
+     * Get unix style date string.
+     *
+     * @param millis The timestamp in milliseconds since the epoch - if
+     * negative a placeholder of 12 dashes is returned
+     * @return The month name and (space padded) day of month followed by the
+     * year if the timestamp is more than {@link #SIX_MONTHS} away from the
+     * current time, or the {@code HH:mm} time of day otherwise
+     */
+    public static String getUnixDate(long millis) {
+        if (millis < 0L) {
+            return "------------";
+        }
+
+        Calendar calendar = new GregorianCalendar();
+        calendar.setTimeInMillis(millis);
+
+        StringBuilder out = new StringBuilder(16);
+
+        // month
+        out.append(MONTHS.get(calendar.get(Calendar.MONTH))).append(' ');
+
+        // day - right aligned in a 2 characters wide column
+        int dayOfMonth = calendar.get(Calendar.DATE);
+        if (dayOfMonth < 10) {
+            out.append(' ');
+        }
+        out.append(dayOfMonth).append(' ');
+
+        long distance = Math.abs(System.currentTimeMillis() - millis);
+        if (distance <= SIX_MONTHS) {
+            // hour:minute
+            appendTwoDigits(out, calendar.get(Calendar.HOUR_OF_DAY)).append(':');
+            appendTwoDigits(out, calendar.get(Calendar.MINUTE));
+        } else {
+            // year - the extra space makes it as wide as the time-of-day format
+            out.append(' ').append(calendar.get(Calendar.YEAR));
+        }
+
+        return out.toString();
+    }
+
+    // Appends the value, preceded by a '0' if it is less than 10
+    private static StringBuilder appendTwoDigits(StringBuilder out, int value) {
+        if (value < 10) {
+            out.append('0');
+        }
+        return out.append(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnsupportedAttributePolicy.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnsupportedAttributePolicy.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnsupportedAttributePolicy.java
new file mode 100644
index 0000000..ca763e3
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnsupportedAttributePolicy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * Enumerates the available policies for an unsupported attribute. How each
+ * constant is acted upon is decided by the code that consumes this enum,
+ * which is not part of this file.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public enum UnsupportedAttributePolicy {
+    // NOTE(review): presumably the unsupported attribute is silently skipped - confirm against the consumer
+    Ignore,
+    // NOTE(review): presumably a warning is issued and processing continues - confirm against the consumer
+    Warn,
+    // NOTE(review): presumably an exception is raised - confirm against the consumer
+    ThrowException;
+
+    /** Unmodifiable {@link Set} holding every constant declared by this enum. */
+    public static final Set<UnsupportedAttributePolicy> VALUES =
+            Collections.unmodifiableSet(EnumSet.allOf(UnsupportedAttributePolicy.class));
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/ClientTest.java
new file mode 100644
index 0000000..594c756
--- /dev/null
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.channel.ChannelExec;
+import org.apache.sshd.client.channel.ChannelShell;
+import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.SubsystemClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
+import org.apache.sshd.common.Factory;
+import org.apache.sshd.common.NamedResource;
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.Service;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.channel.ChannelListenerManager;
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.SessionListener;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.net.SshdSocketAddress;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.channel.ChannelSession;
+import org.apache.sshd.server.channel.ChannelSessionFactory;
+import org.apache.sshd.server.forward.DirectTcpipFactory;
+import org.apache.sshd.server.session.ServerConnectionServiceFactory;
+import org.apache.sshd.server.session.ServerUserAuthService;
+import org.apache.sshd.server.session.ServerUserAuthServiceFactory;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.apache.sshd.util.test.EchoShell;
+import org.apache.sshd.util.test.EchoShellFactory;
+import org.apache.sshd.util.test.TestChannelListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
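+ * Verifies the session and channel listener notifications seen by an {@link SshClient} talking to a locally started {@link SshServer}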
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ClientTest extends BaseTestSupport {
+    // server instance the tests connect to - created and started in setUp()
+    private SshServer sshd;
+    // client under test - created in setUp(), started by the individual tests
+    private SshClient client;
+    // port reported by the started server
+    private int port;
+    // awaited by the server-side user authentication service before it processes a command
+    private CountDownLatch authLatch;
+    // awaited by the server-side session channel before it is opened
+    private CountDownLatch channelLatch;
+
+    private final AtomicReference<ClientSession> clientSessionHolder = new AtomicReference<>(null);
+    @SuppressWarnings("synthetic-access")
+    private final SessionListener clientSessionListener = new SessionListener() {
+        @Override
+        public void sessionCreated(Session session) {
+            assertObjectInstanceOf("Non client session creation notification", ClientSession.class, session);
+            assertNull("Multiple creation notifications", clientSessionHolder.getAndSet((ClientSession) session));
+        }
+
+        @Override
+        public void sessionEvent(Session session, Event event) {
+            assertObjectInstanceOf("Non client session event notification: " + event, ClientSession.class, session);
+            assertSame("Mismatched client session event instance: " + event, clientSessionHolder.get(), session);
+        }
+
+        @Override
+        public void sessionException(Session session, Throwable t) {
+            assertObjectInstanceOf("Non client session exception notification", ClientSession.class, session);
+            assertNotNull("No session exception data", t);
+        }
+
+        @Override
+        public void sessionClosed(Session session) {
+            assertObjectInstanceOf("Non client session closure notification", ClientSession.class, session);
+            assertSame("Mismatched client session closure instance", clientSessionHolder.getAndSet(null), session);
+        }
+    };
+
+    public ClientTest() {
+        // nothing to initialize here - the implicit superclass constructor call is all that is needed
+    }
+
+    /**
+     * Starts a test server whose user authentication service and session
+     * channels wait on {@code authLatch} / {@code channelLatch} before doing
+     * their work, then creates the client and registers the session
+     * listener on it.
+     *
+     * @throws Exception If failed to set up the server or the client
+     */
+    @Before
+    public void setUp() throws Exception {
+        // zero-count latches: the await() calls below return at once as long as the count stays zero
+        authLatch = new CountDownLatch(0);
+        channelLatch = new CountDownLatch(0);
+
+        sshd = setupTestServer();
+        sshd.setShellFactory(new TestEchoShellFactory());
+        sshd.setServiceFactories(Arrays.asList(
+                new ServerUserAuthServiceFactory() {
+                    @Override
+                    public Service create(Session session) throws IOException {
+                        return new ServerUserAuthService(session) {
+                            @SuppressWarnings("synthetic-access")
+                            @Override
+                            public void process(int cmd, Buffer buffer) throws Exception {
+                                // hold back authentication processing until the latch is released
+                                authLatch.await();
+                                super.process(cmd, buffer);
+                            }
+                        };
+                    }
+                },
+                ServerConnectionServiceFactory.INSTANCE
+        ));
+        sshd.setChannelFactories(Arrays.asList(
+                new ChannelSessionFactory() {
+                    @Override
+                    public Channel create() {
+                        return new ChannelSession() {
+                            @SuppressWarnings("synthetic-access")
+                            @Override
+                            public OpenFuture open(int recipient, long rwsize, long rmpsize, Buffer buffer) {
+                                try {
+                                    // hold back the channel opening until the latch is released
+                                    channelLatch.await();
+                                } catch (InterruptedException e) {
+                                    // open() cannot throw the checked exception, so restore the
+                                    // interrupt status before wrapping it - otherwise it is lost
+                                    Thread.currentThread().interrupt();
+                                    throw new RuntimeSshException(e);
+                                }
+                                return super.open(recipient, rwsize, rmpsize, buffer);
+                            }
+
+                            @Override
+                            public String toString() {
+                                return "ChannelSession" + "[id=" + getId() + ", recipient=" + getRecipient() + "]";
+                            }
+                        };
+                    }
+                },
+                DirectTcpipFactory.INSTANCE));
+        sshd.start();
+        port = sshd.getPort();
+
+        client = setupTestClient();
+        clientSessionHolder.set(null);  // just making sure
+        client.addSessionListener(clientSessionListener);
+    }
+
+    /**
+     * Stops the server and the client and clears the tracked session. Each
+     * step runs even if a previous one failed, so a failing server shutdown
+     * does not leave the client running.
+     *
+     * @throws Exception If failed to stop the server or the client
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (sshd != null) {
+                sshd.stop(true);
+            }
+        } finally {
+            try {
+                if (client != null) {
+                    client.stop();
+                }
+            } finally {
+                clientSessionHolder.set(null);  // just making sure
+            }
+        }
+    }
+
+    @Test
+    public void testSimpleClientListener() throws Exception {
+        AtomicReference<Channel> channelHolder = new AtomicReference<>(null);
+        client.addChannelListener(new ChannelListener() {
+            @Override
+            public void channelOpenSuccess(Channel channel) {
+                assertSame("Mismatched opened channel instances", channel, channelHolder.get());
+            }
+
+            @Override
+            public void channelOpenFailure(Channel channel, Throwable reason) {
+                assertSame("Mismatched failed open channel instances", channel, channelHolder.get());
+            }
+
+            @Override
+            public void channelInitialized(Channel channel) {
+                assertNull("Multiple channel initialization notifications", channelHolder.getAndSet(channel));
+            }
+
+            @Override
+            public void channelStateChanged(Channel channel, String hint) {
+                outputDebugMessage("channelStateChanged(%s): %s", channel, hint);
+            }
+
+            @Override
+            public void channelClosed(Channel channel, Throwable reason) {
+                assertSame("Mismatched closed channel instances", channel, channelHolder.getAndSet(null));
+            }
+        });
+        sshd.setSubsystemFactories(Collections.singletonList(new SftpSubsystemFactory()));
+
+        client.start();
+
+        try (ClientSession session = createTestClientSession()) {
+            testClientListener(channelHolder, ChannelShell.class, () -> {
+                try {
+                    return session.createShellChannel();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            testClientListener(channelHolder, ChannelExec.class, () -> {
+                try {
+                    return session.createExecChannel(getCurrentTestName());
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            testClientListener(channelHolder, SftpClient.class, () -> {
+                try {
+                    return SftpClientFactory.instance().createSftpClient(session);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } finally {
+            client.stop();
+        }
+    }
+
+    /**
+     * Creates an instance through the factory and checks that the listener
+     * tracked exactly the channel behind it while the instance was alive,
+     * and that nothing is tracked anymore once the instance has been closed.
+     *
+     * @param <C> The created instance type
+     * @param channelHolder Holder updated by the registered channel listener
+     * @param channelType The instance type - used only for the assertion messages
+     * @param factory Creates the instance to be checked
+     * @throws Exception If failed to create or close the instance
+     * @throws UnsupportedOperationException If the created instance is neither a
+     * {@link Channel} nor a {@link SubsystemClient}
+     */
+    private <C extends Closeable> void testClientListener(AtomicReference<Channel> channelHolder, Class<C> channelType, Factory<? extends C> factory) throws Exception {
+        String typeName = channelType.getSimpleName();
+        assertNull(typeName + ": Unexpected currently active channel", channelHolder.get());
+
+        try (C instance = factory.create()) {
+            Channel expectedChannel;
+            if (instance instanceof Channel) {
+                expectedChannel = (Channel) instance;
+            } else if (instance instanceof SubsystemClient) {
+                // a subsystem client is not a channel itself - compare against the channel it runs on
+                expectedChannel = ((SubsystemClient) instance).getClientChannel();
+            } else {
+                throw new UnsupportedOperationException("Unknown test instance type: " + instance.getClass().getSimpleName());
+            }
+
+            Channel actualChannel = channelHolder.get();
+            assertSame("Mismatched listener " + typeName + " instances", expectedChannel, actualChannel);
+        }
+
+        assertNull(typeName + ": Active channel closure not signalled", channelHolder.get());
+    }
+
+    /**
+     * Creates one channel per channel type name and makes sure that each of
+     * them was assigned its own identifier.
+     *
+     * @throws Exception If failed
+     */
+    @Test
+    public void testCreateChannelByType() throws Exception {
+        client.start();
+
+        Collection<ClientChannel> created = new LinkedList<>();
+        try (ClientSession session = createTestClientSession()) {
+            // required since we do not use an SFTP subsystem
+            PropertyResolverUtils.updateProperty(session, ChannelSubsystem.REQUEST_SUBSYSTEM_REPLY, false);
+            created.add(session.createChannel(Channel.CHANNEL_SUBSYSTEM, SftpConstants.SFTP_SUBSYSTEM_NAME));
+            created.add(session.createChannel(Channel.CHANNEL_EXEC, getCurrentTestName()));
+            created.add(session.createChannel(Channel.CHANNEL_SHELL, getClass().getSimpleName()));
+
+            Set<Integer> seenIds = new HashSet<>(created.size());
+            for (ClientChannel channel : created) {
+                int channelId = channel.getId();
+                boolean firstOccurrence = seenIds.add(channelId);
+                assertTrue("Channel ID repeated: " + channelId, firstOccurrence);
+            }
+        } finally {
+            // best-effort cleanup of whatever was created before stopping the client
+            for (Closeable channel : created) {
+                try {
+                    channel.close();
+                } catch (IOException ignored) {
+                    // ignored
+                }
+            }
+            client.stop();
+        }
+
+        assertNull("Session closure not signalled", clientSessionHolder.get());
+    }
+
+    /**
+     * Makes sure that the {@link ChannelListener}s registered on the client,
+     * the session and the channel are <U>cumulative</U> - i.e., every one of
+     * them gets invoked
+     *
+     * @throws Exception If failed
+     */
+    @Test
+    public void testChannelListenersPropagation() throws Exception {
+        Map<String, TestChannelListener> tracked = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        TestChannelListener clientLevel = new TestChannelListener(client.getClass().getSimpleName());
+        addChannelListener(tracked, client, clientLevel);
+
+        // required since we do not use an SFTP subsystem
+        PropertyResolverUtils.updateProperty(client, ChannelSubsystem.REQUEST_SUBSYSTEM_REPLY, false);
+        client.start();
+        try (ClientSession session = createTestClientSession()) {
+            TestChannelListener sessionLevel = new TestChannelListener(session.getClass().getSimpleName());
+            addChannelListener(tracked, session, sessionLevel);
+            assertListenerSizes("ClientSessionOpen", tracked, 0, 0);
+
+            try (ClientChannel channel = session.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME)) {
+                channel.open().verify(5L, TimeUnit.SECONDS);
+
+                TestChannelListener channelLevel = new TestChannelListener(channel.getClass().getSimpleName());
+                // the listener is attached AFTER the channel is open, so replay the notifications it missed
+                channelLevel.channelInitialized(channel);
+                channelLevel.channelOpenSuccess(channel);
+                channel.addChannelListener(channelLevel);
+                assertListenerSizes("ClientChannelOpen", tracked, 1, 1);
+            }
+
+            assertListenerSizes("ClientChannelClose", tracked, 0, 1);
+        } finally {
+            client.stop();
+        }
+
+        assertListenerSizes("ClientStop", tracked, 0, 1);
+    }
+
+    // convenience overload - runs the size checks on the listeners stored as the map values
+    private static void assertListenerSizes(String phase, Map<String, ? extends TestChannelListener> listeners, int activeSize, int openSize) {
+        Collection<? extends TestChannelListener> registered = listeners.values();
+        assertListenerSizes(phase, registered, activeSize, openSize);
+    }
+
+    /**
+     * Checks the number of active / open channels recorded by each listener
+     * and that none of them recorded a failed channel.
+     *
+     * @param phase Prefix for the assertion messages
+     * @param listeners The listeners to check - nothing is checked if none
+     * @param activeSize Expected active channels count - not checked if negative
+     * @param openSize Expected open channels count - not checked if negative
+     */
+    private static void assertListenerSizes(String phase, Collection<? extends TestChannelListener> listeners, int activeSize, int openSize) {
+        if (GenericUtils.isEmpty(listeners)) {
+            return;
+        }
+
+        boolean checkActive = activeSize >= 0;
+        boolean checkOpen = openSize >= 0;
+        for (TestChannelListener listener : listeners) {
+            String name = listener.getName();
+            if (checkActive) {
+                int actualActive = GenericUtils.size(listener.getActiveChannels());
+                assertEquals(phase + ": mismatched active channels size for " + name + " listener", activeSize, actualActive);
+            }
+
+            if (checkOpen) {
+                int actualOpen = GenericUtils.size(listener.getOpenChannels());
+                assertEquals(phase + ": mismatched open channels size for " + name + " listener", openSize, actualOpen);
+            }
+
+            int actualFailed = GenericUtils.size(listener.getFailedChannels());
+            assertEquals(phase + ": unexpected failed channels size for " + name + " listener", 0, actualFailed);
+        }
+    }
+
+    // records the listener under its (unique) name and registers it with the manager
+    private static <L extends ChannelListener & NamedResource> void addChannelListener(Map<String, L> listeners, ChannelListenerManager manager, L listener) {
+        String listenerName = listener.getName();
+        L previous = listeners.put(listenerName, listener);
+        assertNull("Duplicate listener named " + listenerName, previous);
+        manager.addChannelListener(listener);
+    }
+
+    /**
+     * Creates an authenticated session to the test server on the local host
+     * and validates the reported connect host.
+     *
+     * @return The session - closed before the failure is propagated if the validation fails
+     * @throws IOException If failed to create or close the session
+     */
+    private ClientSession createTestClientSession() throws IOException {
+        ClientSession session = createTestClientSession(TEST_LOCALHOST);
+        boolean validated = false;
+        try {
+            InetSocketAddress addr = SshdSocketAddress.toInetSocketAddress(session.getConnectAddress());
+            assertEquals("Mismatched connect host", TEST_LOCALHOST, addr.getHostString());
+
+            validated = true;
+            return session;
+        } finally {
+            // hand the session over only on success - otherwise do not leave it open
+            if ((!validated) && (session != null)) {
+                session.close();
+            }
+        }
+    }
+
+    /**
+     * Connects to the test server, authenticates with a password identity
+     * and validates the reported connect address.
+     *
+     * @param host The host to connect to
+     * @return The authenticated session - closed before the failure is
+     * propagated if authentication or validation fails
+     * @throws IOException If failed to connect, authenticate or close the session
+     */
+    private ClientSession createTestClientSession(String host) throws IOException {
+        ClientSession session = client.connect(getCurrentTestName(), host, port).verify(7L, TimeUnit.SECONDS).getSession();
+        boolean validated = false;
+        try {
+            assertNotNull("Client session creation not signalled", clientSessionHolder.get());
+            session.addPasswordIdentity(getCurrentTestName());
+            session.auth().verify(5L, TimeUnit.SECONDS);
+
+            InetSocketAddress addr = SshdSocketAddress.toInetSocketAddress(session.getConnectAddress());
+            assertNotNull("No reported connect address", addr);
+            assertEquals("Mismatched connect port", port, addr.getPort());
+
+            validated = true;
+            return session;
+        } finally {
+            // hand the session over only on success - otherwise do not leave it open
+            if ((!validated) && (session != null)) {
+                session.close();
+            }
+        }
+    }
+
+    public static class TestEchoShellFactory extends EchoShellFactory {
+        @Override
+        public Command create() {
+            return new TestEchoShell();
+        }
+    }
+
+    /**
+     * An {@link EchoShell} that counts down the static {@link #latch} - if one
+     * has been set - whenever it is destroyed.
+     */
+    public static class TestEchoShell extends EchoShell {
+        // CHECKSTYLE:OFF
+        public static CountDownLatch latch;
+        // CHECKSTYLE:ON
+
+        public TestEchoShell() {
+            // the implicit superclass constructor call is all that is needed
+        }
+
+        @Override
+        public void destroy() {
+            // signal the destruction (if anyone asked for it) before the actual cleanup
+            if (latch != null) {
+                latch.countDown();
+            }
+            super.destroy();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/test/java/org/apache/sshd/client/simple/BaseSimpleClientTestSupport.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/simple/BaseSimpleClientTestSupport.java b/sshd-sftp/src/test/java/org/apache/sshd/client/simple/BaseSimpleClientTestSupport.java
new file mode 100644
index 0000000..60b9403
--- /dev/null
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/simple/BaseSimpleClientTestSupport.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.client.simple;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Base class for tests that need a started test server and a
+ * {@link SimpleClient} wrapper around a test client. Both are created before
+ * each test and released after it.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public abstract class BaseSimpleClientTestSupport extends BaseTestSupport {
+    /** Connect timeout (msec.) configured on the simple client */
+    public static final long CONNECT_TIMEOUT = TimeUnit.SECONDS.toMillis(5L);
+    /** Authentication timeout (msec.) configured on the simple client */
+    public static final long AUTH_TIMEOUT = TimeUnit.SECONDS.toMillis(7L);
+
+    protected SshServer sshd;
+    protected SshClient client;
+    protected int port;
+    protected SimpleClient simple;
+
+    protected BaseSimpleClientTestSupport() {
+        super();
+    }
+
+    /**
+     * Starts the test server, remembers the port it reports and wraps a
+     * test client as a {@link SimpleClient} configured with the timeouts.
+     *
+     * @throws Exception If failed to set up the server or the client
+     */
+    @Before
+    public void setUp() throws Exception {
+        sshd = setupTestServer();
+        sshd.start();
+        port = sshd.getPort();
+        client = setupTestClient();
+
+        simple = SshClient.wrapAsSimpleClient(client);
+        simple.setConnectTimeout(CONNECT_TIMEOUT);
+        simple.setAuthenticationTimeout(AUTH_TIMEOUT);
+    }
+
+    /**
+     * Stops the server, closes the simple client and stops the wrapped
+     * client. Each step runs even if a previous one failed, so that a
+     * failing shutdown step does not leave the remaining resources open.
+     *
+     * @throws Exception If failed to release any of the resources
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (sshd != null) {
+                sshd.stop(true);
+            }
+        } finally {
+            try {
+                if (simple != null) {
+                    simple.close();
+                }
+            } finally {
+                if (client != null) {
+                    client.stop();
+                }
+            }
+        }
+    }
+}