You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/07/01 12:57:49 UTC

[08/10] mina-sshd git commit: [SSHD-509] Use targeted derived NamedFactory(ies) for the various generic parameters

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/a6e2bf9e/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
new file mode 100644
index 0000000..6110900
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/DefaultSftpClient.java
@@ -0,0 +1,1179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.ACE4_APPEND_DATA;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.ACE4_READ_ATTRIBUTES;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.ACE4_READ_DATA;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.ACE4_WRITE_ATTRIBUTES;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.ACE4_WRITE_DATA;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SFTP_V3;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SFTP_V4;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SFTP_V5;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SFTP_V6;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_ACCESSTIME;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_ACMODTIME;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_ALL;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_CREATETIME;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_MODIFYTIME;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_OWNERGROUP;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_PERMISSIONS;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_SIZE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_SUBSECOND_TIMES;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_ATTR_UIDGID;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_TYPE_DIRECTORY;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_TYPE_REGULAR;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FILEXFER_TYPE_SYMLINK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_APPEND;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_CREAT;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_CREATE_NEW;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_CREATE_TRUNCATE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_EXCL;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_OPEN_EXISTING;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_OPEN_OR_CREATE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_READ;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_TRUNC;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_TRUNCATE_EXISTING;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXF_WRITE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_ATTRS;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_BLOCK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_CLOSE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_DATA;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_FSETSTAT;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_FSTAT;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_HANDLE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_INIT;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_LINK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_LSTAT;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_MKDIR;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_NAME;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_OPEN;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_OPENDIR;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_READ;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_READDIR;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_READLINK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_REALPATH;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_REMOVE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_RENAME;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_RENAME_ATOMIC;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_RENAME_OVERWRITE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_RMDIR;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_SETSTAT;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_STAT;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_STATUS;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_SYMLINK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_UNBLOCK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_VERSION;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FXP_WRITE;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FX_EOF;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FX_OK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.S_IFDIR;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.S_IFLNK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.S_IFREG;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.attribute.FileTime;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sshd.client.SftpException;
+import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.FactoryManagerUtils;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.InputStreamWithChannel;
+import org.apache.sshd.common.util.io.NoCloseInputStream;
+import org.apache.sshd.common.util.io.NoCloseOutputStream;
+import org.apache.sshd.common.util.io.OutputStreamWithChannel;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class DefaultSftpClient extends AbstractSftpClient {
+    // Session the SFTP channel belongs to; receive(Buffer) resets its idle timeout whenever data arrives
+    private final ClientSession clientSession;
+    // Subsystem channel (created with SftpConstants.SFTP_SUBSYSTEM_NAME) that carries the SFTP traffic
+    private final ChannelSubsystem channel;
+    // Received packets keyed by the int found at offset 5 of each packet; also the monitor used for wait/notify
+    private final Map<Integer, Buffer> messages;
+    // Source of request ids - send() uses incrementAndGet(), so the first id handed out is 101
+    private final AtomicInteger cmdId = new AtomicInteger(100);
+    // Keeps the bytes of an incomplete packet between successive data() calls
+    private final Buffer receiveBuffer = new ByteArrayBuffer();
+    // Set to true (while holding the messages monitor) by the channel's onClose callback
+    private boolean closing;
+    // Protocol version taken from the SSH_FXP_VERSION reply in init()
+    private int version;
+    // Extension name/data pairs read from the SSH_FXP_VERSION reply in init()
+    private final Map<String, byte[]> extensions = new HashMap<>();
+
+    /**
+     * Creates the subsystem channel on the given session, wires its output
+     * into {@link #data(byte[], int, int)}, opens it and then runs the
+     * version handshake via {@link #init()}.
+     *
+     * @param clientSession the session used to create the subsystem channel
+     * @throws IOException propagated from creating/opening the channel or from {@link #init()}
+     */
+    public DefaultSftpClient(ClientSession clientSession) throws IOException {
+        this.clientSession = clientSession;
+        this.channel = clientSession.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME);
+        this.messages = new HashMap<>();
+        // Everything the channel writes to its "out" stream is fed to data() for packet parsing
+        this.channel.setOut(new OutputStream() {
+            @Override
+            public void write(int b) throws IOException {
+                write(new byte[] { (byte) b }, 0, 1);
+            }
+            @Override
+            public void write(byte[] b, int off, int len) throws IOException {
+                data(b, off, len);
+            }
+        });
+        // The error stream is only collected in memory - nothing in this constructor reads it back
+        this.channel.setErr(new ByteArrayOutputStream(Byte.MAX_VALUE));
+        this.channel.open().verify(FactoryManagerUtils.getLongProperty(clientSession, SFTP_CHANNEL_OPEN_TIMEOUT, DEFAULT_CHANNEL_OPEN_TIMEOUT));
+        // On channel close: raise the closing flag and wake up every thread waiting on the messages monitor
+        this.channel.onClose(new Runnable() {
+            @SuppressWarnings("synthetic-access")
+            @Override
+            public void run() {
+                synchronized (messages) {
+                    closing = true;
+                    messages.notifyAll();
+                }
+            }
+        });
+        // Version handshake - must run after the output hook above is installed so the reply can be received
+        init();
+    }
+
+    /**
+     * @return the protocol version stored by {@code init()} from the server's reply
+     */
+    @Override
+    public int getVersion() {
+        return this.version;
+    }
+
+    /**
+     * @return the current value of the flag raised by the channel's close callback
+     */
+    @Override
+    public boolean isClosing() {
+        return this.closing;
+    }
+
+    /**
+     * @return whatever the underlying subsystem channel reports as its open state
+     */
+    @Override
+    public boolean isOpen() {
+        return channel.isOpen();
+    }
+
+    /**
+     * Closes the underlying channel (passing {@code false} to its close call)
+     * unless the channel already reports itself as not open.
+     *
+     * @throws IOException declared by the interface contract
+     */
+    @Override
+    public void close() throws IOException {
+        if (!isOpen()) {
+            return; // nothing to do
+        }
+
+        this.channel.close(false);
+    }
+
+    /**
+     * Accepts a chunk of raw bytes coming from the channel, dispatches every
+     * complete SFTP packet it contains and keeps any trailing partial packet
+     * in {@code receiveBuffer} for the next call.
+     *
+     * @param buf   array holding the received bytes
+     * @param start offset of the first received byte
+     * @param len   number of received bytes
+     * @return how far the read position of the processed buffer advanced
+     * @throws IOException if packet processing fails
+     */
+    protected int data(byte[] buf, int start, int len) throws IOException {
+        Buffer chunk = new ByteArrayBuffer(buf, start, len);
+        Buffer pending = chunk;
+        // Left-overs from a previous call come first, so append the new chunk and parse the accumulated data
+        if (receiveBuffer.available() > 0) {
+            receiveBuffer.putBuffer(chunk);
+            pending = receiveBuffer;
+        }
+
+        // Consume as many complete packets as are available
+        int startPos = pending.rpos();
+        int count = 0;
+        while (receive(pending)) {
+            if (log.isTraceEnabled()) {
+                log.trace("Processed " + count + " data messages");
+            }
+            count++;
+        }
+        int consumed = pending.rpos() - startPos;
+
+        // Drop what was consumed and stash whatever partial packet is left
+        receiveBuffer.compact();
+        if ((pending != receiveBuffer) && (pending.available() > 0)) {
+            receiveBuffer.putBuffer(pending);
+        }
+
+        return consumed;
+    }
+
+    /**
+     * Tries to extract one complete SFTP packet (4-byte length prefix plus
+     * payload) from the buffer and hand it to {@link #process(Buffer)}.
+     * The session idle timeout is reset on every call.
+     *
+     * @param incoming buffer positioned at the start of a packet
+     * @return {@code true} if a complete packet was consumed (read position
+     * moved past it), {@code false} if more data is needed (read position
+     * left untouched)
+     * @throws IOException if the declared packet length is less than 5 or
+     * processing fails
+     */
+    protected boolean receive(Buffer incoming) throws IOException {
+        int rpos = incoming.rpos();
+        int wpos = incoming.wpos();
+        clientSession.resetIdleTimeout();
+
+        int buffered = wpos - rpos;
+        if (buffered > 4) {
+            int length = incoming.getInt();
+            if (length < 5) {
+                throw new IOException("Illegal sftp packet length: " + length);
+            }
+            // Subtract from the buffered size instead of adding to the (untrusted) length:
+            // "length + 4" overflows for lengths close to Integer.MAX_VALUE and would wrongly
+            // report the packet as complete
+            if ((buffered - 4) >= length) {
+                int end = rpos + 4 + length;    // cannot overflow - end <= wpos here
+                // Temporarily narrow the buffer to exactly one packet (length prefix included)
+                incoming.rpos(rpos);
+                incoming.wpos(end);
+                process(incoming);
+                // Restore the full view, positioned right after the consumed packet
+                incoming.rpos(end);
+                incoming.wpos(wpos);
+                return true;
+            }
+        }
+
+        // Incomplete packet - rewind so the length prefix is re-read next time
+        incoming.rpos(rpos);
+        return false;
+    }
+
+    /**
+     * Copies one complete SFTP packet out of the incoming buffer, registers
+     * the copy in {@code messages} under the int found at offset 5 and wakes
+     * up all threads waiting for a response.
+     *
+     * @param incoming buffer whose available bytes are exactly one packet
+     * @throws IOException declared for overriding implementations
+     */
+    protected void process(Buffer incoming) throws IOException {
+        Buffer packet = new ByteArrayBuffer(incoming.available());
+        packet.putBuffer(incoming);
+
+        // Peek at the id - it follows the 4-byte length and the 1-byte type - then rewind
+        packet.rpos(5);
+        Integer key = Integer.valueOf(packet.getInt());
+        packet.rpos(0);
+
+        synchronized (messages) {
+            messages.put(key, packet);
+            messages.notifyAll();
+        }
+    }
+
+    protected int send(int cmd, Buffer buffer) throws IOException {
+        int id = cmdId.incrementAndGet();
+        
+        try(DataOutputStream dos = new DataOutputStream(new NoCloseOutputStream(channel.getInvertedIn()))) {
+            dos.writeInt(5 + buffer.available());
+            dos.writeByte(cmd);
+            dos.writeInt(id);
+            dos.write(buffer.array(), buffer.rpos(), buffer.available());
+            dos.flush();
+        }
+
+        return id;
+    }
+
+    protected Buffer receive(int id) throws IOException {
+        synchronized (messages) {
+            while (true) {
+                if (closing) {
+                    throw new SshException("Channel has been closed");
+                }
+                Buffer buffer = messages.remove(Integer.valueOf(id));
+                if (buffer != null) {
+                    return buffer;
+                }
+                try {
+                    messages.wait();
+                } catch (InterruptedException e) {
+                    throw (IOException) new InterruptedIOException("Interrupted while waiting for messages").initCause(e);
+                }
+            }
+        }
+    }
+
+    protected Buffer read() throws IOException {
+        try(DataInputStream dis=new DataInputStream(new NoCloseInputStream(channel.getInvertedOut()))) {
+            int length = dis.readInt();
+            if (length < 5) {
+                throw new IllegalArgumentException("Bad length: " + length);
+            }
+            Buffer buffer = new ByteArrayBuffer(length + 4);
+            buffer.putInt(length);
+            int nb = length;
+            while (nb > 0) {
+                int readLen = dis.read(buffer.array(), buffer.wpos(), nb);
+                if (readLen < 0) {
+                    throw new IllegalArgumentException("Premature EOF while read " + length + " bytes - remaining=" + nb);
+                }
+                buffer.wpos(buffer.wpos() + readLen);
+                nb -= readLen;
+            }
+
+            return buffer;
+        }
+    }
+
+    /**
+     * Performs the version handshake: sends SSH_FXP_INIT advertising
+     * {@code SFTP_V6}, waits for the first packet to arrive and, if it is an
+     * SSH_FXP_VERSION reply, stores the negotiated version and the
+     * extension name/data pairs that follow it.
+     *
+     * @throws IOException an {@code SshException} if the channel is closed
+     * while waiting, the reported version is below {@code SFTP_V3} or an
+     * unexpected packet type arrives; an {@code SftpException} if the server
+     * answers with a status packet; an {@link InterruptedIOException} if the
+     * wait is interrupted
+     */
+    protected void init() throws IOException {
+        // Init packet
+        try(DataOutputStream dos = new DataOutputStream(new NoCloseOutputStream(channel.getInvertedIn()))) {
+            dos.writeInt(5);
+            dos.writeByte(SSH_FXP_INIT);
+            dos.writeInt(SFTP_V6);
+            dos.flush();
+        }
+
+        Buffer buffer;
+        synchronized (messages) {
+            while (messages.isEmpty()) {
+                // Without this check a channel that closes before replying would leave us waiting
+                // forever: the close callback only sets the flag and notifies the monitor
+                if (closing) {
+                    throw new SshException("Channel has been closed");
+                }
+                try {
+                    messages.wait();
+                } catch (InterruptedException e) {
+                    throw (IOException) new InterruptedIOException("Interruppted init()").initCause(e);
+                }
+            }
+            // No request id exists yet, so simply take whichever packet arrived
+            buffer = messages.remove(messages.keySet().iterator().next());
+        }
+
+        int length = buffer.getInt();
+        int type = buffer.getByte();
+        // For SSH_FXP_VERSION this field carries the protocol version rather than a request id
+        int id = buffer.getInt();
+        if (type == SSH_FXP_VERSION) {
+            if (id < SFTP_V3) {
+                throw new SshException("Unsupported sftp version " + id);
+            }
+            version = id;
+            // The remainder of the packet is a sequence of (name, data) extension pairs
+            while (buffer.available() > 0) {
+                String name = buffer.getString();
+                byte[] data = buffer.getBytes();
+                extensions.put(name, data);
+            }
+        } else if (type == SSH_FXP_STATUS) {
+            int substatus = buffer.getInt();
+            String msg = buffer.getString();
+            String lang = buffer.getString();
+            if (log.isTraceEnabled()) {
+                log.trace("init(id={}) - status: {} [{}] {}", Integer.valueOf(id), Integer.valueOf(substatus), lang, msg);
+            }
+
+            throw new SftpException(substatus, msg);
+        } else {
+            throw new SshException("Unexpected SFTP packet received: type=" + type + ", id=" + id + ", length=" + length);
+        }
+    }
+
+    /**
+     * Verifies that the response is an SSH_FXP_STATUS packet whose status
+     * code is {@code SSH_FX_OK}.
+     *
+     * @param buffer the response packet, positioned at its length prefix
+     * @throws IOException an {@code SftpException} for any other status code,
+     * an {@code SshException} for any other packet type
+     */
+    protected void checkStatus(Buffer buffer) throws IOException {
+        int length = buffer.getInt();
+        int type = buffer.getByte();
+        int id = buffer.getInt();
+        if (type != SSH_FXP_STATUS) {
+            throw new SshException("Unexpected SFTP packet received: type=" + type + ", id=" + id + ", length=" + length);
+        }
+
+        int substatus = buffer.getInt();
+        String msg = buffer.getString();
+        String lang = buffer.getString();
+        if (log.isTraceEnabled()) {
+            log.trace("checkStatus(id={}) - status: {} [{}] {}", Integer.valueOf(id), Integer.valueOf(substatus), lang, msg);
+        }
+
+        if (substatus == SSH_FX_OK) {
+            return;
+        }
+
+        throw new SftpException(substatus, msg);
+    }
+
+    /**
+     * Extracts the handle from an SSH_FXP_HANDLE response.
+     *
+     * @param buffer the response packet, positioned at its length prefix
+     * @return the (validated as non-null/non-empty) handle string
+     * @throws IOException an {@code SftpException} if the response is a
+     * status packet (whatever its code), an {@code SshException} for any
+     * other unexpected packet type
+     */
+    protected String checkHandle(Buffer buffer) throws IOException {
+        int length = buffer.getInt();
+        int type = buffer.getByte();
+        int id = buffer.getInt();
+
+        if (type == SSH_FXP_STATUS) {
+            int substatus = buffer.getInt();
+            String msg = buffer.getString();
+            String lang = buffer.getString();
+            if (log.isTraceEnabled()) {
+                log.trace("checkHandle(id={}) - status: {} [{}] {}", Integer.valueOf(id), Integer.valueOf(substatus), lang, msg);
+            }
+            throw new SftpException(substatus, msg);
+        }
+
+        if (type == SSH_FXP_HANDLE) {
+            return ValidateUtils.checkNotNullAndNotEmpty(buffer.getString(), "Null/empty handle in buffer", GenericUtils.EMPTY_OBJECT_ARRAY);
+        }
+
+        throw new SshException("Unexpected SFTP packet received: type=" + type + ", id=" + id + ", length=" + length);
+    }
+
+    /**
+     * Decodes the attributes carried by an SSH_FXP_ATTRS response.
+     *
+     * @param buffer the response packet, positioned at its length prefix
+     * @return the attributes decoded by {@code readAttributes}
+     * @throws IOException an {@code SftpException} if the response is a
+     * status packet (whatever its code), an {@code SshException} for any
+     * other unexpected packet type
+     */
+    protected Attributes checkAttributes(Buffer buffer) throws IOException {
+        int length = buffer.getInt();
+        int type = buffer.getByte();
+        int id = buffer.getInt();
+
+        if (type == SSH_FXP_STATUS) {
+            int substatus = buffer.getInt();
+            String msg = buffer.getString();
+            String lang = buffer.getString();
+            if (log.isTraceEnabled()) {
+                log.trace("checkAttributes(id={}) - status: {} [{}] {}", Integer.valueOf(id), Integer.valueOf(substatus), lang, msg);
+            }
+            throw new SftpException(substatus, msg);
+        }
+
+        if (type == SSH_FXP_ATTRS) {
+            return readAttributes(buffer);
+        }
+
+        throw new SshException("Unexpected SFTP packet received: type=" + type + ", id=" + id + ", length=" + length);
+    }
+
+    /**
+     * Extracts the single name carried by an SSH_FXP_NAME response. The
+     * accompanying long name (version 3 only) and attributes are read from
+     * the packet but only used for trace logging.
+     *
+     * @param buffer the response packet, positioned at its length prefix
+     * @return the name found in the response
+     * @throws IOException an {@code SftpException} if the response is a
+     * status packet (whatever its code), an {@code SshException} if the
+     * name count is not exactly 1 or the packet type is unexpected
+     */
+    protected String checkOneName(Buffer buffer) throws IOException {
+        int length = buffer.getInt();
+        int type = buffer.getByte();
+        int id = buffer.getInt();
+
+        if (type == SSH_FXP_STATUS) {
+            int substatus = buffer.getInt();
+            String msg = buffer.getString();
+            String lang = buffer.getString();
+            if (log.isTraceEnabled()) {
+                log.trace("checkOneName(id={}) - status: {} [{}] {}", Integer.valueOf(id), Integer.valueOf(substatus), lang, msg);
+            }
+            throw new SftpException(substatus, msg);
+        }
+
+        if (type != SSH_FXP_NAME) {
+            throw new SshException("Unexpected SFTP packet received: type=" + type + ", id=" + id + ", length=" + length);
+        }
+
+        int len = buffer.getInt();
+        if (len != 1) {
+            throw new SshException("SFTP error: received " + len + " names instead of 1");
+        }
+
+        String name = buffer.getString();
+        // Only version 3 entries carry the extra long name string
+        String longName = (version == SFTP_V3) ? buffer.getString() : null;
+        Attributes attrs = readAttributes(buffer);
+        if (log.isTraceEnabled()) {
+            log.trace("checkOneName(id={}) ({})[{}]: {}", Integer.valueOf(id), name, longName, attrs);
+        }
+
+        return name;
+    }
+
+    /**
+     * Decodes a file attributes structure from the buffer according to the
+     * negotiated protocol version. The flags word read first decides which
+     * optional fields follow; every field that is present is also recorded
+     * in {@code attrs.flags}.
+     *
+     * @param buffer buffer positioned at the attributes flags word
+     * @return the decoded attributes
+     * @throws IOException declared for overriding implementations
+     * @throws IllegalStateException if the version is neither {@code SFTP_V3}
+     * nor at least {@code SFTP_V4}
+     */
+    protected Attributes readAttributes(Buffer buffer) throws IOException {
+        Attributes attrs = new Attributes();
+        int flags = buffer.getInt();
+        if (version == SFTP_V3) {
+            // V3 layout: size, uid+gid, permissions, atime+mtime (32-bit values)
+            if ((flags & SSH_FILEXFER_ATTR_SIZE) != 0) {
+                attrs.flags.add(Attribute.Size);
+                attrs.size = buffer.getLong();
+            }
+            if ((flags & SSH_FILEXFER_ATTR_UIDGID) != 0) {
+                attrs.flags.add(Attribute.UidGid);
+                attrs.uid = buffer.getInt();
+                attrs.gid = buffer.getInt();
+            }
+            if ((flags & SSH_FILEXFER_ATTR_PERMISSIONS) != 0) {
+                attrs.flags.add(Attribute.Perms);
+                attrs.perms = buffer.getInt();
+            }
+            if ((flags & SSH_FILEXFER_ATTR_ACMODTIME) != 0) {
+                attrs.flags.add(Attribute.AcModTime);
+                attrs.atime = buffer.getInt();
+                attrs.mtime = buffer.getInt();
+            }
+        } else if (version >= SFTP_V4) {
+            // V4+ layout: the type byte is always present, the other fields depend on the flags
+            attrs.type = buffer.getByte();
+            if ((flags & SSH_FILEXFER_ATTR_SIZE) != 0) {
+                attrs.flags.add(Attribute.Size);
+                attrs.size = buffer.getLong();
+            }
+            if ((flags & SSH_FILEXFER_ATTR_OWNERGROUP) != 0) {
+                attrs.flags.add(Attribute.OwnerGroup);
+                attrs.owner = buffer.getString();
+                attrs.group = buffer.getString();
+            }
+            if ((flags & SSH_FILEXFER_ATTR_PERMISSIONS) != 0) {
+                attrs.flags.add(Attribute.Perms);
+                attrs.perms = buffer.getInt();
+            }
+            
+            // update the permissions according to the type
+            // (applied even when no permissions field was present in the packet)
+            switch (attrs.type) {
+                case SSH_FILEXFER_TYPE_REGULAR:
+                    attrs.perms |= S_IFREG;
+                    break;
+                case SSH_FILEXFER_TYPE_DIRECTORY:
+                    attrs.perms |= S_IFDIR;
+                    break;
+                case SSH_FILEXFER_TYPE_SYMLINK:
+                    attrs.perms |= S_IFLNK;
+                    break;
+                default:    // do nothing
+            }
+
+            // Each time is kept both as a FileTime and as whole seconds narrowed to an int
+            if ((flags & SSH_FILEXFER_ATTR_ACCESSTIME) != 0) {
+                attrs.flags.add(Attribute.AccessTime);
+                attrs.accessTime = readTime(buffer, flags);
+                attrs.atime = (int) attrs.accessTime.to(TimeUnit.SECONDS);
+            }
+            if ((flags & SSH_FILEXFER_ATTR_CREATETIME) != 0) {
+                attrs.flags.add(Attribute.CreateTime);
+                attrs.createTime = readTime(buffer, flags);
+                attrs.ctime = (int) attrs.createTime.to(TimeUnit.SECONDS);
+            }
+            if ((flags & SSH_FILEXFER_ATTR_MODIFYTIME) != 0) {
+                attrs.flags.add(Attribute.ModifyTime);
+                attrs.modifyTime = readTime(buffer, flags);
+                attrs.mtime = (int) attrs.modifyTime.to(TimeUnit.SECONDS);
+            }
+            // TODO: acl
+        } else {
+            throw new IllegalStateException("readAttributes - unsupported version: " + version);
+        }
+        return attrs;
+    }
+
+    private FileTime readTime(Buffer buffer, int flags) {
+        long secs = buffer.getLong();
+        long millis = secs * 1000;
+        if ((flags & SSH_FILEXFER_ATTR_SUBSECOND_TIMES) != 0) {
+            millis += buffer.getInt() / 1000000l;
+        }
+        return FileTime.from(millis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Encodes the attributes into the buffer using the layout of the
+     * negotiated protocol version. Only the fields whose marker is present
+     * in {@code attributes.flags} are written; the encoding mirrors what
+     * {@code readAttributes} / {@code readTime} decode.
+     *
+     * @param buffer     the buffer to append the encoded attributes to
+     * @param attributes the attributes to encode
+     * @throws IOException declared for overriding implementations
+     * @throws UnsupportedOperationException if the version is neither
+     * {@code SFTP_V3} nor at least {@code SFTP_V4}
+     */
+    protected void writeAttributes(Buffer buffer, Attributes attributes) throws IOException {
+        if (version == SFTP_V3) {
+            // Translate the attribute markers into the V3 flags word (unknown markers are ignored)
+            int flags = 0;
+            for (Attribute a : attributes.flags) {
+                switch (a) {
+                    case Size:
+                        flags |= SSH_FILEXFER_ATTR_SIZE;
+                        break;
+                    case UidGid:
+                        flags |= SSH_FILEXFER_ATTR_UIDGID;
+                        break;
+                    case Perms:
+                        flags |= SSH_FILEXFER_ATTR_PERMISSIONS;
+                        break;
+                    case AcModTime:
+                        flags |= SSH_FILEXFER_ATTR_ACMODTIME;
+                        break;
+                    default:    // do nothing
+                }
+            }
+            buffer.putInt(flags);
+            if ((flags & SSH_FILEXFER_ATTR_SIZE) != 0) {
+                buffer.putLong(attributes.size);
+            }
+            if ((flags & SSH_FILEXFER_ATTR_UIDGID) != 0) {
+                buffer.putInt(attributes.uid);
+                buffer.putInt(attributes.gid);
+            }
+            if ((flags & SSH_FILEXFER_ATTR_PERMISSIONS) != 0) {
+                buffer.putInt(attributes.perms);
+            }
+            if ((flags & SSH_FILEXFER_ATTR_ACMODTIME) != 0) {
+                buffer.putInt(attributes.atime);
+                buffer.putInt(attributes.mtime);
+            }
+        } else if (version >= SFTP_V4) {
+            // Translate the attribute markers into the V4+ flags word (unknown markers are ignored)
+            int flags = 0;
+            for (Attribute a : attributes.flags) {
+                switch (a) {
+                    case Size:
+                        flags |= SSH_FILEXFER_ATTR_SIZE;
+                        break;
+                    case OwnerGroup:
+                        flags |= SSH_FILEXFER_ATTR_OWNERGROUP;
+                        break;
+                    case Perms:
+                        flags |= SSH_FILEXFER_ATTR_PERMISSIONS;
+                        break;
+                    case AccessTime:
+                        flags |= SSH_FILEXFER_ATTR_ACCESSTIME;
+                        break;
+                    case ModifyTime:
+                        flags |= SSH_FILEXFER_ATTR_MODIFYTIME;
+                        break;
+                    case CreateTime:
+                        flags |= SSH_FILEXFER_ATTR_CREATETIME;
+                        break;
+                    default:    // do nothing
+                }
+            }
+            buffer.putInt(flags);
+            // The type byte is always present in the V4+ layout
+            buffer.putByte(attributes.type);
+            if ((flags & SSH_FILEXFER_ATTR_SIZE) != 0) {
+                buffer.putLong(attributes.size);
+            }
+            if ((flags & SSH_FILEXFER_ATTR_OWNERGROUP) != 0) {
+                buffer.putString(attributes.owner != null ? attributes.owner : "OWNER@", StandardCharsets.UTF_8);
+                buffer.putString(attributes.group != null ? attributes.group : "GROUP@", StandardCharsets.UTF_8);
+            }
+            if ((flags & SSH_FILEXFER_ATTR_PERMISSIONS) != 0) {
+                buffer.putInt(attributes.perms);
+            }
+
+            // Each time field is a 64-bit seconds value optionally followed by 32-bit nanoseconds.
+            // Nothing else may follow it: a trailing extra int would shift every subsequent field
+            // for the receiver (and is not what readTime() consumes when decoding).
+            if ((flags & SSH_FILEXFER_ATTR_ACCESSTIME) != 0) {
+                buffer.putLong(attributes.accessTime.to(TimeUnit.SECONDS));
+                if ((flags & SSH_FILEXFER_ATTR_SUBSECOND_TIMES) != 0) {
+                    long nanos = attributes.accessTime.to(TimeUnit.NANOSECONDS);
+                    nanos = nanos % TimeUnit.SECONDS.toNanos(1);
+                    buffer.putInt((int) nanos);
+                }
+            }
+            if ((flags & SSH_FILEXFER_ATTR_CREATETIME) != 0) {
+                buffer.putLong(attributes.createTime.to(TimeUnit.SECONDS));
+                if ((flags & SSH_FILEXFER_ATTR_SUBSECOND_TIMES) != 0) {
+                    long nanos = attributes.createTime.to(TimeUnit.NANOSECONDS);
+                    nanos = nanos % TimeUnit.SECONDS.toNanos(1);
+                    buffer.putInt((int) nanos);
+                }
+            }
+            if ((flags & SSH_FILEXFER_ATTR_MODIFYTIME) != 0) {
+                buffer.putLong(attributes.modifyTime.to(TimeUnit.SECONDS));
+                if ((flags & SSH_FILEXFER_ATTR_SUBSECOND_TIMES) != 0) {
+                    long nanos = attributes.modifyTime.to(TimeUnit.NANOSECONDS);
+                    nanos = nanos % TimeUnit.SECONDS.toNanos(1);
+                    buffer.putInt((int) nanos);
+                }
+            }
+            // TODO: acl
+        } else {
+            throw new UnsupportedOperationException("writeAttributes(" + attributes + ") unsupported version: " + version);
+        }
+    }
+
+    /**
+     * Opens a remote file and returns a handle for it.
+     * <p>
+     * Protocol versions below 5 take a single "pflags" bit mask
+     * (SSH_FXF_READ, SSH_FXF_WRITE, ...); from version 5 on the request
+     * carries an access mask (ACE4_* bits) followed by a flags word holding
+     * one of the create/open/truncate dispositions. In both cases an empty
+     * attributes structure is appended.
+     * </p>
+     *
+     * @param path    the remote path to open
+     * @param options the requested open modes
+     * @return a closeable handle wrapping the handle returned by the server
+     * @throws IOException if sending fails or the server does not answer
+     * with a handle
+     */
+    @Override
+    public CloseableHandle open(String path, Collection<OpenMode> options) throws IOException {
+        Buffer buffer = new ByteArrayBuffer(path.length() + Long.SIZE /* some extra fields */);
+        buffer.putString(path);
+        // Version 4 still uses the version 3 style pflags - the access mask and the new
+        // disposition values were only introduced with version 5
+        if (version < SFTP_V5) {
+            int mode = 0;
+            for (OpenMode m : options) {
+                switch (m) {
+                    case Read:
+                        mode |= SSH_FXF_READ;
+                        break;
+                    case Write:
+                        mode |= SSH_FXF_WRITE;
+                        break;
+                    case Append:
+                        mode |= SSH_FXF_APPEND;
+                        break;
+                    case Create:
+                        mode |= SSH_FXF_CREAT;
+                        break;
+                    case Truncate:
+                        mode |= SSH_FXF_TRUNC;
+                        break;
+                    case Exclusive:
+                        mode |= SSH_FXF_EXCL;
+                        break;
+                    default:    // do nothing
+                }
+            }
+            buffer.putInt(mode);
+        } else {
+            int mode = 0;
+            int access = 0;
+            if (options.contains(OpenMode.Read)) {
+                access |= ACE4_READ_DATA | ACE4_READ_ATTRIBUTES;
+            }
+            if (options.contains(OpenMode.Write)) {
+                access |= ACE4_WRITE_DATA | ACE4_WRITE_ATTRIBUTES;
+            }
+            if (options.contains(OpenMode.Append)) {
+                access |= ACE4_APPEND_DATA;
+            }
+            // Exactly one disposition is selected - the first matching combination wins
+            if (options.contains(OpenMode.Create) && options.contains(OpenMode.Exclusive)) {
+                mode |= SSH_FXF_CREATE_NEW;
+            } else if (options.contains(OpenMode.Create) && options.contains(OpenMode.Truncate)) {
+                mode |= SSH_FXF_CREATE_TRUNCATE;
+            } else if (options.contains(OpenMode.Create)) {
+                mode |= SSH_FXF_OPEN_OR_CREATE;
+            } else if (options.contains(OpenMode.Truncate)) {
+                mode |= SSH_FXF_TRUNCATE_EXISTING;
+            } else {
+                mode |= SSH_FXF_OPEN_EXISTING;
+            }
+            buffer.putInt(access);
+            buffer.putInt(mode);
+        }
+        writeAttributes(buffer, new Attributes());
+        return new DefaultCloseableHandle(this, checkHandle(receive(send(SSH_FXP_OPEN, buffer))));
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_CLOSE} request carrying the handle's identifier
+     * and passes the reply to {@code checkStatus}.
+     *
+     * @param handle the handle whose {@code id} is written into the request
+     * @throws IOException propagated from the request/response exchange
+     */
+    @Override
+    public void close(Handle handle) throws IOException {
+        String handleId = handle.id;
+        Buffer request = new ByteArrayBuffer(handleId.length() + Long.SIZE /* some extra fields */);
+        request.putString(handleId);
+        checkStatus(receive(send(SSH_FXP_CLOSE, request)));
+    }
+
+    @Override
+    public void remove(String path) throws IOException {
+        Buffer buffer = new ByteArrayBuffer(path.length() + Long.SIZE /* some extra fields */);
+        buffer.putString(path);
+        checkStatus(receive(send(SSH_FXP_REMOVE, buffer)));
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_RENAME} request.
+     * <p>
+     * For protocol versions of at least {@code SFTP_V5} the copy options are
+     * folded into a flags word that is always appended to the request (zero
+     * when no options were given). For older versions any non-empty option
+     * collection is rejected.
+     * </p>
+     *
+     * @param oldPath the current path
+     * @param newPath the new path
+     * @param options the requested copy modes - may be {@code null}/empty
+     * @throws IOException propagated from the request/response exchange
+     * @throws UnsupportedOperationException if options are supplied while the
+     * negotiated version is below {@code SFTP_V5}
+     */
+    @Override
+    public void rename(String oldPath, String newPath, Collection<CopyMode> options) throws IOException {
+        Buffer request = new ByteArrayBuffer(oldPath.length() + newPath.length() + Long.SIZE /* some extra fields */);
+        request.putString(oldPath);
+        request.putString(newPath);
+
+        boolean hasOptions = GenericUtils.size(options) > 0;
+        if (version < SFTP_V5) {
+            // older protocol versions have no flags field in the request
+            if (hasOptions) {
+                throw new UnsupportedOperationException("rename(" + oldPath + " => " + newPath + ")"
+                                                      + " - copy options can not be used with this SFTP version: " + options);
+            }
+        } else {
+            int flags = 0;
+            if (hasOptions) {
+                for (CopyMode mode : options) {
+                    switch (mode) {
+                        case Atomic:
+                            flags |= SSH_FXP_RENAME_ATOMIC;
+                            break;
+                        case Overwrite:
+                            flags |= SSH_FXP_RENAME_OVERWRITE;
+                            break;
+                        default:    // do nothing
+                    }
+                }
+            }
+            request.putInt(flags);
+        }
+
+        checkStatus(receive(send(SSH_FXP_RENAME, request)));
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_READ} request and copies the returned data
+     * into the destination array via {@link #checkData(Buffer, int, byte[])}.
+     *
+     * @param handle     the handle whose {@code id} is written into the request
+     * @param fileOffset the offset written into the request
+     * @param dst        the destination array
+     * @param dstoff     the offset in {@code dst} where data is stored
+     * @param len        the number of bytes requested
+     * @return the value returned by {@code checkData} - the number of bytes
+     * received or {@code -1} on an EOF status
+     * @throws IOException propagated from the request/response exchange
+     */
+    @Override
+    public int read(Handle handle, long fileOffset, byte[] dst, int dstoff, int len) throws IOException {
+        String handleId = handle.id;
+        Buffer request = new ByteArrayBuffer(handleId.length() + Long.SIZE /* some extra fields */);
+        request.putString(handleId);
+        request.putLong(fileOffset);
+        request.putInt(len);
+        return checkData(receive(send(SSH_FXP_READ, request)), dstoff, dst);
+    }
+
+    /**
+     * Decodes the reply to a read request.
+     *
+     * @param buffer the reply - positioned at the packet length field
+     * @param dstoff the offset in {@code dst} where received data is stored
+     * @param dst    the destination array
+     * @return the number of data bytes copied into {@code dst}, or {@code -1}
+     * if the reply is a status packet whose code is {@code SSH_FX_EOF}
+     * @throws IOException an {@code SftpException} for any other status code
+     * or an {@code SshException} for an unexpected packet type
+     */
+    protected int checkData(Buffer buffer, int dstoff, byte[] dst) throws IOException {
+        int length = buffer.getInt();
+        int type = buffer.getByte();
+        int id = buffer.getInt();
+
+        if (type == SSH_FXP_DATA) {
+            int dataLen = buffer.getInt();
+            buffer.getRawBytes(dst, dstoff, dataLen);
+            return dataLen;
+        }
+
+        if (type != SSH_FXP_STATUS) {
+            throw new SshException("Unexpected SFTP packet received: type=" + type + ", id=" + id + ", length=" + length);
+        }
+
+        // status reply: code, message and language tag follow the request id
+        int substatus = buffer.getInt();
+        String msg = buffer.getString();
+        String lang = buffer.getString();
+        if (log.isTraceEnabled()) {
+            log.trace("checkData(id={}) - status: {} [{}] {}", Integer.valueOf(id), Integer.valueOf(substatus), lang, msg);
+        }
+
+        if (substatus != SSH_FX_EOF) {
+            throw new SftpException(substatus, msg);
+        }
+
+        return -1;
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_WRITE} request for a section of the source
+     * array and passes the reply to {@code checkStatus}.
+     *
+     * @param handle     the handle whose {@code id} is written into the request
+     * @param fileOffset the offset written into the request - must be non-negative
+     * @param src        the source array
+     * @param srcoff     the first byte of {@code src} to send - must be non-negative
+     * @param len        the number of bytes to send - must be non-negative
+     * @throws IOException propagated from the request/response exchange
+     * @throws IllegalArgumentException if any numeric argument is negative or
+     * the requested range exceeds the source array
+     */
+    @Override
+    public void write(Handle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException {
+        // do some bounds checking first
+        if ((fileOffset < 0) || (srcoff < 0) || (len < 0)) {
+            throw new IllegalArgumentException("write(" + handle + ") please ensure all parameters "
+                                             + " are non-negative values: file-offset=" + fileOffset
+                                             + ", src-offset=" + srcoff + ", len=" + len);
+        }
+        // compare via subtraction: (srcoff + len) may overflow int and slip past the check
+        if (len > (src.length - srcoff)) {
+            throw new IllegalArgumentException("write(" + handle + ")"
+                                             + " cannot read bytes " + srcoff + " to " + ((long) srcoff + len)
+                                             + " when array is only of length " + src.length);
+        }
+
+        Buffer buffer = new ByteArrayBuffer(handle.id.length() + len + Long.SIZE /* some extra fields */);
+        buffer.putString(handle.id);
+        buffer.putLong(fileOffset);
+        buffer.putBytes(src, srcoff, len);
+        checkStatus(receive(send(SSH_FXP_WRITE, buffer)));
+    }
+
+    @Override
+    public void mkdir(String path) throws IOException {
+        Buffer buffer = new ByteArrayBuffer(path.length() +  Long.SIZE /* some extra fields */);
+        buffer.putString(path, StandardCharsets.UTF_8);
+        buffer.putInt(0);
+        if (version != SFTP_V3) {
+            buffer.putByte((byte) 0);
+        }
+        checkStatus(receive(send(SSH_FXP_MKDIR, buffer)));
+    }
+
+    @Override
+    public void rmdir(String path) throws IOException {
+        Buffer buffer = new ByteArrayBuffer(path.length() +  Long.SIZE /* some extra fields */);
+        buffer.putString(path);
+        checkStatus(receive(send(SSH_FXP_RMDIR, buffer)));
+    }
+
+    @Override
+    public CloseableHandle openDir(String path) throws IOException {
+        Buffer buffer = new ByteArrayBuffer(path.length() + Long.SIZE /* some extra fields */);
+        buffer.putString(path);
+        return new DefaultCloseableHandle(this, checkHandle(receive(send(SSH_FXP_OPENDIR, buffer))));
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_READDIR} request for the given handle and
+     * decodes the reply via {@link #checkDir(Buffer)}.
+     *
+     * @param handle the handle whose {@code id} is written into the request
+     * @return the decoded entries, or {@code null} on an EOF status
+     * @throws IOException propagated from the request/response exchange
+     */
+    @Override
+    public DirEntry[] readDir(Handle handle) throws IOException {
+        String handleId = handle.id;
+        Buffer request = new ByteArrayBuffer(handleId.length() + Long.SIZE /* some extra fields */);
+        request.putString(handleId);
+        return checkDir(receive(send(SSH_FXP_READDIR, request)));
+    }
+
+    /**
+     * Decodes the reply to a directory read request.
+     *
+     * @param buffer the reply - positioned at the packet length field
+     * @return the decoded entries of a name packet, or {@code null} if the
+     * reply is a status packet whose code is {@code SSH_FX_EOF}
+     * @throws IOException an {@code SftpException} for any other status code
+     * or an {@code SshException} for an unexpected packet type
+     */
+    protected DirEntry[] checkDir(Buffer buffer) throws IOException {
+        int length = buffer.getInt();
+        int type = buffer.getByte();
+        int id = buffer.getInt();
+
+        if (type == SSH_FXP_STATUS) {
+            int substatus = buffer.getInt();
+            String msg = buffer.getString();
+            String lang = buffer.getString();
+            if (log.isTraceEnabled()) {
+                log.trace("checkDir(id={}) - status: {} [{}] {}", Integer.valueOf(id), Integer.valueOf(substatus), lang, msg);
+            }
+            if (substatus != SSH_FX_EOF) {
+                throw new SftpException(substatus, msg);
+            }
+            return null;
+        }
+
+        if (type != SSH_FXP_NAME) {
+            throw new SshException("Unexpected SFTP packet received: type=" + type + ", id=" + id + ", length=" + length);
+        }
+
+        int count = buffer.getInt();
+        DirEntry[] result = new DirEntry[count];
+        for (int pos = 0; pos < count; pos++) {
+            String name = buffer.getString();
+            // the long name is read only when the version is SFTP_V3
+            String longName = null;
+            if (version == SFTP_V3) {
+                longName = buffer.getString();
+            }
+            Attributes attrs = readAttributes(buffer);
+            if (log.isTraceEnabled()) {
+                log.trace("checkDir(id={})[{}] ({})[{}]: {}", Integer.valueOf(id), Integer.valueOf(pos), name, longName, attrs);
+            }
+
+            result[pos] = new DirEntry(name, longName, attrs);
+        }
+        return result;
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_REALPATH} request for the given path.
+     *
+     * @param path the path written into the request
+     * @return the value extracted from the reply by {@code checkOneName}
+     * @throws IOException propagated from the request/response exchange
+     */
+    @Override
+    public String canonicalPath(String path) throws IOException {
+        Buffer request = new ByteArrayBuffer();
+        request.putString(path);
+        return checkOneName(receive(send(SSH_FXP_REALPATH, request)));
+    }
+
+    @Override
+    public Attributes stat(String path) throws IOException {
+        Buffer buffer = new ByteArrayBuffer();
+        buffer.putString(path);
+        if (version >= SFTP_V4) {
+            buffer.putInt(SSH_FILEXFER_ATTR_ALL);
+        }
+        return checkAttributes(receive(send(SSH_FXP_STAT, buffer)));
+    }
+
+    @Override
+    public Attributes lstat(String path) throws IOException {
+        Buffer buffer = new ByteArrayBuffer();
+        buffer.putString(path);
+        if (version >= SFTP_V4) {
+            buffer.putInt(SSH_FILEXFER_ATTR_ALL);
+        }
+        return checkAttributes(receive(send(SSH_FXP_LSTAT, buffer)));
+    }
+
+    @Override
+    public Attributes stat(Handle handle) throws IOException {
+        Buffer buffer = new ByteArrayBuffer();
+        buffer.putString(handle.id);
+        if (version >= SFTP_V4) {
+            buffer.putInt(SSH_FILEXFER_ATTR_ALL);
+        }
+        return checkAttributes(receive(send(SSH_FXP_FSTAT, buffer)));
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_SETSTAT} request carrying the path followed by
+     * the encoded attributes, and passes the reply to {@code checkStatus}.
+     *
+     * @param path       the path written into the request
+     * @param attributes the attributes encoded via {@code writeAttributes}
+     * @throws IOException propagated from the request/response exchange
+     */
+    @Override
+    public void setStat(String path, Attributes attributes) throws IOException {
+        Buffer request = new ByteArrayBuffer();
+        request.putString(path);
+        writeAttributes(request, attributes);
+        checkStatus(receive(send(SSH_FXP_SETSTAT, request)));
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_FSETSTAT} request carrying the handle identifier
+     * followed by the encoded attributes, and passes the reply to
+     * {@code checkStatus}.
+     *
+     * @param handle     the handle whose {@code id} is written into the request
+     * @param attributes the attributes encoded via {@code writeAttributes}
+     * @throws IOException propagated from the request/response exchange
+     */
+    @Override
+    public void setStat(Handle handle, Attributes attributes) throws IOException {
+        Buffer request = new ByteArrayBuffer();
+        request.putString(handle.id);
+        writeAttributes(request, attributes);
+        checkStatus(receive(send(SSH_FXP_FSETSTAT, request)));
+    }
+
+    @Override
+    public String readLink(String path) throws IOException {
+        Buffer buffer = new ByteArrayBuffer(path.length() + Long.SIZE /* some extra fields */);
+        buffer.putString(path);
+        return checkOneName(receive(send(SSH_FXP_READLINK, buffer)));
+    }
+
+    @Override
+    public void link(String linkPath, String targetPath, boolean symbolic) throws IOException {
+        Buffer buffer = new ByteArrayBuffer(linkPath.length() + targetPath.length() + Long.SIZE /* some extra fields */);
+        if (version < SFTP_V6) {
+            if (!symbolic) {
+                throw new UnsupportedOperationException("Hard links are not supported in sftp v" + version);
+            }
+            buffer.putString(targetPath);
+            buffer.putString(linkPath);
+            checkStatus(receive(send(SSH_FXP_SYMLINK, buffer)));
+        } else {
+            buffer.putString(targetPath);
+            buffer.putString(linkPath);
+            buffer.putBoolean(symbolic);
+            checkStatus(receive(send(SSH_FXP_LINK, buffer)));
+        }
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_BLOCK} request carrying the handle identifier,
+     * offset, length and mask, and passes the reply to {@code checkStatus}.
+     *
+     * @param handle the handle whose {@code id} is written into the request
+     * @param offset the offset written into the request
+     * @param length the length written into the request
+     * @param mask   the mask written into the request
+     * @throws IOException propagated from the request/response exchange
+     */
+    @Override
+    public void lock(Handle handle, long offset, long length, int mask) throws IOException {
+        Buffer request = new ByteArrayBuffer();
+        request.putString(handle.id);
+        request.putLong(offset);
+        request.putLong(length);
+        request.putInt(mask);
+        checkStatus(receive(send(SSH_FXP_BLOCK, request)));
+    }
+
+    /**
+     * Issues an {@code SSH_FXP_UNBLOCK} request carrying the handle identifier,
+     * offset and length, and passes the reply to {@code checkStatus}.
+     *
+     * @param handle the handle whose {@code id} is written into the request
+     * @param offset the offset written into the request
+     * @param length the length written into the request
+     * @throws IOException propagated from the request/response exchange
+     */
+    @Override
+    public void unlock(Handle handle, long offset, long length) throws IOException {
+        Buffer request = new ByteArrayBuffer();
+        request.putString(handle.id);
+        request.putLong(offset);
+        request.putLong(length);
+        checkStatus(receive(send(SSH_FXP_UNBLOCK, request)));
+    }
+
+    @Override
+    public Iterable<DirEntry> readDir(final String path) throws IOException {
+        return new Iterable<DirEntry>() {
+            @Override
+            public Iterator<DirEntry> iterator() {
+                return new Iterator<DirEntry>() {
+                    private CloseableHandle handle;
+                    private DirEntry[] entries;
+                    private int index;
+
+                    {
+                        open();
+                        load();
+                    }
+
+                    @Override
+                    public boolean hasNext() {
+                        return (entries != null) && (index < entries.length);
+                    }
+
+                    @Override
+                    public DirEntry next() {
+                        DirEntry entry = entries[index++];
+                        if (index >= entries.length) {
+                            load();
+                        }
+                        return entry;
+                    }
+
+                    @SuppressWarnings("synthetic-access")
+                    private void open() {
+                        try {
+                            handle = openDir(path);
+                            if (log.isDebugEnabled()) {
+                                log.debug("readDir(" + path + ") handle=" + handle);
+                            }
+                        } catch (IOException e) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("readDir(" + path + ") failed (" + e.getClass().getSimpleName() + ") to open dir: " + e.getMessage());
+                            }
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    @SuppressWarnings("synthetic-access")
+                    private void load() {
+                        try {
+                            entries = readDir(handle);
+                            index = 0;
+                            if (entries == null) {
+                                handle.close();
+                            }
+                        } catch (IOException e) {
+                            entries = null;
+                            try {
+                                handle.close();
+                            } catch (IOException t) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace(t.getClass().getSimpleName() + " while close handle=" + handle
+                                            + " due to " + e.getClass().getSimpleName() + " [" + e.getMessage() + "]"
+                                            + ": " + t.getMessage());
+                                }
+                            }
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException("readDir(" + path + ") Iterator#remove() N/A");
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public InputStream read(final String path, final int bufferSize, final Collection<OpenMode> mode) throws IOException {
+        if (bufferSize < MIN_READ_BUFFER_SIZE) {
+            throw new IllegalArgumentException("Insufficient read buffer size: " + bufferSize + ", min.=" + MIN_READ_BUFFER_SIZE);
+        }
+
+        return new InputStreamWithChannel() {
+            private byte[] bb = new byte[1];
+            private byte[] buffer = new byte[bufferSize];
+            private int index;
+            private int available;
+            private CloseableHandle handle = DefaultSftpClient.this.open(path, mode);
+            private long offset;
+
+            @Override
+            public boolean isOpen() {
+                return (handle != null) && handle.isOpen();
+            }
+
+            @Override
+            public int read() throws IOException {
+                int read = read(bb, 0, 1);
+                if (read > 0) {
+                    return bb[0];
+                }
+
+                return read;
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                if (!isOpen()) {
+                    throw new IOException("read(" + path + ") stream closed");
+                }
+
+                int idx = off;
+                while (len > 0) {
+                    if (index >= available) {
+                        available = DefaultSftpClient.this.read(handle, offset, buffer, 0, buffer.length);
+                        if (available < 0) {
+                            if (idx == off) {
+                                return -1;
+                            } else {
+                                break;
+                            }
+                        }
+                        offset += available;
+                        index = 0;
+                    }
+                    if (index >= available) {
+                        break;
+                    }
+                    int nb = Math.min(len, available - index);
+                    System.arraycopy(buffer, index, b, idx, nb);
+                    index += nb;
+                    idx += nb;
+                    len -= nb;
+                }
+                return idx - off;
+            }
+
+            @Override
+            public void close() throws IOException {
+                if (isOpen()) {
+                    try {
+                        handle.close();
+                    } finally {
+                        handle = null;
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Opens the remote file and returns a buffered {@link OutputStream} that
+     * writes to it. Bytes are accumulated locally and sent to the server
+     * whenever the local buffer fills up, on {@code flush()}, or on
+     * {@code close()} if data is still pending. Writing starts at offset zero
+     * and advances by the number of bytes sent.
+     *
+     * @param path       the remote file path
+     * @param bufferSize the internal buffer size - must be at least
+     * {@code MIN_WRITE_BUFFER_SIZE}
+     * @param mode       the open modes passed to {@code open(path, mode)}
+     * @return the stream - closing it closes the underlying handle
+     * @throws IOException if the file cannot be opened
+     * @throws IllegalArgumentException if {@code bufferSize} is too small
+     */
+    @Override
+    public OutputStream write(final String path, final int bufferSize, final Collection<OpenMode> mode) throws IOException {
+        if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
+            throw new IllegalArgumentException("Insufficient write buffer size: " + bufferSize + ", min.=" + MIN_WRITE_BUFFER_SIZE);
+        }
+
+        return new OutputStreamWithChannel() {
+            private byte[] single = new byte[1];
+            private byte[] pending = new byte[bufferSize];
+            private int used;
+            private CloseableHandle handle = DefaultSftpClient.this.open(path, mode);
+            private long position;
+
+            @Override
+            public boolean isOpen() {
+                return (handle != null) && handle.isOpen();
+            }
+
+            @Override
+            public void write(int b) throws IOException {
+                single[0] = (byte) b;
+                write(single, 0, 1);
+            }
+
+            @Override
+            public void write(byte[] b, int off, int len) throws IOException {
+                if (!isOpen()) {
+                    throw new IOException("write(" + path + ")[len=" + len + "] stream is closed");
+                }
+
+                int srcPos = off;
+                int remaining = len;
+                do {
+                    // copy as much as fits into the free part of the local buffer
+                    int chunk = Math.min(remaining, pending.length - used);
+                    System.arraycopy(b, srcPos, pending, used, chunk);
+                    used += chunk;
+                    if (used == pending.length) {
+                        flush();
+                    }
+                    srcPos += chunk;
+                    remaining -= chunk;
+                } while (remaining > 0);
+            }
+
+            @Override
+            public void flush() throws IOException {
+                if (!isOpen()) {
+                    throw new IOException("flush(" + path + ") stream is closed");
+                }
+
+                DefaultSftpClient.this.write(handle, position, pending, 0, used);
+                position += used;
+                used = 0;
+            }
+
+            @Override
+            public void close() throws IOException {
+                if (!isOpen()) {
+                    return;
+                }
+
+                try {
+                    try {
+                        // push out whatever is still buffered before closing
+                        if (used > 0) {
+                            flush();
+                        }
+                    } finally {
+                        handle.close();
+                    }
+                } finally {
+                    handle = null;
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/a6e2bf9e/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
new file mode 100644
index 0000000..2af762c
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpClient.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.S_IFDIR;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.S_IFLNK;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.S_IFMT;
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.S_IFREG;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channel;
+import java.nio.file.attribute.FileTime;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.client.subsystem.SubsystemClient;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public interface SftpClient extends SubsystemClient {
+
+    /**
+     * Options that can be supplied to the {@code open}, {@code read} and
+     * {@code write} methods of this interface.
+     */
+    enum OpenMode {
+        Read,
+        Write,
+        Append,
+        Create,
+        Truncate,
+        Exclusive
+    }
+
+    /**
+     * Options that can be supplied to the {@code rename} methods of this
+     * interface.
+     */
+    enum CopyMode {
+        Atomic,
+        Overwrite
+    }
+
+    /**
+     * Markers stored in {@link Attributes#flags} by the {@link Attributes}
+     * setter methods to record which attribute groups have been assigned.
+     */
+    enum Attribute {
+        Size,
+        UidGid,
+        Perms,
+        AcModTime,
+        OwnerGroup,
+        AccessTime,
+        ModifyTime,
+        CreateTime,
+    }
+
+    /**
+     * Holds the identifier of a remote handle.
+     */
+    public static class Handle {
+        /** The handle identifier - validated as non-null/non-empty on construction */
+        public final String id;
+        /**
+         * @param id the handle identifier - passed through
+         * {@code ValidateUtils.checkNotNullAndNotEmpty}
+         */
+        public Handle(String id) {
+            this.id = ValidateUtils.checkNotNullAndNotEmpty(id, "No handle ID", GenericUtils.EMPTY_OBJECT_ARRAY);
+        }
+        
+        /** @return the handle identifier */
+        @Override
+        public String toString() {
+            return id;
+        }
+    }
+
+    /**
+     * A {@link Handle} that is also a {@link Channel} and {@link Closeable};
+     * the {@code isOpen()}/{@code close()} implementations are left to subclasses.
+     */
+    public static abstract class CloseableHandle extends Handle implements Channel, Closeable {
+        /**
+         * @param id the handle identifier - forwarded to the {@link Handle} constructor
+         */
+        protected CloseableHandle(String id) {
+            super(id);
+        }
+    }
+
+    public static class Attributes {
+        public final Set<Attribute> flags = EnumSet.noneOf(Attribute.class);
+        public long size;
+        public byte type;
+        public int uid;
+        public int gid;
+        public int perms;
+        public int atime;
+        public int ctime;
+        public int mtime;
+        public String owner;
+        public String group;
+        public FileTime accessTime;
+        public FileTime createTime;
+        public FileTime modifyTime;
+
+        @Override
+        public String toString() {
+            return "type=" + type
+                 + ";size=" + size
+                 + ";uid=" + uid
+                 + ";gid=" + gid
+                 + ";perms=0x" + Integer.toHexString(perms)
+                 + ";flags=" + flags
+                 + ";owner=" + owner
+                 + ";group=" + group
+                 + ";aTime=(" + atime + ")[" + accessTime + "]"
+                 + ";cTime=(" + ctime + ")[" + createTime + "]"
+                 + ";mTime=(" + mtime + ")[" + modifyTime + "]"
+                 ;
+        }
+
+        public Attributes size(long size) {
+            flags.add(Attribute.Size);
+            this.size = size;
+            return this;
+        }
+        public Attributes owner(String owner) {
+            flags.add(Attribute.OwnerGroup);
+            this.owner = owner;
+            if (GenericUtils.isEmpty(group)) {
+                group = "GROUP@";
+            }
+            return this;
+        }
+        public Attributes group(String group) {
+            flags.add(Attribute.OwnerGroup);
+            this.group = group;
+            if (GenericUtils.isEmpty(owner)) {
+                owner = "OWNER@";
+            }
+            return this;
+        }
+        public Attributes owner(int uid, int gid) {
+            flags.add(Attribute.UidGid);
+            this.uid = uid;
+            this.gid = gid;
+            return this;
+        }
+        public Attributes perms(int perms) {
+            flags.add(Attribute.Perms);
+            this.perms = perms;
+            return this;
+        }
+        public Attributes atime(int atime) {
+            flags.add(Attribute.AccessTime);
+            this.atime = atime;
+            this.accessTime = FileTime.from(atime, TimeUnit.SECONDS);
+            return this;
+        }
+        public Attributes ctime(int ctime) {
+            flags.add(Attribute.CreateTime);
+            this.ctime = ctime;
+            this.createTime = FileTime.from(atime, TimeUnit.SECONDS);
+            return this;
+        }
+        public Attributes mtime(int mtime) {
+            flags.add(Attribute.ModifyTime);
+            this.mtime = mtime;
+            this.modifyTime = FileTime.from(atime, TimeUnit.SECONDS);
+            return this;
+        }
+        public Attributes time(int atime, int mtime) {
+            flags.add(Attribute.AcModTime);
+            this.atime = atime;
+            this.mtime = mtime;
+            return this;
+        }
+        public Attributes accessTime(FileTime atime) {
+            flags.add(Attribute.AccessTime);
+            this.atime = (int) atime.to(TimeUnit.SECONDS);
+            this.accessTime = atime;
+            return this;
+        }
+        public Attributes createTime(FileTime ctime) {
+            flags.add(Attribute.CreateTime);
+            this.ctime = (int) ctime.to(TimeUnit.SECONDS);
+            this.createTime = ctime;
+            return this;
+        }
+        public Attributes modifyTime(FileTime mtime) {
+            flags.add(Attribute.ModifyTime);
+            this.mtime = (int) mtime.to(TimeUnit.SECONDS);
+            this.modifyTime = mtime;
+            return this;
+        }
+        public boolean isRegularFile() {
+            return (perms & S_IFMT) == S_IFREG;
+        }
+        public boolean isDirectory() {
+            return (perms & S_IFMT) == S_IFDIR;
+        }
+        public boolean isSymbolicLink() {
+            return (perms & S_IFMT) == S_IFLNK;
+        }
+        public boolean isOther() {
+            return !isRegularFile() && !isDirectory() && !isSymbolicLink();
+        }
+    }
+
+    /**
+     * A single directory listing entry: name, optional long name and attributes.
+     */
+    public static class DirEntry {
+        public String filename;
+        /** May be {@code null} - the constructor stores whatever value it is given */
+        public String longFilename;
+        public Attributes attributes;
+        /**
+         * @param filename     the entry name
+         * @param longFilename the long form of the name - stored as-is
+         * @param attributes   the entry attributes
+         */
+        public DirEntry(String filename, String longFilename, Attributes attributes) {
+            this.filename = filename;
+            this.longFilename = longFilename;
+            this.attributes = attributes;
+        }
+    }
+
+    int getVersion();
+
+    boolean isClosing();
+
+    //
+    // Low level API
+    //
+
+    CloseableHandle open(String path) throws IOException;
+    CloseableHandle open(String path, OpenMode ... options) throws IOException;
+    CloseableHandle open(String path, Collection<OpenMode> options) throws IOException;
+
+    void close(Handle handle) throws IOException;
+
+    void remove(String path) throws IOException;
+
+    void rename(String oldPath, String newPath) throws IOException;
+    void rename(String oldPath, String newPath, CopyMode... options) throws IOException;
+    void rename(String oldPath, String newPath, Collection<CopyMode> options) throws IOException;
+
+    int read(Handle handle, long fileOffset, byte[] dst) throws IOException;
+    int read(Handle handle, long fileOffset, byte[] dst, int dstOffset, int len) throws IOException;
+
+    void write(Handle handle, long fileOffset, byte[] src) throws IOException;
+    void write(Handle handle, long fileOffset, byte[] src, int srcOffset, int len) throws IOException;
+
+    void mkdir(String path) throws IOException;
+
+    void rmdir(String path) throws IOException;
+
+    CloseableHandle openDir(String path) throws IOException;
+
+    DirEntry[] readDir(Handle handle) throws IOException;
+
+    String canonicalPath(String path) throws IOException;
+
+    Attributes stat(String path) throws IOException;
+
+    Attributes lstat(String path) throws IOException;
+
+    Attributes stat(Handle handle) throws IOException;
+
+    void setStat(String path, Attributes attributes) throws IOException;
+
+    void setStat(Handle handle, Attributes attributes) throws IOException;
+
+    String readLink(String path) throws IOException;
+
+    void symLink(String linkPath, String targetPath) throws IOException;
+
+    void link(String linkPath, String targetPath, boolean symbolic) throws IOException;
+
+    void lock(Handle handle, long offset, long length, int mask) throws IOException;
+
+    void unlock(Handle handle, long offset, long length) throws IOException;
+
+    //
+    // High level API
+    //
+
+    Iterable<DirEntry> readDir(String path) throws IOException;
+
+    // default values used if none specified
+    int MIN_BUFFER_SIZE=Byte.MAX_VALUE, MIN_READ_BUFFER_SIZE=MIN_BUFFER_SIZE, MIN_WRITE_BUFFER_SIZE=MIN_BUFFER_SIZE;
+    int IO_BUFFER_SIZE=32 * 1024, DEFAULT_READ_BUFFER_SIZE=IO_BUFFER_SIZE, DEFAULT_WRITE_BUFFER_SIZE=IO_BUFFER_SIZE;
+    long DEFAULT_WAIT_TIMEOUT=TimeUnit.SECONDS.toMillis(30L);
+
+    /**
+     * Property that can be used on the {@link org.apache.sshd.common.FactoryManager}
+     * to control the internal timeout used by the client to open a channel.
+     * If not specified then {@link #DEFAULT_CHANNEL_OPEN_TIMEOUT} value
+     * is used
+     */
+    String SFTP_CHANNEL_OPEN_TIMEOUT = "sftp-channel-open-timeout";
+        long DEFAULT_CHANNEL_OPEN_TIMEOUT = DEFAULT_WAIT_TIMEOUT;
+
+    InputStream read(String path) throws IOException;
+    InputStream read(String path, int bufferSize) throws IOException;
+    InputStream read(String path, OpenMode ... mode) throws IOException;
+    InputStream read(String path, int bufferSize, OpenMode ... mode) throws IOException;
+    InputStream read(String path, Collection<OpenMode> mode) throws IOException;
+    InputStream read(String path, int bufferSize, Collection<OpenMode> mode) throws IOException;
+
+    OutputStream write(String path) throws IOException;
+    OutputStream write(String path, int bufferSize) throws IOException;
+    OutputStream write(String path, OpenMode ... mode) throws IOException;
+    OutputStream write(String path, int bufferSize, OpenMode ... mode) throws IOException;
+    OutputStream write(String path, Collection<OpenMode> mode) throws IOException;
+    OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/a6e2bf9e/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileChannel.java
new file mode 100644
index 0000000..a5ba947
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/SftpFileChannel.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import static org.apache.sshd.common.subsystem.sftp.SftpConstants.SSH_FX_LOCK_CONFLICT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.sshd.client.SftpException;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+
+public class SftpFileChannel extends FileChannel {
+
+    // Path this channel was opened on - also used in exception messages and toString()
+    private final SftpPath p;
+    // Modes the handle was opened with - consulted by ensureOpen()
+    private final Collection<SftpClient.OpenMode> modes;
+    // Client obtained from the path's file system - closed in implCloseChannel()
+    private final SftpClient sftp;
+    // Handle returned by sftp.open() for the path - closed in implCloseChannel()
+    private final SftpClient.CloseableHandle handle;
+    // Monitor held while reading/writing/transferring and while updating the position
+    private final Object lock = new Object();
+    // Current channel position - used (and advanced) by the relative read/write calls
+    private volatile long pos;
+    // Thread currently inside a blocking section (null if none) - interrupted when the channel is closed
+    private volatile Thread blockingThread;
+
+    /**
+     * Creates the channel by opening a handle on the given path through the
+     * client of the path's file system.
+     *
+     * @param p The target {@link SftpPath} - validated as non-{@code null}
+     * @param modes The {@link SftpClient.OpenMode}s passed to the open call -
+     * validated as non-{@code null}
+     * @throws IOException If the open call fails
+     */
+    public SftpFileChannel(SftpPath p, Collection<SftpClient.OpenMode> modes) throws IOException {
+        this.p = ValidateUtils.checkNotNull(p, "No target path", GenericUtils.EMPTY_OBJECT_ARRAY);
+        this.modes = ValidateUtils.checkNotNull(modes, "No channel modes specified", GenericUtils.EMPTY_OBJECT_ARRAY);
+
+        SftpFileSystem fileSystem = p.getFileSystem();
+        this.sftp = fileSystem.getClient();
+
+        String remotePath = p.toString();
+        this.handle = this.sftp.open(remotePath, modes);
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        return (int) doRead(Collections.singletonList(dst), -1);
+    }
+
+    @Override
+    public int read(ByteBuffer dst, long position) throws IOException {
+        if (position < 0) {
+            throw new IllegalArgumentException("read(" + p + ") illegal position to read from: " + position);
+        }
+        return (int) doRead(Collections.singletonList(dst), position);
+    }
+
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        List<ByteBuffer> buffers = Arrays.asList(dsts).subList(offset, offset + length);
+        return doRead(buffers, -1);
+    }
+
+    /**
+     * Unmodifiable set of the modes of which at least one must have been used
+     * to open the channel for the read operations to be allowed
+     */
+    public static final Set<SftpClient.OpenMode> READ_MODES=
+            Collections.unmodifiableSet(EnumSet.of(SftpClient.OpenMode.Read));
+
+    protected long doRead(List<ByteBuffer> buffers, long position) throws IOException {
+        ensureOpen(READ_MODES);
+        synchronized (lock) {
+            boolean completed = false;
+            boolean eof = false;
+            long curPos = position >= 0 ? position : pos;
+            try {
+                long totalRead = 0;
+                beginBlocking();
+                loop:
+                for (ByteBuffer buffer : buffers) {
+                    while (buffer.remaining() > 0) {
+                        ByteBuffer wrap = buffer;
+                        if (!buffer.hasArray()) {
+                            wrap = ByteBuffer.allocate(Math.min(8192, buffer.remaining()));
+                        }
+                        int read = sftp.read(handle, curPos, wrap.array(), wrap.arrayOffset() + wrap.position(), wrap.remaining());
+                        if (read > 0) {
+                            if (wrap == buffer) {
+                                wrap.position(wrap.position() + read);
+                            } else {
+                                buffer.put(wrap.array(), wrap.arrayOffset(), read);
+                            }
+                            curPos += read;
+                            totalRead += read;
+                        } else {
+                            eof = read == -1;
+                            break loop;
+                        }
+                    }
+                }
+                completed = true;
+                return totalRead > 0 ? totalRead : eof ? -1 : 0;
+            } finally {
+                if (position < 0) {
+                    pos = curPos;
+                }
+                endBlocking(completed);
+            }
+        }
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        return (int) doWrite(Collections.singletonList(src), -1);
+    }
+
+    @Override
+    public int write(ByteBuffer src, long position) throws IOException {
+        if (position < 0) {
+            throw new IllegalArgumentException("write(" + p + ") illegal position to write to: " + position);
+        }
+        return (int) doWrite(Collections.singletonList(src), position);
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        List<ByteBuffer> buffers = Arrays.asList(srcs).subList(offset, offset + length);
+        return doWrite(buffers, -1);
+    }
+
+    /**
+     * Unmodifiable set of the modes of which at least one must have been used
+     * to open the channel for the write operations to be allowed
+     */
+    public static final Set<SftpClient.OpenMode> WRITE_MODES=
+            Collections.unmodifiableSet(
+                EnumSet.of(SftpClient.OpenMode.Write, SftpClient.OpenMode.Append, SftpClient.OpenMode.Create, SftpClient.OpenMode.Truncate));
+
+    protected long doWrite(List<ByteBuffer> buffers, long position) throws IOException {
+        ensureOpen(WRITE_MODES);
+        synchronized (lock) {
+            boolean completed = false;
+            long curPos = position >= 0 ? position : pos;
+            try {
+                long totalWritten = 0;
+                beginBlocking();
+                for (ByteBuffer buffer : buffers) {
+                    while (buffer.remaining() > 0) {
+                        ByteBuffer wrap = buffer;
+                        if (!buffer.hasArray()) {
+                            wrap = ByteBuffer.allocate(Math.min(8192, buffer.remaining()));
+                            buffer.get(wrap.array(), wrap.arrayOffset(), wrap.remaining());
+                        }
+                        int written = wrap.remaining();
+                        sftp.write(handle, curPos, wrap.array(), wrap.arrayOffset() + wrap.position(), written);
+                        if (wrap == buffer) {
+                            wrap.position(wrap.position() + written);
+                        }
+                        curPos += written;
+                        totalWritten += written;
+                    }
+                }
+                completed = true;
+                return totalWritten;
+            } finally {
+                if (position < 0) {
+                    pos = curPos;
+                }
+                endBlocking(completed);
+            }
+        }
+    }
+
+    /**
+     * @return The current channel position
+     * @throws IOException If the channel is not open
+     */
+    @Override
+    public long position() throws IOException {
+        Collection<SftpClient.OpenMode> anyMode = Collections.<SftpClient.OpenMode>emptySet();
+        ensureOpen(anyMode);
+        return pos;
+    }
+
+    /**
+     * Sets the channel position used by subsequent relative reads/writes.
+     *
+     * @param newPosition The new position - must be non-negative
+     * @return This channel
+     * @throws IllegalArgumentException if the position is negative
+     * @throws IOException If the channel is not open
+     */
+    @Override
+    public FileChannel position(long newPosition) throws IOException {
+        if (newPosition >= 0) {
+            ensureOpen(Collections.<SftpClient.OpenMode>emptySet());
+            synchronized (lock) {
+                pos = newPosition;
+            }
+            return this;
+        }
+
+        throw new IllegalArgumentException("position(" + p + ") illegal file channel position: " + newPosition);
+    }
+
+    @Override
+    public long size() throws IOException {
+        ensureOpen(Collections.<SftpClient.OpenMode>emptySet());
+        return sftp.stat(handle).size;
+    }
+
+    /**
+     * Sets the size attribute of the open handle to the given value and, as
+     * required by the {@link FileChannel#truncate(long)} contract, moves the
+     * channel position back to the new size if it was beyond it.
+     * <B>Note:</B> the size attribute is sent unconditionally - no check is
+     * made whether the requested size exceeds the current file size.
+     *
+     * @param size The new size - must be non-negative
+     * @return This channel
+     * @throws IllegalArgumentException if the size is negative
+     * @throws IOException If the channel is not open or the request failed
+     */
+    @Override
+    public FileChannel truncate(long size) throws IOException {
+        if (size < 0) {
+            throw new IllegalArgumentException("truncate(" + p + ") illegal file size: " + size);
+        }
+
+        ensureOpen(Collections.<SftpClient.OpenMode>emptySet());
+        sftp.setStat(handle, new SftpClient.Attributes().size(size));
+
+        // position updates are guarded by the same monitor used by position(long)
+        synchronized (lock) {
+            if (pos > size) {
+                pos = size;
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Only verifies that the channel is open - nothing else is done here,
+     * regardless of the {@code metaData} value.
+     */
+    @Override
+    public void force(boolean metaData) throws IOException {
+        ensureOpen(Collections.<SftpClient.OpenMode>emptySet());
+    }
+
+    /**
+     * Copies up to {@code count} bytes, starting at the given file position,
+     * into the target channel. The channel's own position is not modified.
+     *
+     * @param position The file position to start reading from - must be non-negative
+     * @param count The maximum number of bytes to transfer - must be non-negative
+     * @param target The channel to write the data to
+     * @return The number of bytes actually transferred - possibly zero (e.g.,
+     * if the position is at or beyond the end of the file)
+     * @throws IllegalArgumentException if the position or count is negative
+     * @throws IOException If channel not open for reading or the transfer failed
+     */
+    @Override
+    public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+        if ((position < 0) || (count < 0)) {
+            throw new IllegalArgumentException("transferTo(" + p + ") illegal position (" + position + ") or count (" + count + ")");
+        }
+        ensureOpen(READ_MODES);
+        synchronized (lock) {
+            boolean completed = false;
+            long curPos = position;
+            try {
+                beginBlocking();
+
+                int bufSize = (int) Math.min(count, 32768);
+                byte[] buffer = new byte[bufSize];
+                long totalRead = 0L;
+                while (totalRead < count) {
+                    // never request more than what is still missing from the requested count
+                    int reqLen = (int) Math.min(buffer.length, count - totalRead);
+                    int read = sftp.read(handle, curPos, buffer, 0, reqLen);
+                    if (read <= 0) {
+                        // EOF (or nothing read) - stop instead of re-issuing the same read forever
+                        break;
+                    }
+
+                    // expose only the bytes actually read - not the entire work buffer
+                    ByteBuffer wrap = ByteBuffer.wrap(buffer, 0, read);
+                    while (wrap.hasRemaining()) {
+                        target.write(wrap);
+                    }
+                    curPos += read;
+                    totalRead += read;
+                }
+                completed = true;
+                return totalRead;
+            } finally {
+                endBlocking(completed);
+            }
+        }
+    }
+
+    /**
+     * Copies up to {@code count} bytes from the source channel into the file,
+     * starting at the given file position. Stops early once the source
+     * returns no more data. The channel's own position is not modified.
+     *
+     * @param src The channel to read the data from
+     * @param position The file position to start writing at - must be non-negative
+     * @param count The maximum number of bytes to transfer - must be non-negative
+     * @return The number of bytes actually transferred
+     * @throws IllegalArgumentException if the position or count is negative
+     * @throws IOException If channel not open for writing or the transfer failed
+     */
+    @Override
+    public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+        if ((position < 0) || (count < 0)) {
+            throw new IllegalArgumentException("transferFrom(" + p + ") illegal position (" + position + ") or count (" + count + ")");
+        }
+        ensureOpen(WRITE_MODES);
+
+        synchronized(lock) {
+            boolean done = false;
+            // the position has been validated as non-negative, so it is always the starting offset
+            long offset = position;
+            try {
+                long copied = 0L;
+                beginBlocking();
+
+                byte[] chunk = new byte[32768];
+                while (copied < count) {
+                    int maxLen = (int) Math.min(chunk.length, count - copied);
+                    int len = src.read(ByteBuffer.wrap(chunk, 0, maxLen));
+                    if (len <= 0) {
+                        break;
+                    }
+
+                    sftp.write(handle, offset, chunk, 0, len);
+                    offset += len;
+                    copied += len;
+                }
+                done = true;
+                return copied;
+            } finally {
+                endBlocking(done);
+            }
+        }
+    }
+
+    @Override
+    public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
+        throw new UnsupportedOperationException("map(" + p + ")[" + mode + "," + position + "," + size + "] N/A");
+    }
+
+    /**
+     * Delegates to {@link #tryLock(long, long, boolean)} with the same arguments.
+     * NOTE(review): {@link FileChannel#lock(long, long, boolean)} is specified as
+     * blocking until the region can be locked, whereas here a lock conflict
+     * reported via {@code tryLock} is propagated to the caller as an exception -
+     * confirm this is intended.
+     */
+    @Override
+    public FileLock lock(long position, long size, boolean shared) throws IOException {
+        return tryLock(position, size, shared);
+    }
+
+    /**
+     * Asks the client to lock the given region of the open handle and wraps
+     * the result in a {@link FileLock} whose release unlocks the same region.
+     *
+     * @param position Start of the region to lock
+     * @param size Size of the region to lock
+     * @param shared Reported by the returned lock - not passed on to the client call
+     * @return The {@link FileLock} representing the locked region
+     * @throws OverlappingFileLockException if the client reports a
+     * {@code SSH_FX_LOCK_CONFLICT} status
+     * @throws IOException If the channel is not open or the lock request failed
+     */
+    @Override
+    public FileLock tryLock(final long position, final long size, boolean shared) throws IOException {
+        ensureOpen(Collections.<SftpClient.OpenMode>emptySet());
+
+        try {
+            sftp.lock(handle, position, size, 0);
+        } catch (SftpException e) {
+            if (e.getStatus() != SSH_FX_LOCK_CONFLICT) {
+                throw e;
+            }
+            throw new OverlappingFileLockException();
+        }
+
+        return new FileLock(this, position, size, shared) {
+            // flipped exactly once - by the first release() call
+            private final AtomicBoolean released = new AtomicBoolean(false);
+
+            @Override
+            public boolean isValid() {
+                return acquiredBy().isOpen() && (!released.get());
+            }
+
+            @SuppressWarnings("synthetic-access")
+            @Override
+            public void release() throws IOException {
+                if (released.compareAndSet(false, true)) {
+                    sftp.unlock(handle, position, size);
+                }
+            }
+        };
+    }
+
+    /**
+     * Interrupts the thread (if any) currently inside a blocking section and
+     * then closes the handle followed by the client - each step is attempted
+     * even if the previous one failed.
+     */
+    @Override
+    protected void implCloseChannel() throws IOException {
+        final Thread blocked = blockingThread;
+        try {
+            if (blocked != null) {
+                blocked.interrupt();
+            }
+        } finally {
+            try {
+                handle.close();
+            } finally {
+                sftp.close();
+            }
+        }
+    }
+
+    /**
+     * Marks the start of a section that may block: invokes {@code begin()}
+     * and then records the calling thread so that {@link #implCloseChannel()}
+     * can interrupt it.
+     */
+    private void beginBlocking() {
+        begin();
+        blockingThread = Thread.currentThread();
+    }
+
+    /**
+     * Marks the end of a section started by {@link #beginBlocking()}: clears
+     * the recorded thread and then invokes {@code end(completed)}.
+     *
+     * @param completed Whether the operation ran to completion - passed as-is to {@code end}
+     * @throws AsynchronousCloseException As declared by {@code end}
+     */
+    private void endBlocking(boolean completed) throws AsynchronousCloseException {
+        blockingThread = null;
+        end(completed);
+    }
+
+    /**
+     * Checks that the channel is open and that the modes it was opened with
+     * contain at least one of the required ones
+     *
+     * @param reqModes The required modes - the modes check is skipped if the
+     * reported size of the collection is not positive
+     * @throws ClosedChannelException If the channel is not open
+     * @throws IOException If none of the required modes is among the modes
+     * the channel was opened with
+     */
+    private void ensureOpen(Collection<SftpClient.OpenMode> reqModes) throws IOException {
+        if (!isOpen()) {
+            throw new ClosedChannelException();
+        }
+
+        if (GenericUtils.size(reqModes) > 0) {
+            for (SftpClient.OpenMode m : reqModes) {
+                if (this.modes.contains(m)) {
+                    return;
+                }
+            }
+
+            // reaching here means no required mode matched - the message must say "do not contain"
+            throw new IOException("ensureOpen(" + p + ") current channel modes (" + this.modes + ") do not contain any of the required: " + reqModes);
+        }
+    }
+
+    /**
+     * @return The string form of the path this channel was opened on
+     */
+    @Override
+    public String toString() {
+        return String.valueOf(p);
+    }
+}